You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "JoshRosen (via GitHub)" <gi...@apache.org> on 2023/10/06 20:25:03 UTC

[PR] [SPARK-45439][SQL] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType [spark]

JoshRosen opened a new pull request, #43250:
URL: https://github.com/apache/spark/pull/43250

   ### What changes were proposed in this pull request?
   
   This PR aims to reduce the memory consumption of `LiveStageMetrics.accumIdsToMetricType`, which should help to reduce driver memory usage when running complex SQL queries that contain many operators and run many jobs.
   
   In SQLAppStatusListener, the LiveStageMetrics.accumIdsToMetricType field holds a map which is used to look up the type of accumulators in order to perform conditional processing of a stage’s metrics.
   
   Currently, that field is derived from `LiveExecutionData.metrics`, which contains metrics for _all_ operators used anywhere in the query. Whenever a job is submitted, we construct a fresh map containing all metrics that have ever been registered for that SQL query. If a query runs a single job, this isn't an issue: in that case, all `LiveStageMetrics` instances will hold the same immutable `accumIdsToMetricType`.
   
   The problem arises if we have a query that runs many jobs (e.g. a complex query with many joins which gets divided into many jobs due to AQE): in that case, each job submission results in a new `accumIdsToMetricType` map being created.
   
   This PR fixes this by changing `accumIdsToMetricType` to be a mutable `ConcurrentHashMap` which is shared across all `LivestageMetrics` instances belonging to the same `LiveExecutionData`.
   
   The modified classes are `private` and are used only in SQLAppStatusListener, so I don't think this change poses any realistic risk of binary incompatibility risks to third party code.   
   
   
   ### Why are the changes needed?
   
   Addresses one contributing factor behind high driver memory / OOMs when executing complex queries.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing unit tests.
   
   To demonstrate memory reduction, I performed manual benchmarking and heap dump inspection using a complex query that executed ~200 jobs (so at least 200 stages) and contains ~3800 total operators, resulting in a huge number metric accumulators. Prior to this PR's fix, ~3700 LiveStageMetrics instances (from multiple concurrent runs of the query) consumed a combined ~3.3 GB of heap. After this PR's fix, I observed negligible memory usage from LiveStageMetrics.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43250:
URL: https://github.com/apache/spark/pull/43250#discussion_r1357389742


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala:
##########
@@ -589,7 +600,7 @@ private class LiveStageMetrics(
         val metricValues = taskMetrics.computeIfAbsent(acc.id, _ => new Array(numTasks))
         metricValues(taskIdx) = value
 
-        if (SQLMetrics.metricNeedsMax(accumIdsToMetricType(acc.id))) {
+        if (SQLMetrics.metricNeedsMax(Option(accumIdsToMetricType.get(acc.id)).get)) {

Review Comment:
   QQ: If we are always expecting `acc.id` to be present, avoid the `Option(<map.get>).get` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on PR #43250:
URL: https://github.com/apache/spark/pull/43250#issuecomment-1762420388

   Hmm, it looks like the `OracleIntegrationSuite` tests are flaky but I don't think that's related to this PR's changes:
   
   ```
   [info] OracleIntegrationSuite:
   [info] org.apache.spark.sql.jdbc.OracleIntegrationSuite *** ABORTED *** (7 minutes, 38 seconds)
   [info]   The code passed to eventually never returned normally. Attempted 429 times over 7.003095079966667 minutes. Last failure message: ORA-12514: Cannot connect to database. Service freepdb1 is not registered with the listener at host 10.1.0.126 port 45139. (CONNECTION_ID=CC2wkBm6SPGoMF7IghzCeQ==). (DockerJDBCIntegrationSuite.scala:166)
   [info]   org.scalatest.exceptions.TestFailedDueToTimeoutException:
   [info]   at org.scalatest.enablers.Retrying$$anon$4.tryTryAgain$2(Retrying.scala:219)
   [info]   at org.scalatest.enablers.Retrying$$anon$4.retry(Retrying.scala:226)
   [info]   at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:313)
   [info]   at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:312)
   [info]   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.eventually(DockerJDBCIntegrationSuite.scala:95)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #43250:
URL: https://github.com/apache/spark/pull/43250#issuecomment-1762699766

   The test failures are not related - unfortunately reattempt did not fix them.
   Merging to master.
   Thanks for fixing this @JoshRosen !
   Thanks for the reviews @jiangxb1987, @beliefer, @Ngone51 :-)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm closed pull request #43250: [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType
URL: https://github.com/apache/spark/pull/43250


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on code in PR #43250:
URL: https://github.com/apache/spark/pull/43250#discussion_r1355983329


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala:
##########
@@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var details: String = null
   var physicalPlanDescription: String = null
   var modifiedConfigs: Map[String, String] = _
-  var metrics = collection.Seq[SQLPlanMetric]()
+  private var _metrics = collection.Seq[SQLPlanMetric]()
+  def metrics: collection.Seq[SQLPlanMetric] = _metrics
+  // This mapping is shared across all LiveStageMetrics instances associated with
+  // this LiveExecutionData, helping to reduce memory overhead by avoiding waste
+  // from separate immutable maps with largely overlapping sets of entries.
+  val metricAccumulatorIdToMetricType = new ConcurrentHashMap[Long, String]()

Review Comment:
   Our Spark fork contains a codepath that performs multi-threaded access to this data structure, so that's why I used a thread-safe data structure here.
   
   If you would prefer that I avoid leaking those proprietary thread-safety concerns into OSS Apache Spark then I would be glad to refactor this to use a `mutable.Map` instead and keep the thread-safe version in our fork.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #43250:
URL: https://github.com/apache/spark/pull/43250#discussion_r1349475794


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala:
##########
@@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var details: String = null
   var physicalPlanDescription: String = null
   var modifiedConfigs: Map[String, String] = _
-  var metrics = collection.Seq[SQLPlanMetric]()
+  private var _metrics = collection.Seq[SQLPlanMetric]()
+  def metrics: collection.Seq[SQLPlanMetric] = _metrics
+  // This mapping is shared across all LiveStageMetrics instances associated with
+  // this LiveExecutionData, helping to reduce memory overhead by avoiding waste
+  // from separate immutable maps with largely overlapping sets of entries.
+  val metricAccumulatorIdToMetricType = new ConcurrentHashMap[Long, String]()

Review Comment:
   AFAIK, SQLAppStatusListener only works in single thread. Should we only use Map here? please tell me detail if I thought missing something.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala:
##########
@@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var details: String = null
   var physicalPlanDescription: String = null
   var modifiedConfigs: Map[String, String] = _
-  var metrics = collection.Seq[SQLPlanMetric]()
+  private var _metrics = collection.Seq[SQLPlanMetric]()
+  def metrics: collection.Seq[SQLPlanMetric] = _metrics
+  // This mapping is shared across all LiveStageMetrics instances associated with
+  // this LiveExecutionData, helping to reduce memory overhead by avoiding waste
+  // from separate immutable maps with largely overlapping sets of entries.
+  val metricAccumulatorIdToMetricType = new ConcurrentHashMap[Long, String]()

Review Comment:
   AFAIK, `SQLAppStatusListener` only works in single thread. Should we only use Map here? please tell me detail if I thought missing something.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43250:
URL: https://github.com/apache/spark/pull/43250#discussion_r1357386637


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala:
##########
@@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var details: String = null
   var physicalPlanDescription: String = null
   var modifiedConfigs: Map[String, String] = _
-  var metrics = collection.Seq[SQLPlanMetric]()
+  private var _metrics = collection.Seq[SQLPlanMetric]()
+  def metrics: collection.Seq[SQLPlanMetric] = _metrics
+  // This mapping is shared across all LiveStageMetrics instances associated with
+  // this LiveExecutionData, helping to reduce memory overhead by avoiding waste
+  // from separate immutable maps with largely overlapping sets of entries.
+  val metricAccumulatorIdToMetricType = new ConcurrentHashMap[Long, String]()

Review Comment:
   Curious why this is diverging ? How is it used ? Is it something that might be relevant to Apache Spark as a functional enhancement ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45439][SQL][UI] Reduce memory usage of LiveStageMetrics.accumIdsToMetricType [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on code in PR #43250:
URL: https://github.com/apache/spark/pull/43250#discussion_r1357445322


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala:
##########
@@ -490,7 +490,12 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   var details: String = null
   var physicalPlanDescription: String = null
   var modifiedConfigs: Map[String, String] = _
-  var metrics = collection.Seq[SQLPlanMetric]()
+  private var _metrics = collection.Seq[SQLPlanMetric]()
+  def metrics: collection.Seq[SQLPlanMetric] = _metrics
+  // This mapping is shared across all LiveStageMetrics instances associated with
+  // this LiveExecutionData, helping to reduce memory overhead by avoiding waste
+  // from separate immutable maps with largely overlapping sets of entries.
+  val metricAccumulatorIdToMetricType = new ConcurrentHashMap[Long, String]()

Review Comment:
   The codepath in question isn't of general relevance and, as it turns out, might be deprecated soon. Given this, I have updated this PR to use a `mutable.HashMap` (which simplifies the rest of the code and addresses your other review comment).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org