Posted to commits@spark.apache.org by tg...@apache.org on 2021/04/20 13:56:31 UTC

[spark] branch master updated: [SPARK-34877][CORE][YARN] Add the code change for adding the Spark AM log link in spark UI

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e64b4f  [SPARK-34877][CORE][YARN] Add the code change for adding the Spark AM log link in spark UI
1e64b4f is described below

commit 1e64b4fa27882f81989bda2aefdda66343625436
Author: SaurabhChawla <s....@gmail.com>
AuthorDate: Tue Apr 20 08:56:07 2021 -0500

    [SPARK-34877][CORE][YARN] Add the code change for adding the Spark AM log link in spark UI
    
    ### What changes were proposed in this pull request?
    When running a Spark job on YARN with deploy mode client, the Spark driver and the Spark Application Master (AM) launch in two separate containers. In various scenarios it is useful to see the Spark AM logs to check resource allocation, decommissioning status, and other information exchanged between the YARN RM and the Spark AM.
    
    In cluster mode the Spark driver and the Spark AM run in the same container, so the driver's log link already exposes these logs in the Spark UI.
    
    This PR adds the Spark AM log link for jobs running in client mode on YARN. Instead of searching for the container id and then locating the logs, we can check them directly in the Spark UI.
    
    This change only shows the AM log links in client mode when the resource manager is YARN; the message flow it adds is sketched below.
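    
    For orientation: in client mode the AM gathers its container log URLs, sends a MiscellaneousProcessAdded RPC message to the driver, and the driver re-posts it on the listener bus as SparkListenerMiscellaneousProcessAdded; AppStatusListener then stores it for the UI and REST API. A condensed, self-contained sketch of that flow (simplified stand-ins for the real Spark classes in the diff below, not the actual implementation):
    
    ```scala
    // Simplified stand-ins for the classes this patch adds (see diff below).
    case class MiscellaneousProcessDetails(
        hostPort: String,
        cores: Int,
        logUrlInfo: Map[String, String])
    
    // RPC message sent from the AM to the driver endpoint.
    case class MiscellaneousProcessAdded(
        time: Long, processId: String, info: MiscellaneousProcessDetails)
    
    // Listener event the driver posts so the status store can pick it up.
    case class SparkListenerMiscellaneousProcessAdded(
        time: Long, processId: String, info: MiscellaneousProcessDetails)
    
    object AmLogLinkFlow extends App {
      // 1. AM side (client mode only): collect its own log links.
      //    Host and URLs here are illustrative placeholders.
      val info = MiscellaneousProcessDetails(
        "nm-host:8042", 1,
        Map("stdout" -> "http://nm-host:8042/node/containerlogs/c1/user/stdout",
            "stderr" -> "http://nm-host:8042/node/containerlogs/c1/user/stderr"))
      // 2. Sent to the driver as an RPC message with the fixed id "yarn-am".
      val msg = MiscellaneousProcessAdded(System.currentTimeMillis(), "yarn-am", info)
      // 3. Driver re-posts it on the listener bus; AppStatusListener stores it,
      //    and the executors page renders it as a "Miscellaneous Process" row.
      println(SparkListenerMiscellaneousProcessAdded(msg.time, msg.processId, msg.info))
    }
    ```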
    
    ### Why are the changes needed?
    Until now the only way to check these logs was to find the container id of the AM and fetch the logs either with the YARN CLI or from the YARN RM Application History Server.
    
    Adding the AM log link to the Spark UI removes that indirection: the logs can be opened directly from the executors page.
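    
    With this patch, the same information is also one HTTP call away; a minimal sketch of querying the new endpoint (the host, port, and app id here are assumptions for illustration):
    
    ```scala
    import scala.io.Source
    
    // Minimal sketch: fetch the miscellaneous-process list added by this patch.
    // localhost:4040 is an illustrative assumption; the app id is the example
    // one used by this commit's test data.
    object QueryAmLogs extends App {
      val appId = "application_1555004656427_0144"
      val url =
        s"http://localhost:4040/api/v1/applications/$appId/allmiscellaneousprocess"
      // Returns a JSON array shaped like miscellaneous_process_expectation.json
      // in this commit: [ { "id" : "yarn-am", "processLogs" : { ... }, ... } ]
      println(Source.fromURL(url).mkString)
    }
    ```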
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a unit test and also verified manually in the Spark UI.
    **In Yarn Client mode**
    Before the change:
    
    ![image](https://user-images.githubusercontent.com/34540906/112644861-e1733200-8e6b-11eb-939b-c76ca9902a4e.png)
    
    After the change - the AM info is shown:
    
    ![image](https://user-images.githubusercontent.com/34540906/115264198-b7075280-a153-11eb-98f3-2aed66ffad2a.png)
    
    AM Log
    
    ![image](https://user-images.githubusercontent.com/34540906/112645680-c0f7a780-8e6c-11eb-8b82-4ccc0aee927b.png)
    
    **In Yarn Cluster Mode** - the AM log link is not there (driver and AM share a container, so the driver log link already covers it):
    
    ![image](https://user-images.githubusercontent.com/34540906/112649512-86900980-8e70-11eb-9b37-69d5c4b53ffa.png)
    
    Closes #31974 from SaurabhChawla100/SPARK-34877.
    
    Authored-by: SaurabhChawla <s....@gmail.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../spark/ui/static/executorspage-template.html    | 17 ++++++++
 .../org/apache/spark/ui/static/executorspage.js    | 50 +++++++++++++++++++++-
 .../resources/org/apache/spark/ui/static/utils.js  | 22 ++++++++++
 .../scheduler/MiscellaneousProcessDetails.scala    | 30 +++++++++++++
 .../org/apache/spark/scheduler/SparkListener.scala |  4 ++
 .../cluster/CoarseGrainedClusterMessage.scala      |  7 ++-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  5 +++
 .../apache/spark/status/AppStatusListener.scala    | 22 ++++++++++
 .../org/apache/spark/status/AppStatusStore.scala   | 10 +++++
 .../scala/org/apache/spark/status/LiveEntity.scala | 24 +++++++++++
 .../status/api/v1/OneApplicationResource.scala     |  4 ++
 .../scala/org/apache/spark/status/api/v1/api.scala |  9 ++++
 .../scala/org/apache/spark/status/storeTypes.scala | 13 ++++++
 .../miscellaneous_process_expectation.json         | 11 +++++
 .../spark-events/application_1555004656427_0144    |  1 +
 .../spark/deploy/history/HistoryServerSuite.scala  |  3 +-
 .../spark/status/AppStatusListenerSuite.scala      | 38 ++++++++++++++++
 .../spark/deploy/yarn/ApplicationMaster.scala      | 15 +++++++
 .../scheduler/cluster/YarnSchedulerBackend.scala   |  6 +++
 19 files changed, 288 insertions(+), 3 deletions(-)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 7a064dc..be6d7bc 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -134,4 +134,21 @@ limitations under the License.
         </tbody>
       </table>
   </div>
+
+  <div class="container-fluid active-process-container">
+    <h4 class="title-table">Miscellaneous Process</h4>
+    <table id="active-process-table" class="table table-striped compact cell-border" style="width: 100%">
+      <thead>
+      <tr>
+        <th>Process ID</th>
+        <th>Address</th>
+        <th>Status</th>
+        <th>Cores</th>
+        <th>Logs</th>
+      </tr>
+      </thead>
+      <tbody>
+      </tbody>
+    </table>
+  </div>
 </script>
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index c8dc619..9133ef8 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -39,6 +39,13 @@ function formatStatus(status, type, row) {
     return "Dead"
 }
 
+function formatProcessStatus(activeStatus) {
+    if (activeStatus) {
+        return "Active"
+    }
+    return "Dead"
+}
+
 function formatResourceCells(resources) {
     var result = ""
     var count = 0
@@ -548,7 +555,48 @@ $(document).ready(function () {
                 execDataTable.column('executorLogsCol:name').visible(logsExist(response));
                 execDataTable.column('threadDumpCol:name').visible(getThreadDumpEnabled());
                 $('#active-executors [data-toggle="tooltip"]').tooltip();
-    
+
+                 // This section becomes visible once the API returns a response.
+                 $('.active-process-container').hide()
+                 var endPoint = createRESTEndPointForMiscellaneousProcess(appId);
+                 $.getJSON(endPoint, function( response, status, jqXHR ) {
+                    if (response.length) {
+                        var processSummaryResponse = response;
+                        var processSummaryConf = {
+                            "data": processSummaryResponse,
+                            "columns": [{
+                                    data: "id"
+                                },
+                                {
+                                    data: "hostPort"
+                                },
+                                {
+                                    data: function(row) {
+                                        return formatProcessStatus(row.isActive);
+                                    }
+                                },
+                                {
+                                    data: "totalCores"
+                                },
+                                {
+                                    data: "processLogs",
+                                    render: formatLogsCells
+                                },
+                            ],
+                            "deferRender": true,
+                            "order": [
+                                [0, "asc"]
+                            ],
+                            "bAutoWidth": false,
+                            "oLanguage": {
+                                "sEmptyTable": "No data to show yet"
+                            }
+                        };
+                        $("#active-process-table").DataTable(processSummaryConf);
+                        $('.active-process-container').show()
+                    }
+                 });
+
                 var sumSelector = "#summary-execs-table";
                 var sumConf = {
                     "data": [activeSummary, deadSummary, totalSummary],
diff --git a/core/src/main/resources/org/apache/spark/ui/static/utils.js b/core/src/main/resources/org/apache/spark/ui/static/utils.js
index f4914f0..0026850 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/utils.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/utils.js
@@ -195,3 +195,25 @@ function createRESTEndPointForExecutorsPage(appId) {
     }
     return uiRoot + "/api/v1/applications/" + appId + "/allexecutors";
 }
+
+function createRESTEndPointForMiscellaneousProcess(appId) {
+    var words = document.baseURI.split('/');
+    var ind = words.indexOf("proxy");
+    if (ind > 0) {
+        var appId = words[ind + 1];
+        var newBaseURI = words.slice(0, ind + 2).join('/');
+        return newBaseURI + "/api/v1/applications/" + appId + "/allmiscellaneousprocess";
+    }
+    ind = words.indexOf("history");
+    if (ind > 0) {
+        var appId = words[ind + 1];
+        var attemptId = words[ind + 2];
+        var newBaseURI = words.slice(0, ind).join('/');
+        if (isNaN(attemptId)) {
+            return newBaseURI + "/api/v1/applications/" + appId + "/allmiscellaneousprocess";
+        } else {
+            return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/allmiscellaneousprocess";
+        }
+    }
+    return uiRoot + "/api/v1/applications/" + appId + "/allmiscellaneousprocess";
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MiscellaneousProcessDetails.scala b/core/src/main/scala/org/apache/spark/scheduler/MiscellaneousProcessDetails.scala
new file mode 100644
index 0000000..0e5fe0a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/MiscellaneousProcessDetails.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about a Miscellaneous Process to pass from the scheduler to SparkListeners.
+ */
+
+@DeveloperApi
+class MiscellaneousProcessDetails(
+    val hostPort: String,
+    val cores: Int,
+    val logUrlInfo: Map[String, String]) extends Serializable
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index c0fa1cf..d00866e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -227,6 +227,10 @@ case class SparkListenerUnschedulableTaskSetRemoved(
 @DeveloperApi
 case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerMiscellaneousProcessAdded(time: Long, processId: String,
+    info: MiscellaneousProcessDetails) extends SparkListenerEvent
+
 /**
  * Periodic updates from executors.
  * @param execId executor id
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index a6f52f9..66ac40f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.ExecutorLossReason
+import org.apache.spark.scheduler.{ExecutorLossReason, MiscellaneousProcessDetails}
 import org.apache.spark.util.SerializableBuffer
 
 private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
@@ -124,6 +124,11 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case class RegisterClusterManager(am: RpcEndpointRef) extends CoarseGrainedClusterMessage
 
+  // Send Miscellaneous Process information to the driver
+  case class MiscellaneousProcessAdded(
+      time: Long, processId: String, info: MiscellaneousProcessDetails)
+    extends CoarseGrainedClusterMessage
+
   // Used by YARN's client mode AM to retrieve the current set of delegation tokens.
   object RetrieveDelegationTokens extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b44f677..3c121c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -213,6 +213,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           data.freeCores = data.totalCores
         }
         makeOffers(executorId)
+
+      case MiscellaneousProcessAdded(time: Long,
+          processId: String, info: MiscellaneousProcessDetails) =>
+        listenerBus.post(SparkListenerMiscellaneousProcessAdded(time, processId, info))
+
       case e =>
         logError(s"Received unexpected message. ${e}")
     }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 4245243..7c403c4 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -78,6 +78,7 @@ private[spark] class AppStatusListener(
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
   private val liveResourceProfiles = new HashMap[Int, LiveResourceProfile]()
+  private[spark] val liveMiscellaneousProcess = new HashMap[String, LiveMiscellaneousProcess]()
 
   private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
   // Keep the active executor count as a separate variable to avoid having to do synchronization
@@ -107,6 +108,8 @@ private[spark] class AppStatusListener(
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
     case SparkListenerLogStart(version) => sparkVersion = version
+    case processInfoEvent: SparkListenerMiscellaneousProcessAdded =>
+      onMiscellaneousProcessAdded(processInfoEvent)
     case _ =>
   }
 
@@ -1124,6 +1127,13 @@ private[spark] class AppStatusListener(
     })
   }
 
+  private def getOrCreateOtherProcess(processId: String,
+      addTime: Long): LiveMiscellaneousProcess = {
+    liveMiscellaneousProcess.getOrElseUpdate(processId, {
+      new LiveMiscellaneousProcess(processId, addTime)
+    })
+  }
+
   private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
     val storageLevel = event.blockUpdatedInfo.storageLevel
     if (storageLevel.isValid) {
@@ -1353,4 +1363,16 @@ private[spark] class AppStatusListener(
     }
   }
 
+  private def onMiscellaneousProcessAdded(
+      processInfoEvent: SparkListenerMiscellaneousProcessAdded): Unit = {
+    val processInfo = processInfoEvent.info
+    val miscellaneousProcess =
+      getOrCreateOtherProcess(processInfoEvent.processId, processInfoEvent.time)
+    miscellaneousProcess.processLogs = processInfo.logUrlInfo
+    miscellaneousProcess.hostPort = processInfo.hostPort
+    miscellaneousProcess.isActive = true
+    miscellaneousProcess.totalCores = processInfo.cores
+    update(miscellaneousProcess, System.nanoTime())
+  }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 8d9c7cc..a8a16cd 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -92,6 +92,16 @@ private[spark] class AppStatusStore(
     filtered.asScala.map(_.info).filter(_.id != FALLBACK_BLOCK_MANAGER_ID.executorId).toSeq
   }
 
+  def miscellaneousProcessList(activeOnly: Boolean): Seq[v1.ProcessSummary] = {
+    val base = store.view(classOf[ProcessSummaryWrapper])
+    val filtered = if (activeOnly) {
+      base.index("active").reverse().first(true).last(true)
+    } else {
+      base
+    }
+    filtered.asScala.map(_.info).toSeq
+  }
+
   def executorSummary(executorId: String): v1.ExecutorSummary = {
     store.read(classOf[ExecutorSummaryWrapper], executorId).info
   }
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 77fc6bb..5af76e9 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -914,3 +914,27 @@ private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] {
   }
 
 }
+
+private[spark] class LiveMiscellaneousProcess(val processId: String,
+    creationTime: Long) extends LiveEntity {
+
+  var hostPort: String = null
+  var isActive = true
+  var totalCores = 0
+  val addTime = new Date(creationTime)
+  var removeTime: Date = null
+  var processLogs = Map[String, String]()
+
+  override protected def doUpdate(): Any = {
+
+    val info = new v1.ProcessSummary(
+      processId,
+      hostPort,
+      isActive,
+      totalCores,
+      addTime,
+      Option(removeTime),
+      processLogs)
+    new ProcessSummaryWrapper(info)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
index fb64ff5..ef17168 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -79,6 +79,10 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
   @Path("allexecutors")
   def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
 
+  @GET
+  @Path("allmiscellaneousprocess")
+  def allProcessList(): Seq[ProcessSummary] = withUI(_.store.miscellaneousProcessList(false))
+
   @Path("stages")
   def stages(): Class[StagesResource] = classOf[StagesResource]
 
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 89f1423..8c08232 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -486,3 +486,12 @@ case class ThreadStackTrace(
     val blockedByThreadId: Option[Long],
     val blockedByLock: String,
     val holdingLocks: Seq[String])
+
+class ProcessSummary private[spark](
+     val id: String,
+     val hostPort: String,
+     val isActive: Boolean,
+     val totalCores: Int,
+     val addTime: Date,
+     val removeTime: Option[Date],
+     val processLogs: Map[String, String])
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index 081cca7..a34b1a5 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -514,3 +514,16 @@ private[spark] class CachedQuantile(
   def stage: Array[Int] = Array(stageId, stageAttemptId)
 
 }
+
+private[spark] class ProcessSummaryWrapper(val info: ProcessSummary) {
+
+  @JsonIgnore @KVIndex
+  private def id: String = info.id
+
+  @JsonIgnore @KVIndex("active")
+  private def active: Boolean = info.isActive
+
+  @JsonIgnore @KVIndex("host")
+  val host: String = Utils.parseHostPort(info.hostPort)._1
+
+}
diff --git a/core/src/test/resources/HistoryServerExpectations/miscellaneous_process_expectation.json b/core/src/test/resources/HistoryServerExpectations/miscellaneous_process_expectation.json
new file mode 100644
index 0000000..93d674a
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/miscellaneous_process_expectation.json
@@ -0,0 +1,11 @@
+[ {
+  "id" : "yarn-am",
+  "hostPort" : "192.168.1.19:8042",
+  "isActive" : true,
+  "totalCores" : 1,
+  "addTime" : "2021-04-19T15:35:50.218GMT",
+  "processLogs" : {
+    "stdout" : "http://192.168.1.19:8042/node/containerlogs/container_1555004656427_0144_01_000001/test/stdout?start=-4096",
+    "stderr" : "http://192.168.1.19:8042/node/containerlogs/container_1555004656427_0144_01_000001/test/stderr?start=-4096"
+  }
+} ]
diff --git a/core/src/test/resources/spark-events/application_1555004656427_0144 b/core/src/test/resources/spark-events/application_1555004656427_0144
index 91dae7e..6868fe6 100644
--- a/core/src/test/resources/spark-events/application_1555004656427_0144
+++ b/core/src/test/resources/spark-events/application_1555004656427_0144
@@ -2,6 +2,7 @@
 {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"10.28.9.112","Port":37319},"Maximum Memory":384093388,"Timestamp":1562101345595,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":0}
 {"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/lib/jvm/java-8-openjdk-amd64/jre","Java Version":"1.8.0_212 (Oracle Corporation)","Scala Version":"version 2.12.8"},"Spark Properties":{"spark.executor.resource.gpu.amount":"1","spark.yarn.dist.files":"file:///home/tgraves/getGpus","spark.driver.host":"10.28.9.112","spark.executor.resource.gpu.discoveryScript":"./getGpus","spark.eventLog.enabled":"true","spark.driver.port":"38895","spark.repl.class.uri":"spark [...]
 {"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"application_1555004656427_0144","Timestamp":1562101337180,"User":"tgraves"}
+{"Event":"org.apache.spark.scheduler.SparkListenerMiscellaneousProcessAdded","time":1618846550218,"processId":"yarn-am","info":{"hostPort":"192.168.1.19:8042","cores":1,"logUrlInfo":{"stdout":"http://192.168.1.19:8042/node/containerlogs/container_1555004656427_0144_01_000001/test/stdout?start=-4096","stderr":"http://192.168.1.19:8042/node/containerlogs/container_1555004656427_0144_01_000001/test/stderr?start=-4096"}}}
 {"Event":"SparkListenerExecutorAdded","Timestamp":1562101348551,"Executor ID":"1","Executor Info":{"Host":"tomg-test","Total Cores":1,"Log Urls":{"stdout":"http://tomg-test:8042/node/containerlogs/container_1555004656427_0144_01_000002/tgraves/stdout?start=-4096","stderr":"http://tomg-test:8042/node/containerlogs/container_1555004656427_0144_01_000002/tgraves/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"tomg-test:8042","USER":"tgraves","LOG_FILES":"stderr,stdout","NM_HTTP_PORT": [...]
 {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"tomg-test","Port":44873},"Maximum Memory":384093388,"Timestamp":1562101348609,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":0}
 {"Event":"SparkListenerExecutorAdded","Timestamp":1562101349256,"Executor ID":"2","Executor Info":{"Host":"tomg-test","Total Cores":1,"Log Urls":{"stdout":"http://tomg-test:8042/node/containerlogs/container_1555004656427_0144_01_000003/tgraves/stdout?start=-4096","stderr":"http://tomg-test:8042/node/containerlogs/container_1555004656427_0144_01_000003/tgraves/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"tomg-test:8042","USER":"tgraves","LOG_FILES":"stderr,stdout","NM_HTTP_PORT": [...]
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 1de0844..5da5835 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -191,7 +191,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 
     // Enable "spark.eventLog.logBlockUpdates.enabled", to get the storage information
     // in the history server.
-    "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
+    "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0",
+    "miscellaneous process" -> "applications/application_1555004656427_0144/allmiscellaneousprocess"
   )
 
   // run a bunch of characterization tests -- just verify the behavior is the same as what is saved
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index a251c16..424b328 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1796,6 +1796,44 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
+  test("SPARK-34877 - check YarnAmInfoEvent is populated correctly") {
+    def checkInfoPopulated(listener: AppStatusListener,
+       logUrlMap: Map[String, String], processId: String): Unit = {
+      val yarnAmInfo = listener.liveMiscellaneousProcess.get(processId)
+      assert(yarnAmInfo.isDefined)
+      yarnAmInfo.foreach { info =>
+        assert(info.processId == processId)
+        assert(info.isActive)
+        assert(info.processLogs == logUrlMap)
+      }
+      check[ProcessSummaryWrapper](processId) { process =>
+        assert(process.info.id === processId)
+        assert(process.info.isActive)
+        assert(process.info.processLogs == logUrlMap)
+      }
+    }
+    val processId = "yarn-am"
+    val listener = new AppStatusListener(store, conf, true)
+    var stdout = "http:yarnAmHost:2453/con1/stdout"
+    var stderr = "http:yarnAmHost:2453/con2/stderr"
+    var logUrlMap: Map[String, String] = Map("stdout" -> stdout,
+      "stderr" -> stderr)
+    var hostport = "yarnAmHost:2453"
+    var info = new MiscellaneousProcessDetails(hostport, 1, logUrlMap)
+    listener.onOtherEvent(SparkListenerMiscellaneousProcessAdded(123678L, processId, info))
+    checkInfoPopulated(listener, logUrlMap, processId)
+
+    // Launch new AM in case of failure
+    // New container entry will be updated in this scenario
+    stdout = "http:yarnAmHost:2451/con1/stdout"
+    stderr = "http:yarnAmHost:2451/con2/stderr"
+    logUrlMap = Map("stdout" -> stdout,
+      "stderr" -> stderr)
+    hostport = "yarnAmHost:2451"
+    info = new MiscellaneousProcessDetails(hostport, 1, logUrlMap)
+    listener.onOtherEvent(SparkListenerMiscellaneousProcessAdded(123678L, processId, info))
+    checkInfoPopulated(listener, logUrlMap, processId)
+  }
 
   private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e7377e0..ebe1286 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -50,6 +50,7 @@ import org.apache.spark.internal.config.UI._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc._
+import org.apache.spark.scheduler.MiscellaneousProcessDetails
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util._
@@ -65,6 +66,11 @@ private[spark] class ApplicationMaster(
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
 
+  private def extractLogUrls: Map[String, String] = {
+    YarnContainerInfoHelper.getLogUrls(SparkHadoopUtil.
+      newConfiguration(sparkConf), None).getOrElse(Map())
+  }
+
   private val appAttemptId =
     if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {
       YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
@@ -776,6 +782,15 @@ private[spark] class ApplicationMaster(
 
     override def onStart(): Unit = {
       driver.send(RegisterClusterManager(self))
+      // If the deploy mode for the YARN application is client,
+      // send the AM log info to the Spark driver
+      if (!isClusterMode) {
+        val hostPort = YarnContainerInfoHelper.getNodeManagerHttpAddress(None)
+        val yarnAMID = "yarn-am"
+        val info = new MiscellaneousProcessDetails(hostPort,
+          sparkConf.get(AM_CORES), extractLogUrls)
+        driver.send(MiscellaneousProcessAdded(System.currentTimeMillis(), yarnAMID, info))
+      }
     }
 
     override def receive: PartialFunction[Any, Unit] = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 22002bb..c5c4594 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -333,6 +333,12 @@ private[spark] abstract class YarnSchedulerBackend(
           logWarning(s"Requesting driver to remove executor $executorId for reason $reason")
           driverEndpoint.send(r)
         }
+
+      // In the YARN case the Miscellaneous Process is the Spark AM container,
+      // launched when the deploy mode is client
+      case processInfo @ MiscellaneousProcessAdded(_, _, _) =>
+        logDebug(s"Sending the Spark AM info for yarn client mode")
+        driverEndpoint.send(processInfo)
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
