You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2023/10/14 07:28:33 UTC
[spark] branch master updated: [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2f6cca55286 [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType
2f6cca55286 is described below
commit 2f6cca55286a97bb20ed665c741d0607ed04d0ce
Author: Josh Rosen <jo...@databricks.com>
AuthorDate: Sat Oct 14 02:28:15 2023 -0500
[SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType
### What changes were proposed in this pull request?
This PR aims to reduce the memory consumption of `LiveStageMetrics.accumIdsToMetricType`, which should help to reduce driver memory usage when running complex SQL queries that contain many operators and run many jobs.
In SQLAppStatusListener, the LiveStageMetrics.accumIdsToMetricType field holds a map which is used to look up the type of accumulators in order to perform conditional processing of a stage’s metrics.
Currently, that field is derived from `LiveExecutionData.metrics`, which contains metrics for _all_ operators used anywhere in the query. Whenever a job is submitted, we construct a fresh map containing all metrics that have ever been registered for that SQL query. If a query runs a single job, this isn't an issue: in that case, all `LiveStageMetrics` instances will hold the same immutable `accumIdsToMetricType`.
The problem arises if we have a query that runs many jobs (e.g. a complex query with many joins which gets divided into many jobs due to AQE): in that case, each job submission results in a new `accumIdsToMetricType` map being created.
This PR fixes this by changing `accumIdsToMetricType` to be a `mutable.HashMap` which is shared across all `LiveStageMetrics` instances belonging to the same `LiveExecutionData`.
The modified classes are `private` and are used only in SQLAppStatusListener, so I don't think this change poses any realistic risk of binary incompatibility for third-party code.
### Why are the changes needed?
Addresses one contributing factor behind high driver memory / OOMs when executing complex queries.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
To demonstrate memory reduction, I performed manual benchmarking and heap dump inspection using a benchmark that ran copies of a complex query: each test query launches ~200 jobs (so at least 200 stages) and contains ~3800 total operators, resulting in a huge number of metric accumulators. Prior to this PR's fix, ~3700 LiveStageMetrics instances (from multiple concurrent runs of the query) consumed a combined ~3.3 GB of heap. After this PR's fix, I observed negligible memory usage from Liv [...]
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43250 from JoshRosen/reduce-accum-ids-to-metric-type-mem-overhead.
Authored-by: Josh Rosen <jo...@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../sql/execution/ui/SQLAppStatusListener.scala | 25 ++++++++++++++++------
1 file changed, 18 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 7e31d40e511..0de16a45971 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -95,7 +95,7 @@ class SQLAppStatusListener(
executionData.details = sqlStoreData.details
executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
executionData.modifiedConfigs = sqlStoreData.modifiedConfigs
- executionData.metrics = sqlStoreData.metrics
+ executionData.addMetrics(sqlStoreData.metrics)
executionData.submissionTime = sqlStoreData.submissionTime
executionData.completionTime = sqlStoreData.completionTime
executionData.jobs = sqlStoreData.jobs
@@ -111,7 +111,7 @@ class SQLAppStatusListener(
// Record the accumulator IDs and metric types for the stages of this job, so that the code
// that keeps track of the metrics knows which accumulators to look at.
- val accumIdsAndType = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
+ val accumIdsAndType = exec.metricAccumulatorIdToMetricType
if (accumIdsAndType.nonEmpty) {
event.stageInfos.foreach { stage =>
stageMetrics.put(stage.stageId, new LiveStageMetrics(stage.stageId, 0,
@@ -361,7 +361,7 @@ class SQLAppStatusListener(
exec.details = details
exec.physicalPlanDescription = physicalPlanDescription
exec.modifiedConfigs = modifiedConfigs
- exec.metrics = sqlPlanMetrics
+ exec.addMetrics(sqlPlanMetrics)
exec.submissionTime = time
update(exec)
}
@@ -383,7 +383,7 @@ class SQLAppStatusListener(
val exec = getOrCreateExecution(executionId)
exec.physicalPlanDescription = physicalPlanDescription
- exec.metrics ++= sqlPlanMetrics
+ exec.addMetrics(sqlPlanMetrics)
update(exec)
}
@@ -391,7 +391,7 @@ class SQLAppStatusListener(
val SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, sqlPlanMetrics) = event
val exec = getOrCreateExecution(executionId)
- exec.metrics ++= sqlPlanMetrics
+ exec.addMetrics(sqlPlanMetrics)
update(exec)
}
@@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
var details: String = null
var physicalPlanDescription: String = null
var modifiedConfigs: Map[String, String] = _
- var metrics = collection.Seq[SQLPlanMetric]()
+ private var _metrics = collection.Seq[SQLPlanMetric]()
+ def metrics: collection.Seq[SQLPlanMetric] = _metrics
+ // This mapping is shared across all LiveStageMetrics instances associated with
+ // this LiveExecutionData, helping to reduce memory overhead by avoiding waste
+ // from separate immutable maps with largely overlapping sets of entries.
+ val metricAccumulatorIdToMetricType = new mutable.HashMap[Long, String]()
var submissionTime = -1L
var completionTime: Option[Date] = None
var errorMessage: Option[String] = None
@@ -522,13 +527,19 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
metricsValues)
}
+ def addMetrics(newMetrics: collection.Seq[SQLPlanMetric]): Unit = {
+ _metrics ++= newMetrics
+ newMetrics.foreach { m =>
+ metricAccumulatorIdToMetricType.put(m.accumulatorId, m.metricType)
+ }
+ }
}
private class LiveStageMetrics(
val stageId: Int,
val attemptId: Int,
val numTasks: Int,
- val accumIdsToMetricType: Map[Long, String]) {
+ val accumIdsToMetricType: mutable.Map[Long, String]) {
/**
* Mapping of task IDs to their respective index. Note this may contain more elements than the
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org