You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/06/23 07:49:08 UTC

[spark] branch master updated: [SPARK-44153][CORE][UI] Support `Heap Histogram` column in `Executors` tab

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cd69d4dd18c [SPARK-44153][CORE][UI] Support `Heap Histogram` column in `Executors` tab
cd69d4dd18c is described below

commit cd69d4dd18cfaccf58bf64dde6268f7ea1d4415b
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Fri Jun 23 00:48:58 2023 -0700

    [SPARK-44153][CORE][UI] Support `Heap Histogram` column in `Executors` tab
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add a `Heap Histogram` column to the `Executors` tab.
    
    ### Why are the changes needed?
    
    Like the `Thread Dump` column, this is very helpful when analyzing the live JVM status of executors.
    
    ![Screenshot 2023-06-22 at 8 37 55 PM](https://github.com/apache/spark/assets/9700541/741c8deb-23ff-463d-8b1e-7c2e53d0b59f)
    
    ![Screenshot 2023-06-22 at 8 38 34 PM](https://github.com/apache/spark/assets/9700541/93f77f42-48b5-41fa-94ab-ea675f576331)
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but only as a new column, which can be toggled with the new `spark.ui.heapHistogramEnabled` configuration, analogous to `spark.ui.threadDumpsEnabled`.
    
    ### How was this patch tested?
    
    Manual review.
    
    Closes #41709 from dongjoon-hyun/SPARK-44153.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/ui/static/executorspage-template.html    |  1 +
 .../org/apache/spark/ui/static/executorspage.js    | 19 ++++-
 .../main/scala/org/apache/spark/SparkContext.scala | 26 ++++++-
 .../org/apache/spark/internal/config/UI.scala      |  7 ++
 .../spark/storage/BlockManagerMessages.scala       |  5 ++
 .../storage/BlockManagerStorageEndpoint.scala      |  3 +
 .../spark/ui/exec/ExecutorHeapHistogramPage.scala  | 89 ++++++++++++++++++++++
 .../org/apache/spark/ui/exec/ExecutorsTab.scala    | 11 ++-
 .../main/scala/org/apache/spark/util/Utils.scala   | 17 +++++
 9 files changed, 173 insertions(+), 5 deletions(-)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 37d56a06ded..ecda34b545a 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -128,6 +128,7 @@ limitations under the License.
               Shuffle Write</span></th>
           <th>Logs</th>
           <th>Thread Dump</th>
+          <th>Heap Histogram</th>
           <th>Exec Loss Reason</th>
         </tr>
         </thead>
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index 92d75c18e49..b52ece87ba1 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -20,17 +20,25 @@
 /* global jQuery, setDataTableDefaults */
 
 var threadDumpEnabled = false;
+var heapHistogramEnabled = false;
 
 /* eslint-disable no-unused-vars */
 function setThreadDumpEnabled(val) {
   threadDumpEnabled = val;
 }
+function setHeapHistogramEnabled(val) {
+  heapHistogramEnabled = val;
+}
 /* eslint-enable no-unused-vars */
 
 function getThreadDumpEnabled() {
   return threadDumpEnabled;
 }
 
+function getHeapHistogramEnabled() {
+  return heapHistogramEnabled;
+}
+
 function formatLossReason(removeReason) {
   if (removeReason) {
     return removeReason
@@ -551,6 +559,12 @@ $(document).ready(function () {
                 return type === 'display' ? ("<a href='threadDump/?executorId=" + data + "'>Thread Dump</a>" ) : data;
               }
             },
+            {
+              name: 'heapHistogramCol',
+              data: 'id', render: function (data, type) {
+                return type === 'display' ? ("<a href='heapHistogram/?executorId=" + data + "'>Heap Histogram</a>") : data;
+              }
+            },
             {
               data: 'removeReason',
               render: formatLossReason
@@ -566,7 +580,7 @@ $(document).ready(function () {
             {"visible": false, "targets": 10},
             {"visible": false, "targets": 13},
             {"visible": false, "targets": 14},
-            {"visible": false, "targets": 25}
+            {"visible": false, "targets": 26}
           ],
           "deferRender": true
         };
@@ -574,6 +588,7 @@ $(document).ready(function () {
         execDataTable = $(selector).DataTable(conf);
         execDataTable.column('executorLogsCol:name').visible(logsExist(response));
         execDataTable.column('threadDumpCol:name').visible(getThreadDumpEnabled());
+        execDataTable.column('heapHistogramCol:name').visible(getHeapHistogramEnabled());
         $('#active-executors [data-toggle="tooltip"]').tooltip();
     
         // This section should be visible once API gives the response.
@@ -721,7 +736,7 @@ $(document).ready(function () {
           "<div id='direct_mapped_pool_memory' class='direct_mapped_pool_memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='10'> Peak Pool Memory Direct / Mapped</div>" +
           "<div id='extra_resources' class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='13'> Resources</div>" +
           "<div id='resource_prof_id' class='resource-prof-id-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='14'> Resource Profile Id</div>" +
-          "<div id='exec_loss_reason' class='exec-loss-reason-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='25'> Exec Loss Reason</div>" +
+          "<div id='exec_loss_reason' class='exec-loss-reason-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='26'> Exec Loss Reason</div>" +
           "</div>");
 
         reselectCheckboxesBasedOnTaskTableState();
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c32c674d64e..5aff6682bfd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -68,7 +68,7 @@ import org.apache.spark.shuffle.api.ShuffleDriverComponents
 import org.apache.spark.status.{AppStatusSource, AppStatusStore}
 import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage._
-import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
+import org.apache.spark.storage.BlockManagerMessages.{TriggerHeapHistogram, TriggerThreadDump}
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
 import org.apache.spark.util._
 import org.apache.spark.util.logging.DriverLogger
@@ -750,6 +750,30 @@ class SparkContext(config: SparkConf) extends Logging {
     }
   }
 
+  /**
+   * Called by the web UI to fetch the driver's or an executor's heap histogram, if available.
+   */
+  private[spark] def getExecutorHeapHistogram(executorId: String): Option[Array[String]] = {
+    try {
+      if (executorId == SparkContext.DRIVER_IDENTIFIER) {
+        Some(Utils.getHeapHistogram())
+      } else {
+        env.blockManager.master.getExecutorEndpointRef(executorId) match {
+          case Some(endpointRef) =>
+            Some(endpointRef.askSync[Array[String]](TriggerHeapHistogram))
+          case None =>
+            logWarning(s"Executor $executorId might already have stopped and " +
+              "can not request heap histogram from it.")
+            None
+        }
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Exception getting heap histogram from executor $executorId", e)
+        None
+    }
+  }
+
   private[spark] def getLocalProperties: Properties = localProperties.get()
 
   private[spark] def setLocalProperties(props: Properties): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index a32e60de2a4..d0db5a90854 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -20,6 +20,8 @@ package org.apache.spark.internal.config
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
+
 import org.apache.spark.network.util.ByteUnit
 
 private[spark] object UI {
@@ -97,6 +99,13 @@ private[spark] object UI {
     .booleanConf
     .createWithDefault(true)
 
+  val UI_HEAP_HISTOGRAM_ENABLED = ConfigBuilder("spark.ui.heapHistogramEnabled")
+    .doc("Whether to show a link to the heap histogram of the driver and each executor in " +
+      "the Executors page, collected with the JDK `jmap` tool. Enabled by default on Java 11+.")
+    .version("3.5.0")
+    .booleanConf
+    .createWithDefault(SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11))
+
   val UI_PROMETHEUS_ENABLED = ConfigBuilder("spark.ui.prometheus.enabled")
     .internal()
     .doc("Expose executor metrics at /metrics/executors/prometheus. " +
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 24d0f239f73..7fb145556a1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -56,6 +56,11 @@ private[spark] object BlockManagerMessages {
    */
   case object TriggerThreadDump extends ToBlockManagerMasterStorageEndpoint
 
+  /**
+   * Driver to executor message requesting the executor's heap histogram as an Array[String].
+   */
+  case object TriggerHeapHistogram extends ToBlockManagerMasterStorageEndpoint
+
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from storage endpoints to the master.
   //////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
index d4c631e59a1..476be80e67d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
@@ -78,6 +78,9 @@ class BlockManagerStorageEndpoint(
     case TriggerThreadDump =>
       context.reply(Utils.getThreadDump())
 
+    case TriggerHeapHistogram =>
+      context.reply(Utils.getHeapHistogram())
+
     case ReplicateBlock(blockId, replicas, maxReplicas) =>
       context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))
 
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala
new file mode 100644
index 00000000000..6964711a788
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, Text}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.ui.{SparkUITab, UIUtils, WebUIPage}
+
+/**
+ * Web UI page showing a class histogram of live heap objects for the driver or an executor,
+ * as returned by `SparkContext.getExecutorHeapHistogram`.
+ *
+ * @param parent the tab this page is attached to
+ * @param sc the live SparkContext used to fetch the histogram; if it is empty, an error
+ *           message is rendered instead
+ */
+private[ui] class ExecutorHeapHistogramPage(
+    parent: SparkUITab,
+    sc: Option[SparkContext]) extends WebUIPage("heapHistogram") {
+
+  // Matches one object row of `jmap -histo` output, capturing rank, #instances, #bytes, class
+  // name and the rest of the line, which holds the optional "(module)" column.
+  val pattern = """\s*([0-9]+):\s+([0-9]+)\s+([0-9]+)\s+(\S+)(.*)""".r
+
+  /**
+   * Renders the heap histogram of the executor named by the `executorId` request parameter.
+   *
+   * @throws IllegalArgumentException if the `executorId` parameter is missing
+   */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val executorId = Option(request.getParameter("executorId")).map { executorId =>
+      UIUtils.decodeURLParameter(executorId)
+    }.getOrElse {
+      throw new IllegalArgumentException("Missing executorId parameter")
+    }
+    val time = System.currentTimeMillis()
+    val maybeHeapHistogram = sc.flatMap(_.getExecutorHeapHistogram(executorId))
+
+    val content = maybeHeapHistogram.map { heapHistogram =>
+      // Keep only object rows. The header, separator and trailing summary lines
+      //
+      //  num     #instances         #bytes  class name (module)
+      // -------------------------------------------------------
+      // ...
+      // Total       1267867       72845688
+      //
+      // do not match `pattern` and are dropped. All five groups of `pattern` always take part
+      // in a match (the last may be empty), so one case covers rows with and without a module.
+      val rows = heapHistogram.toSeq.collect {
+        case pattern(rank, instances, bytes, name, module) =>
+          <tr class="accordion-heading">
+            <td>{rank}</td>
+            <td>{instances}</td>
+            <td>{bytes}</td>
+            <td>{name}</td>
+            <td>{module.trim.stripPrefix("(").stripSuffix(")")}</td>
+          </tr>
+      }
+      <div class="row">
+        <div class="col-12">
+          <p>Updated at {UIUtils.formatDate(time)}</p>
+          <table class={UIUtils.TABLE_CLASS_STRIPED + " accordion-group" + " sortable"}>
+            <thead>
+              <th>Rank</th>
+              <th>Instances</th>
+              <th>Bytes</th>
+              <th>Class Name</th>
+              <th>Module</th>
+            </thead>
+            <tbody>{rows}</tbody>
+          </table>
+        </div>
+      </div>
+    }.getOrElse(Text("Error fetching heap histogram"))
+    UIUtils.headerSparkPage(request, s"Heap Histogram for Executor $executorId", content, parent)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 7a857e57cee..b92c5e67989 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -31,18 +31,24 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
   private def init(): Unit = {
     val threadDumpEnabled =
       parent.sc.isDefined && parent.conf.get(UI_THREAD_DUMPS_ENABLED)
+    val heapHistogramEnabled =
+      parent.sc.isDefined && parent.conf.get(UI_HEAP_HISTOGRAM_ENABLED)
 
-    attachPage(new ExecutorsPage(this, threadDumpEnabled))
+    attachPage(new ExecutorsPage(this, threadDumpEnabled, heapHistogramEnabled))
     if (threadDumpEnabled) {
       attachPage(new ExecutorThreadDumpPage(this, parent.sc))
     }
+    if (heapHistogramEnabled) {
+      attachPage(new ExecutorHeapHistogramPage(this, parent.sc))
+    }
   }
 
 }
 
 private[ui] class ExecutorsPage(
     parent: SparkUITab,
-    threadDumpEnabled: Boolean)
+    threadDumpEnabled: Boolean,
+    heapHistogramEnabled: Boolean)
   extends WebUIPage("") {
 
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -52,6 +58,7 @@ private[ui] class ExecutorsPage(
         <script src={UIUtils.prependBaseUri(request, "/static/utils.js")}></script> ++
         <script src={UIUtils.prependBaseUri(request, "/static/executorspage.js")}></script> ++
         <script>setThreadDumpEnabled({threadDumpEnabled})</script>
+        <script>setHeapHistogramEnabled({heapHistogramEnabled})</script>
       }
 
     UIUtils.headerSparkPage(request, "Executors", content, parent, useDataTables = true)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6e8f2c496e8..ee74eacb84f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2287,6 +2287,45 @@ private[spark] object Utils extends Logging with SparkClassUtils {
     }.map(threadInfoToThreadStackTrace)
   }
 
+  /**
+   * Return a class histogram of live heap objects in this JVM, one element per non-empty
+   * line of `jmap -histo:live` output (stdout and stderr combined). Used by the web UI.
+   *
+   * @throws java.io.IOException if the jmap process cannot be started
+   * @throws IllegalStateException if jmap exits with a non-zero status
+   */
+  def getHeapHistogram(): Array[String] = {
+    // From Java 9+, we can use 'ProcessHandle.current().pid()'
+    val pid = getProcessName().split("@").head
+    // Prefer the jmap shipped with the running JVM over whichever one is first on the PATH,
+    // which may belong to a different JDK; fall back to the PATH lookup if it is absent.
+    val bundledJmap = new java.io.File(System.getProperty("java.home"), "bin/jmap")
+    val jmap = if (bundledJmap.isFile) bundledJmap.getAbsolutePath else "jmap"
+    val builder = new ProcessBuilder(jmap, "-histo:live", pid)
+    // Merge stderr into stdout so a single reader drains everything the tool prints.
+    builder.redirectErrorStream(true)
+    val p = builder.start()
+    val rows = ArrayBuffer.empty[String]
+    val r = new BufferedReader(new InputStreamReader(p.getInputStream()))
+    try {
+      var line = r.readLine()
+      while (line != null) {
+        if (line.nonEmpty) rows += line
+        line = r.readLine()
+      }
+    } finally {
+      r.close()
+    }
+    // Reap the process and surface failures (e.g. attach errors) instead of returning them
+    // as if they were histogram rows.
+    val exitCode = p.waitFor()
+    if (exitCode != 0) {
+      throw new IllegalStateException(
+        s"jmap exited with code $exitCode: ${rows.mkString(System.lineSeparator())}")
+    }
+    rows.toArray
+  }
+
   def getThreadDumpForThread(threadId: Long): Option[ThreadStackTrace] = {
     if (threadId <= 0) {
       None


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org