You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/06/23 07:49:08 UTC
[spark] branch master updated: [SPARK-44153][CORE][UI] Support `Heap Histogram` column in `Executors` tab
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cd69d4dd18c [SPARK-44153][CORE][UI] Support `Heap Histogram` column in `Executors` tab
cd69d4dd18c is described below
commit cd69d4dd18cfaccf58bf64dde6268f7ea1d4415b
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Fri Jun 23 00:48:58 2023 -0700
[SPARK-44153][CORE][UI] Support `Heap Histogram` column in `Executors` tab
### What changes were proposed in this pull request?
This PR aims to support `Heap Histogram` column in `Executor` tab.
### Why are the changes needed?
Like `Thread Dump` column, this is very helpful when we analyze executor live JVM status.
![Screenshot 2023-06-22 at 8 37 55 PM](https://github.com/apache/spark/assets/9700541/741c8deb-23ff-463d-8b1e-7c2e53d0b59f)
![Screenshot 2023-06-22 at 8 38 34 PM](https://github.com/apache/spark/assets/9700541/93f77f42-48b5-41fa-94ab-ea675f576331)
### Does this PR introduce _any_ user-facing change?
Yes, but this is a new column and we provide `spark.ui.heapHistogramEnabled` configuration like `spark.ui.threadDumpsEnabled`.
### How was this patch tested?
Manual review.
Closes #41709 from dongjoon-hyun/SPARK-44153.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/ui/static/executorspage-template.html | 1 +
.../org/apache/spark/ui/static/executorspage.js | 19 ++++-
.../main/scala/org/apache/spark/SparkContext.scala | 26 ++++++-
.../org/apache/spark/internal/config/UI.scala | 7 ++
.../spark/storage/BlockManagerMessages.scala | 5 ++
.../storage/BlockManagerStorageEndpoint.scala | 3 +
.../spark/ui/exec/ExecutorHeapHistogramPage.scala | 89 ++++++++++++++++++++++
.../org/apache/spark/ui/exec/ExecutorsTab.scala | 11 ++-
.../main/scala/org/apache/spark/util/Utils.scala | 17 +++++
9 files changed, 173 insertions(+), 5 deletions(-)
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
index 37d56a06ded..ecda34b545a 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html
@@ -128,6 +128,7 @@ limitations under the License.
Shuffle Write</span></th>
<th>Logs</th>
<th>Thread Dump</th>
+ <th>Heap Histogram</th>
<th>Exec Loss Reason</th>
</tr>
</thead>
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index 92d75c18e49..b52ece87ba1 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -20,17 +20,25 @@
/* global jQuery, setDataTableDefaults */
var threadDumpEnabled = false;
+var heapHistogramEnabled = false;
/* eslint-disable no-unused-vars */
function setThreadDumpEnabled(val) {
threadDumpEnabled = val;
}
+function setHeapHistogramEnabled(val) {
+ heapHistogramEnabled = val;
+}
/* eslint-enable no-unused-vars */
function getThreadDumpEnabled() {
return threadDumpEnabled;
}
+function getHeapHistogramEnabled() {
+ return heapHistogramEnabled;
+}
+
function formatLossReason(removeReason) {
if (removeReason) {
return removeReason
@@ -551,6 +559,12 @@ $(document).ready(function () {
return type === 'display' ? ("<a href='threadDump/?executorId=" + data + "'>Thread Dump</a>" ) : data;
}
},
+ {
+ name: 'heapHistogramCol',
+ data: 'id', render: function (data, type) {
+ return type === 'display' ? ("<a href='heapHistogram/?executorId=" + data + "'>Heap Histogram</a>") : data;
+ }
+ },
{
data: 'removeReason',
render: formatLossReason
@@ -566,7 +580,7 @@ $(document).ready(function () {
{"visible": false, "targets": 10},
{"visible": false, "targets": 13},
{"visible": false, "targets": 14},
- {"visible": false, "targets": 25}
+ {"visible": false, "targets": 26}
],
"deferRender": true
};
@@ -574,6 +588,7 @@ $(document).ready(function () {
execDataTable = $(selector).DataTable(conf);
execDataTable.column('executorLogsCol:name').visible(logsExist(response));
execDataTable.column('threadDumpCol:name').visible(getThreadDumpEnabled());
+ execDataTable.column('heapHistogramCol:name').visible(getHeapHistogramEnabled());
$('#active-executors [data-toggle="tooltip"]').tooltip();
// This section should be visible once API gives the response.
@@ -721,7 +736,7 @@ $(document).ready(function () {
"<div id='direct_mapped_pool_memory' class='direct_mapped_pool_memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='10'> Peak Pool Memory Direct / Mapped</div>" +
"<div id='extra_resources' class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='13'> Resources</div>" +
"<div id='resource_prof_id' class='resource-prof-id-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='14'> Resource Profile Id</div>" +
- "<div id='exec_loss_reason' class='exec-loss-reason-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='25'> Exec Loss Reason</div>" +
+ "<div id='exec_loss_reason' class='exec-loss-reason-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='26'> Exec Loss Reason</div>" +
"</div>");
reselectCheckboxesBasedOnTaskTableState();
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c32c674d64e..5aff6682bfd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -68,7 +68,7 @@ import org.apache.spark.shuffle.api.ShuffleDriverComponents
import org.apache.spark.status.{AppStatusSource, AppStatusStore}
import org.apache.spark.status.api.v1.ThreadStackTrace
import org.apache.spark.storage._
-import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
+import org.apache.spark.storage.BlockManagerMessages.{TriggerHeapHistogram, TriggerThreadDump}
import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
import org.apache.spark.util._
import org.apache.spark.util.logging.DriverLogger
@@ -750,6 +750,30 @@ class SparkContext(config: SparkConf) extends Logging {
}
}
+ /**
+ * Called by the web UI to obtain executor heap histogram.
+ */
+ private[spark] def getExecutorHeapHistogram(executorId: String): Option[Array[String]] = {
+ try {
+ if (executorId == SparkContext.DRIVER_IDENTIFIER) {
+ Some(Utils.getHeapHistogram())
+ } else {
+ env.blockManager.master.getExecutorEndpointRef(executorId) match {
+ case Some(endpointRef) =>
+ Some(endpointRef.askSync[Array[String]](TriggerHeapHistogram))
+ case None =>
+ logWarning(s"Executor $executorId might already have stopped and " +
+ "can not request heap histogram from it.")
+ None
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logError(s"Exception getting heap histogram from executor $executorId", e)
+ None
+ }
+ }
+
private[spark] def getLocalProperties: Properties = localProperties.get()
private[spark] def setLocalProperties(props: Properties): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index a32e60de2a4..d0db5a90854 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -20,6 +20,8 @@ package org.apache.spark.internal.config
import java.util.Locale
import java.util.concurrent.TimeUnit
+import org.apache.commons.lang3.{JavaVersion, SystemUtils}
+
import org.apache.spark.network.util.ByteUnit
private[spark] object UI {
@@ -97,6 +99,11 @@ private[spark] object UI {
.booleanConf
.createWithDefault(true)
+ val UI_HEAP_HISTOGRAM_ENABLED = ConfigBuilder("spark.ui.heapHistogramEnabled")
+ .version("3.5.0")
+ .booleanConf
+ .createWithDefault(SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11))
+
val UI_PROMETHEUS_ENABLED = ConfigBuilder("spark.ui.prometheus.enabled")
.internal()
.doc("Expose executor metrics at /metrics/executors/prometheus. " +
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 24d0f239f73..7fb145556a1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -56,6 +56,11 @@ private[spark] object BlockManagerMessages {
*/
case object TriggerThreadDump extends ToBlockManagerMasterStorageEndpoint
+ /**
+ * Driver to Executor message to get a heap histogram.
+ */
+ case object TriggerHeapHistogram extends ToBlockManagerMasterStorageEndpoint
+
//////////////////////////////////////////////////////////////////////////////////
// Messages from storage endpoints to the master.
//////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
index d4c631e59a1..476be80e67d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
@@ -78,6 +78,9 @@ class BlockManagerStorageEndpoint(
case TriggerThreadDump =>
context.reply(Utils.getThreadDump())
+ case TriggerHeapHistogram =>
+ context.reply(Utils.getHeapHistogram())
+
case ReplicateBlock(blockId, replicas, maxReplicas) =>
context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala
new file mode 100644
index 00000000000..6964711a788
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorHeapHistogramPage.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, Text}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.ui.{SparkUITab, UIUtils, WebUIPage}
+
+private[ui] class ExecutorHeapHistogramPage(
+ parent: SparkUITab,
+ sc: Option[SparkContext]) extends WebUIPage("heapHistogram") {
+
+ // Match the lines containing object informations
+ val pattern = """\s*([0-9]+):\s+([0-9]+)\s+([0-9]+)\s+(\S+)(.*)""".r
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val executorId = Option(request.getParameter("executorId")).map { executorId =>
+ UIUtils.decodeURLParameter(executorId)
+ }.getOrElse {
+ throw new IllegalArgumentException(s"Missing executorId parameter")
+ }
+ val time = System.currentTimeMillis()
+ val maybeHeapHistogram = sc.get.getExecutorHeapHistogram(executorId)
+
+ val content = maybeHeapHistogram.map { heapHistogram =>
+ val rows = heapHistogram.map { row =>
+ row match {
+ case pattern(rank, instances, bytes, name, module) =>
+ <tr class="accordion-heading">
+ <td>{rank}</td>
+ <td>{instances}</td>
+ <td>{bytes}</td>
+ <td>{name}</td>
+ <td>{module}</td>
+ </tr>
+ case pattern(rank, instances, bytes, name) =>
+ <tr class="accordion-heading">
+ <td>{rank}</td>
+ <td>{instances}</td>
+ <td>{bytes}</td>
+ <td>{name}</td>
+ <td></td>
+ </tr>
+ case _ =>
+ // Ignore the first two lines and the last line
+ //
+ // num #instances #bytes class name (module)
+ // -------------------------------------------------------
+ // ...
+ // Total 1267867 72845688
+ }
+ }
+ <div class="row">
+ <div class="col-12">
+ <p>Updated at {UIUtils.formatDate(time)}</p>
+ <table class={UIUtils.TABLE_CLASS_STRIPED + " accordion-group" + " sortable"}>
+ <thead>
+ <th>Rank</th>
+ <th>Instances</th>
+ <th>Bytes</th>
+ <th>Class Name</th>
+ <th>Module</th>
+ </thead>
+ <tbody>{rows}</tbody>
+ </table>
+ </div>
+ </div>
+ }.getOrElse(Text("Error fetching heap histogram"))
+ UIUtils.headerSparkPage(request, s"Heap Histogram for Executor $executorId", content, parent)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 7a857e57cee..b92c5e67989 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -31,18 +31,24 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
private def init(): Unit = {
val threadDumpEnabled =
parent.sc.isDefined && parent.conf.get(UI_THREAD_DUMPS_ENABLED)
+ val heapHistogramEnabled =
+ parent.sc.isDefined && parent.conf.get(UI_HEAP_HISTOGRAM_ENABLED)
- attachPage(new ExecutorsPage(this, threadDumpEnabled))
+ attachPage(new ExecutorsPage(this, threadDumpEnabled, heapHistogramEnabled))
if (threadDumpEnabled) {
attachPage(new ExecutorThreadDumpPage(this, parent.sc))
}
+ if (heapHistogramEnabled) {
+ attachPage(new ExecutorHeapHistogramPage(this, parent.sc))
+ }
}
}
private[ui] class ExecutorsPage(
parent: SparkUITab,
- threadDumpEnabled: Boolean)
+ threadDumpEnabled: Boolean,
+ heapHistogramEnabled: Boolean)
extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
@@ -52,6 +58,7 @@ private[ui] class ExecutorsPage(
<script src={UIUtils.prependBaseUri(request, "/static/utils.js")}></script> ++
<script src={UIUtils.prependBaseUri(request, "/static/executorspage.js")}></script> ++
<script>setThreadDumpEnabled({threadDumpEnabled})</script>
+ <script>setHeapHistogramEnabled({heapHistogramEnabled})</script>
}
UIUtils.headerSparkPage(request, "Executors", content, parent, useDataTables = true)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6e8f2c496e8..ee74eacb84f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2287,6 +2287,23 @@ private[spark] object Utils extends Logging with SparkClassUtils {
}.map(threadInfoToThreadStackTrace)
}
+ /** Return a heap dump. Used to capture dumps for the web UI */
+ def getHeapHistogram(): Array[String] = {
+ // From Java 9+, we can use 'ProcessHandle.current().pid()'
+ val pid = getProcessName().split("@").head
+ val builder = new ProcessBuilder("jmap", "-histo:live", pid)
+ builder.redirectErrorStream(true)
+ val p = builder.start()
+ val r = new BufferedReader(new InputStreamReader(p.getInputStream()))
+ val rows = ArrayBuffer.empty[String]
+ var line = ""
+ while (line != null) {
+ if (line.nonEmpty) rows += line
+ line = r.readLine()
+ }
+ rows.toArray
+ }
+
def getThreadDumpForThread(threadId: Long): Option[ThreadStackTrace] = {
if (threadId <= 0) {
None
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org