You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2023/10/14 07:28:33 UTC

[spark] branch master updated: [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f6cca55286 [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType
2f6cca55286 is described below

commit 2f6cca55286a97bb20ed665c741d0607ed04d0ce
Author: Josh Rosen <jo...@databricks.com>
AuthorDate: Sat Oct 14 02:28:15 2023 -0500

    [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType
    
    ### What changes were proposed in this pull request?
    
    This PR aims to reduce the memory consumption of `LiveStageMetrics.accumIdsToMetricType`, which should help to reduce driver memory usage when running complex SQL queries that contain many operators and run many jobs.
    
    In SQLAppStatusListener, the LiveStageMetrics.accumIdsToMetricType field holds a map which is used to look up the type of accumulators in order to perform conditional processing of a stage’s metrics.
    
    Currently, that field is derived from `LiveExecutionData.metrics`, which contains metrics for _all_ operators used anywhere in the query. Whenever a job is submitted, we construct a fresh map containing all metrics that have ever been registered for that SQL query. If a query runs a single job, this isn't an issue: in that case, all `LiveStageMetrics` instances will hold the same immutable `accumIdsToMetricType`.
    
    The problem arises if we have a query that runs many jobs (e.g. a complex query with many joins which gets divided into many jobs due to AQE): in that case, each job submission results in a new `accumIdsToMetricType` map being created.
    
    This PR fixes this by changing `accumIdsToMetricType` to be a `mutable.HashMap` which is shared across all `LiveStageMetrics` instances belonging to the same `LiveExecutionData`.
    
    The modified classes are `private` and are used only in SQLAppStatusListener, so I don't think this change poses any realistic risk of binary incompatibility to third-party code.
    
    ### Why are the changes needed?
    
    Addresses one contributing factor behind high driver memory / OOMs when executing complex queries.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    To demonstrate memory reduction, I performed manual benchmarking and heap dump inspection using a benchmark that ran copies of a complex query: each test query launches ~200 jobs (so at least 200 stages) and contains ~3800 total operators, resulting in a huge number of metric accumulators. Prior to this PR's fix, ~3700 LiveStageMetrics instances (from multiple concurrent runs of the query) consumed a combined ~3.3 GB of heap. After this PR's fix, I observed negligible memory usage from Liv [...]
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43250 from JoshRosen/reduce-accum-ids-to-metric-type-mem-overhead.
    
    Authored-by: Josh Rosen <jo...@databricks.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../sql/execution/ui/SQLAppStatusListener.scala    | 25 ++++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 7e31d40e511..0de16a45971 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -95,7 +95,7 @@ class SQLAppStatusListener(
           executionData.details = sqlStoreData.details
           executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
           executionData.modifiedConfigs = sqlStoreData.modifiedConfigs
-          executionData.metrics = sqlStoreData.metrics
+          executionData.addMetrics(sqlStoreData.metrics)
           executionData.submissionTime = sqlStoreData.submissionTime
           executionData.completionTime = sqlStoreData.completionTime
           executionData.jobs = sqlStoreData.jobs
@@ -111,7 +111,7 @@ class SQLAppStatusListener(
 
     // Record the accumulator IDs and metric types for the stages of this job, so that the code
     // that keeps track of the metrics knows which accumulators to look at.
-    val accumIdsAndType = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
+    val accumIdsAndType = exec.metricAccumulatorIdToMetricType
     if (accumIdsAndType.nonEmpty) {
       event.stageInfos.foreach { stage =>
         stageMetrics.put(stage.stageId, new LiveStageMetrics(stage.stageId, 0,
@@ -361,7 +361,7 @@ class SQLAppStatusListener(
     exec.details = details
     exec.physicalPlanDescription = physicalPlanDescription
     exec.modifiedConfigs = modifiedConfigs
-    exec.metrics = sqlPlanMetrics
+    exec.addMetrics(sqlPlanMetrics)
     exec.submissionTime = time
     update(exec)
   }
@@ -383,7 +383,7 @@ class SQLAppStatusListener(
 
     val exec = getOrCreateExecution(executionId)
     exec.physicalPlanDescription = physicalPlanDescription
-    exec.metrics ++= sqlPlanMetrics
+    exec.addMetrics(sqlPlanMetrics)
     update(exec)
   }
 
@@ -391,7 +391,7 @@ class SQLAppStatusListener(
     val SparkListenerSQLAdaptiveSQLMetricUpdates(executionId, sqlPlanMetrics) = event
 
     val exec = getOrCreateExecution(executionId)
-    exec.metrics ++= sqlPlanMetrics
+    exec.addMetrics(sqlPlanMetrics)
     update(exec)
   }
 
@@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var details: String = null
   var physicalPlanDescription: String = null
   var modifiedConfigs: Map[String, String] = _
-  var metrics = collection.Seq[SQLPlanMetric]()
+  private var _metrics = collection.Seq[SQLPlanMetric]()
+  def metrics: collection.Seq[SQLPlanMetric] = _metrics
+  // This mapping is shared across all LiveStageMetrics instances associated with
+  // this LiveExecutionData, helping to reduce memory overhead by avoiding waste
+  // from separate immutable maps with largely overlapping sets of entries.
+  val metricAccumulatorIdToMetricType = new mutable.HashMap[Long, String]()
   var submissionTime = -1L
   var completionTime: Option[Date] = None
   var errorMessage: Option[String] = None
@@ -522,13 +527,19 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
       metricsValues)
   }
 
+  def addMetrics(newMetrics: collection.Seq[SQLPlanMetric]): Unit = {
+    _metrics ++= newMetrics
+    newMetrics.foreach { m =>
+      metricAccumulatorIdToMetricType.put(m.accumulatorId, m.metricType)
+    }
+  }
 }
 
 private class LiveStageMetrics(
     val stageId: Int,
     val attemptId: Int,
     val numTasks: Int,
-    val accumIdsToMetricType: Map[Long, String]) {
+    val accumIdsToMetricType: mutable.Map[Long, String]) {
 
   /**
    * Mapping of task IDs to their respective index. Note this may contain more elements than the


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org