Posted to reviews@spark.apache.org by "utkarsh39 (via GitHub)" <gi...@apache.org> on 2023/12/12 21:51:50 UTC

[PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

utkarsh39 opened a new pull request, #44321:
URL: https://github.com/apache/spark/pull/44321

   
   ### What changes were proposed in this pull request?
   `AccumulableInfo` is one of the top heap consumers in the driver's heap dumps for stages with many tasks. For a stage with a large number of tasks (**_O(100k)_**), we saw **30%** of the heap usage stemming from `TaskInfo.accumulables()`.
   
     
   ![image](https://github.com/apache/spark/assets/10495099/13ef5d07-abfc-47fd-81b6-705f599db011)
   
   
   The `TaskSetManager` today keeps the TaskInfo objects around ([ref1](https://github.com/apache/spark/blob/c1ba963e64a22dea28e17b1ed954e6d03d38da1e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L134), [ref2](https://github.com/apache/spark/blob/c1ba963e64a22dea28e17b1ed954e6d03d38da1e/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L192)) and, in turn, the task metrics (`AccumulableInfo`) for every task attempt until the stage is completed. This means that for stages with a large number of tasks, we keep the metrics (`AccumulableInfo`) of all the tasks around even after a task has completed and its metrics have been aggregated. Given that each task carries a large number of metrics, stages with many tasks end up with a large heap footprint in the form of task metrics.
   
   This PR reduces the driver's heap usage for stages with many tasks by no longer referencing the task metrics of completed tasks. Once a task completes in the `TaskSetManager`, we no longer keep its metrics around: we clone the `TaskInfo` object and empty out the metrics for the clone. The cloned `TaskInfo` is retained by the `TaskSetManager`, while the original `TaskInfo` object, with its metrics, is sent to the `DAGScheduler` where the task metrics are aggregated. Thus, for a completed task, the `TaskSetManager` holds a `TaskInfo` object with empty metrics. This reduces the memory footprint by ensuring that the number of task metric objects is proportional to the number of active tasks rather than to the total number of tasks in the stage.
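   The following is a self-contained sketch of the clone-and-swap idea, using a simplified hypothetical `TaskInfoSketch` type; the real change lives in `TaskSetManager`/`TaskInfo`, and the actual diff is quoted in the review comments below.
   
   ```scala
   // Simplified sketch: the real TaskInfo gains a cloneWithEmptyAccumulables()
   // helper (shown in the diffs quoted later in this thread).
   final class TaskInfoSketch(val taskId: Long, var accumulables: Seq[String])
       extends Cloneable {
     def cloneWithEmptyAccumulables(): TaskInfoSketch = {
       val cloned = super.clone().asInstanceOf[TaskInfoSketch]
       cloned.accumulables = Nil // the retained clone drops the heavy metrics
       cloned
     }
   }
   
   object CloneSwapDemo extends App {
     val taskInfos = scala.collection.mutable.Map[Long, TaskInfoSketch]()
     val original = new TaskInfoSketch(0L, Seq("peakExecutionMemory", "recordsRead"))
     taskInfos(0L) = original
     // On task completion: retain the stripped clone; the original (still
     // carrying its metrics) is what gets shipped to the DAGScheduler.
     taskInfos(0L) = original.cloneWithEmptyAccumulables()
     assert(taskInfos(0L).accumulables.isEmpty)
     assert(original.accumulables.nonEmpty)
   }
   ```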
   
   ### Config to gate changes
   The changes in the PR are guarded by the Spark conf `spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled`, which can be used for rollback or staged rollouts.
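   For illustration, a minimal sketch of enabling the gate on a `SparkConf` (the string key is taken from this PR; note the conf is internal, and the tests in this thread set the typed entry `config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION` instead):
   
   ```scala
   import org.apache.spark.SparkConf
   
   // Flip the rollback/staged-rollout gate by its string key.
   val conf = new SparkConf()
     .set("spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled", "true")
   ```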
   
   
   ### Why are the changes needed?
   Reduce the driver's heap usage, especially for stages with many tasks.
   
   ### Benchmarking
   On a cluster running a scan stage with 100k tasks, the TaskSetManager's heap usage dropped from 1.1 GB to 37 MB. This **reduced the total driver's heap usage by 38%**, down to 2 GB from 3.5 GB.
   
   **BEFORE**
   
   ![image](https://github.com/databricks/runtime/assets/10495099/7c1599f3-3587-48a1-b019-84115b1bb90d)
   **WITH FIX**
   <img width="1386" alt="image" src="https://github.com/databricks/runtime/assets/10495099/b85129c8-dc10-4ee2-898d-61c8e7449616">
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added new tests and did benchmarking on a cluster.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   Generated-by: GitHub Copilot




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428673442


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
+      val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i }
+      taskInfos(taskId) = clonedTaskInfo
+    }
+    sched.dagScheduler.taskEnded(task, reason, result, accumUpdates, metricPeaks, taskInfo)

Review Comment:
   It seems the `DAGScheduler` needs `taskInfo.accumulables`, so why send the `CompletionEvent` after cloning the `TaskInfo` with empty accumulables?





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1430656022


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
             tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)

Review Comment:
   Yes, what @JoshRosen [detailed for executorLost](https://github.com/apache/spark/pull/44321#pullrequestreview-1785137821) is what I am referring to - I had missed that comment, thanks for the analysis Josh!
   
   Reducing memory utilization is definitely something we should aspire to, but IMO not at the cost of breaking interfaces and existing user code - in this case, `Resubmitted` has existed for 8+ years and has been exposed to developers.
   How commonly this is triggered depends on the stability of the cluster infra - which might be different for different users/deployments :-)
   
   In this specific case, unfortunately it impacts not just end user code (which we don't know how it is being used: I have seen fairly involved usage of these APIs in general, including writing some of my own in the past) - but also our Spark UI - for example `AppStatusListener.onTaskEnd` when `Resubmitted` is fired.
   
   If we want to relook at how `Resubmitted` is handled in Spark, we should do that decoupled from a performance discussion - it is possible that some of this has evolved in "interesting ways" since initially designed and needs to be revisited (why should users care, etc. are discussions to be had in that context).
   For this PR, which is about improving memory utilization, there should not be unintended side effects IMO.
   
   As I mentioned before, at a minimum, this should not become the default.
   Once we relook at/redesign `Resubmitted`, we can flip it - if required.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #44321:
URL: https://github.com/apache/spark/pull/44321#issuecomment-1866163422

   SGTM




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1448069167


##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -643,6 +657,29 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     }
   }
 
+  /**
+   * A simple listener that tracks task infos for all active tasks.
+   */
+  private class SaveActiveTaskInfos extends SparkListener {
+    // Use a set based on IdentityHashMap instead of a HashSet to track unique references of
+    // TaskInfo objects.
+    val taskInfos = Collections.newSetFromMap[TaskInfo](new IdentityHashMap)
+
+    override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+      val info = taskStart.taskInfo
+      if (info != null) {
+        taskInfos.add(info)
+      }
+    }
+
+    override def onTaskEnd(task: SparkListenerTaskEnd): Unit = {
+      val info = task.taskInfo
+      if (info != null && taskInfos.contains(info)) {
+        taskInfos.remove(info)
+      }

Review Comment:
   nit:
   
   ```suggestion
         if (info != null) {
           taskInfos.remove(info)
         }
   ```
   
   or even simply 
   ```suggestion
         taskInfos.remove(info)
   ```



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -61,6 +61,11 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
       accumUpdates: Seq[AccumulatorV2[_, _]],
       metricPeaks: Array[Long],
       taskInfo: TaskInfo): Unit = {
+    accumUpdates.foreach(acc =>
+      taskInfo.setAccumulables(
+        acc.toInfo(Some(acc.value), Some(acc.value)) +: taskInfo.accumulables)
+    )
+    taskScheduler.endedTasks(taskInfo.index) = reason

Review Comment:
   Duplicate?
   
   ```suggestion
   ```



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -61,6 +61,11 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
       accumUpdates: Seq[AccumulatorV2[_, _]],
       metricPeaks: Array[Long],
       taskInfo: TaskInfo): Unit = {
+    accumUpdates.foreach(acc =>
+      taskInfo.setAccumulables(
+        acc.toInfo(Some(acc.value), Some(acc.value)) +: taskInfo.accumulables)
+    )

Review Comment:
   Add a comment on why we need this?



##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
   }
 
+  test("SPARK-46383: Track TaskInfo objects") {
+    val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "SparkListenerSuite", conf)
+    val listener = new SaveActiveTaskInfos
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+    sc.listenerBus.waitUntilEmpty()
+    listener.taskInfos.size should be { 0 }

Review Comment:
   Functionally, that (the right task info being in the event) should be covered already (by the use of `SaveStageAndTaskInfo`, for example). Do let me know if that is not the case.
   
   





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428555609


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
+      val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i }

Review Comment:
   when will we hit the else branch?





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428556859


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -787,6 +787,8 @@ private[spark] class TaskSetManager(
     // SPARK-37300: when the task was already finished state, just ignore it,
     // so that there won't cause successful and tasksSuccessful wrong result.

Review Comment:
   Reading this comment, the partition is already completed, probably by another `TaskSetManager`, and we just need to reset the task info here?







Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #44321:
URL: https://github.com/apache/spark/pull/44321#issuecomment-1858692524

   I have not looked into this in a lot of detail (and given my vacation plans, I might not be able to, unfortunately).
   I will make one note for @cloud-fan and @JoshRosen - perhaps you have analyzed it and this is not a concern.
   
   `LiveTask` keeps a reference to `TaskInfo` and references `accumulables` there. Given the potential delays between task events getting fired and actual scheduler updates (due to delays in event processing), will this PR cause issues?




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428553296


##########
core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala:
##########
@@ -75,14 +75,47 @@ class TaskInfo(
    * accumulable to be updated multiple times in a single task or for two accumulables with the
    * same name but different IDs to exist in a task.
    */
-  def accumulables: Seq[AccumulableInfo] = _accumulables
+  def accumulables: Seq[AccumulableInfo] = {
+    if (throwOnAccumulablesCall) {
+      throw new IllegalStateException("Accumulables for the TaskInfo have been cleared")
+    } else {
+      _accumulables
+    }
+  }
 
   private[this] var _accumulables: Seq[AccumulableInfo] = Nil
 
   private[spark] def setAccumulables(newAccumulables: Seq[AccumulableInfo]): Unit = {
     _accumulables = newAccumulables
   }
 
+  /**
+   * If true, a call to TaskInfo.accumulables() will throw an exception.
+   */
+  private var throwOnAccumulablesCall: Boolean = false
+
+  override def clone(): TaskInfo = super.clone().asInstanceOf[TaskInfo]
+
+  /**
+   * For testing only. Allows probing accumulables without triggering the exception when
+   * `throwOnAccumulablesCall` is set.
+   */
+  private[scheduler] def isAccumulablesEmpty(): Boolean = {
+    _accumulables.isEmpty
+  }
+
+  private[scheduler] def resetAccumulables(): Unit = {
+    setAccumulables(Nil)
+    throwOnAccumulablesCall = true
+  }
+
+  private[scheduler] def cloneWithEmptyAccumulables(): TaskInfo = {
+    val cloned = clone()
+    cloned.setAccumulables(Nil)
+    cloned.throwOnAccumulablesCall = true

Review Comment:
   is this `cloned.resetAccumulables`?





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1443157668


##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
   }
 
+  test("SPARK-46383: Track TaskInfo objects") {
+    val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "SparkListenerSuite", conf)
+    val listener = new SaveActiveTaskInfos
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+    sc.listenerBus.waitUntilEmpty()
+    listener.taskInfos.size should be { 0 }

Review Comment:
   Isn't that simply an implementation detail? (For example, the resubmission case would break it.)
   I am not sure what behavior we are testing for here - or how this test would help with some future change (and validation).
   
   I don't see any harm in keeping it, but I want to make sure I am not missing something here.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428771003


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
+      val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i }
+      taskInfos(taskId) = clonedTaskInfo
+    }
+    sched.dagScheduler.taskEnded(task, reason, result, accumUpdates, metricPeaks, taskInfo)

Review Comment:
   Oh. I see.







Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1429775174


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
             tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)

Review Comment:
   TBH, I think this API is not well-designed and we don't have a clear definition anyway. @JoshRosen's comment explains the details: if a task is running, listeners won't get the `Resubmitted` event, but the task failure event. The `Resubmitted` event is only issued if the task has completed and its map output is lost. In that case, I think the current behavior of sending the task info with the metrics of the previously completed task is a bit weird. It's likely not by design but just a coincidence. Why would people care about the previously completed task's metrics when Spark resubmits it? Even if they care, they can get the metrics from the previous task completion event.
   
   To me, the memory issue is more important to most users, and a listener that would be broken by this change should be very rare. How about we put it in the migration guide and ask people to set the legacy flag to get the legacy behavior?
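   For illustration, a minimal sketch of the kind of user listener this discussion is worried about (the Spark API names are real; the handling logic is hypothetical): with the new flag enabled, `taskEnd.taskInfo.accumulables` would be empty on a `Resubmitted` end reason instead of carrying the earlier successful attempt's metrics.
   
   ```scala
   import org.apache.spark.Resubmitted
   import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
   
   // Hypothetical listener that inspects accumulables on Resubmitted events.
   class ResubmitAwareListener extends SparkListener {
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
       taskEnd.reason match {
         case Resubmitted =>
           // Previously: metrics of the completed attempt whose map output was
           // lost. With accumulables dropped on completion: an empty Seq.
           println(s"Resubmitted task ${taskEnd.taskInfo.taskId}: " +
             s"${taskEnd.taskInfo.accumulables.size} accumulables")
         case _ => // other end reasons are unaffected by this change
       }
     }
   }
   ```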









Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "utkarsh39 (via GitHub)" <gi...@apache.org>.
utkarsh39 commented on PR #44321:
URL: https://github.com/apache/spark/pull/44321#issuecomment-1864719908

   **Proposal To Gain Consensus**
   The PR alleviates memory pressure on the driver, although at the cost of introducing a breaking change, as identified by @JoshRosen in https://github.com/apache/spark/pull/44321#pullrequestreview-1785137821. I propose that we disable the feature by default and introduce a breaking change wherein `TaskInfo.accumulables()` is empty for `Resubmitted` tasks upon the loss of an executor. The behavior change would be to return **empty** `Accumulables` as opposed to returning the `Accumulables` of an earlier successful task attempt, as happens today. When this change is enabled, it will affect the following consumers:
   1. `EventLoggingListener`, where task accumulables are serialized to JSON upon task completion ([code link](https://github.com/apache/spark/blob/aa1ff3789e492545b07d84ac095fc4c39f7446c6/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala#L159)).
   2. Custom Spark listeners installed by Spark users
   
   
   What do the reviewers think of the proposal?
   
   Note that the current design in the PR does not implement this proposal. Currently, accessing the empty accumulables would result in a crash. I will refactor the change if we agree on this proposal.




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1448875469


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -2620,4 +2620,14 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault("org.apache.spark.sql.connect.client" :: Nil)
+
+  private[spark] val DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION =
+    ConfigBuilder("spark.scheduler.dropTaskInfoAccumulablesOnTaskCompletion.enabled")
+    .internal()

Review Comment:
   nit: 2 spaces indentation





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1441419344


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +851,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {

Review Comment:
   Pull `conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)` out into a private field.



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -787,6 +787,9 @@ private[spark] class TaskSetManager(
     // SPARK-37300: when the task was already finished state, just ignore it,
     // so that there won't cause successful and tasksSuccessful wrong result.
     if(info.finished) {
+      // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable
+      // lifetime.
+      info.resetAccumulables()

Review Comment:
   Only when `conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)` is enabled?



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +851,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index

Review Comment:
   super nit: pull this variable into the `if` block below



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -874,6 +917,9 @@ private[spark] class TaskSetManager(
     // SPARK-37300: when the task was already finished state, just ignore it,
     // so that there won't cause copiesRunning wrong result.
     if (info.finished) {
+      // SPARK-46383: Clear out the accumulables for a completed task to reduce accumulable
+      // lifetime.
+      info.resetAccumulables()

Review Comment:
   Same as above - only when `conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)` is enabled?



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -229,6 +234,77 @@ class TaskSetManagerSuite
     super.afterEach()
   }
 
+  test("SPARK-46383: Accumulables of TaskInfo objects held by TaskSetManager must not be " +
+    "accessed once the task has completed") {
+    val conf = new SparkConf().
+      set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = FakeTask.createTaskSet(1)
+    val clock = new ManualClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdates = taskSet.tasks.head.metrics.internalAccums
+
+    // Offer a host. This will launch the first task.
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+    assert(taskOption.isDefined)
+
+    clock.advance(1)
+    // Tell it the first task has finished successfully
+    manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
+    assert(sched.endedTasks(0) === Success)
+
+    val e = intercept[SparkException]{
+      manager.taskInfos.head._2.accumulables
+    }
+    assert(e.getMessage.contains("Accumulables for the TaskInfo have been cleared"))
+  }
+
+  test("SPARK-46383: TaskInfo accumulables are cleared upon task completion") {
+    val conf = new SparkConf().
+      set(config.DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = FakeTask.createTaskSet(2)
+    val clock = new ManualClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdates = taskSet.tasks.head.metrics.internalAccums
+
+    // Offer a host. This will launch the first task.
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+    assert(taskOption.isDefined)
+
+    clock.advance(1)
+    // Tell it the first task has finished successfully
+    manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdates))
+    assert(sched.endedTasks(0) === Success)
+
+    // Only one task was launched and it completed successfully, thus the TaskInfo accumulables
+    // should be empty.
+    assert(!manager.taskInfos.exists(l => !l._2.isAccumulablesEmpty))

Review Comment:
   Change from `l` to avoid confusion with `1` (here and elsewhere)



##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
   }
 
+  test("SPARK-46383: Track TaskInfo objects") {
+    val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "SparkListenerSuite", conf)
+    val listener = new SaveActiveTaskInfos
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+    sc.listenerBus.waitUntilEmpty()
+    listener.taskInfos.size should be { 0 }

Review Comment:
   I am not sure I follow this test - what is it trying to do?
   This test will be successful even with `DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION` = `true`, right? (Since it is simply checking for instance equality in the fired event?)



##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +851,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {

Review Comment:
   As we are passing `taskId` already, can we drop `taskInfo` from here?





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "utkarsh39 (via GitHub)" <gi...@apache.org>.
utkarsh39 commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1443107674


##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
   }
 
+  test("SPARK-46383: Track TaskInfo objects") {
+    val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "SparkListenerSuite", conf)
+    val listener = new SaveActiveTaskInfos
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+    sc.listenerBus.waitUntilEmpty()
+    listener.taskInfos.size should be { 0 }

Review Comment:
   This test asserts that the same `TaskInfo` object is sent in the `onTaskStart` and `onTaskEnd` events. It validates the design in this PR: upon task completion we send the original TaskInfo object, and not a clone, to the DAGScheduler.
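   
   A hedged sketch of what such a listener presumably looks like (the `SaveActiveTaskInfos` name comes from the test; this body is an assumption, not the suite's code). An identity-keyed set drains to empty only if both events carry the very same instance:
   
       import java.util.Collections
       import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart, TaskInfo}
   
       class SaveActiveTaskInfosSketch extends SparkListener {
         // Identity-based set: equal-looking but distinct TaskInfo objects stay distinct.
         val taskInfos: java.util.Set[TaskInfo] =
           Collections.newSetFromMap(new java.util.IdentityHashMap[TaskInfo, java.lang.Boolean])
   
         override def onTaskStart(taskStart: SparkListenerTaskStart): Unit =
           taskInfos.add(taskStart.taskInfo)
   
         override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
           // Removal succeeds only for the identical instance; a clone sent at task
           // end would strand the start-time TaskInfo and leave the set non-empty.
           taskInfos.remove(taskEnd.taskInfo)
       }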





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "beliefer (via GitHub)" <gi...@apache.org>.
beliefer commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428673442


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
+      val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i }
+      taskInfos(taskId) = clonedTaskInfo
+    }
+    sched.dagScheduler.taskEnded(task, reason, result, accumUpdates, metricPeaks, taskInfo)

Review Comment:
   It seems `DAGScheduler` needs `taskInfo.accumulables`, so why send the `CompletionEvent` after cloning the TaskInfo with empty accumulables?





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1430656022


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
             tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)

Review Comment:
   Yes, what @JoshRosen [detailed for executorLost](https://github.com/apache/spark/pull/44321#pullrequestreview-1785137821) is what I am referring to - had missed that comment, thanks for the analysis Josh!
   
   Fixing the memory issue is definitely something we should aspire to, but IMO not at the cost of breaking interfaces and existing user code - in this case, `Resubmitted` has existed for 8+ years and has been exposed to developers.
   How commonly this is triggered depends on the stability of the cluster infra - which might be different for different users/deployments :-)
   
   In this specific case, unfortunately it impacts not just end user code (which we don't know is using it: I have seen fairly involved usage of these APIs, including writing some of my own in the past) - but also our Spark UI - for example `AppStatusListener.onTaskEnd` when `Resubmitted` is fired.
   
   If we want to relook at how `Resubmitted` is handled in Spark, we should do that decoupled from a performance discussion - it is possible that some of this has evolved in "interesting ways" since it was initially designed, and needs to be revisited.
   For this PR, which is about improving memory utilization, it should not have unintended side effects IMO.
   
   As I mentioned before, at a minimum, this should not become the default.
   Once we relook/redesign `Resubmitted`, we can flip it - if required.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "utkarsh39 (via GitHub)" <gi...@apache.org>.
utkarsh39 commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1448110832


##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
   }
 
+  test("SPARK-46383: Track TaskInfo objects") {
+    val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "SparkListenerSuite", conf)
+    val listener = new SaveActiveTaskInfos
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+    sc.listenerBus.waitUntilEmpty()
+    listener.taskInfos.size should be { 0 }

Review Comment:
   `SaveActiveTaskInfos` is caching `TaskInfo`s, but there are no other tests on `TaskInfo` objects, and none asserting that the `TaskInfo` objects are expected to remain the same across listener events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1430656022


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
             tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)

Review Comment:
   Yes, what @JoshRosen [detailed for executorLost](https://github.com/apache/spark/pull/44321#pullrequestreview-1785137821) is what I am referring to - had missed that comment, thanks for the analysis Josh!
   
   Fixing the memory issue is definitely something we should aspire to, but IMO not at the cost of breaking interfaces and existing user code - in this case, `Resubmitted` has existed for 8+ years and has been exposed to developers.
   How commonly this is triggered depends on the stability of the cluster infra - which might be different for different users/deployments :-)
   
   In this specific case, unfortunately it impacts not just end user code (which we don't know is using it: I have seen fairly involved usage of these APIs, including writing some of my own in the past) - but also our Spark UI - for example `AppStatusListener.onTaskEnd` when `Resubmitted` is fired.
   
   If we want to relook at how `Resubmitted` is handled in Spark, we should do that decoupled from a performance discussion - it is possible that some of this has evolved in "interesting ways" since it was initially designed, and needs to be revisited (why users should care, etc., are discussions to be had there).
   For this PR, which is about improving memory utilization, it should not have unintended side effects IMO.
   
   As I mentioned before, at a minimum, this should not become the default.
   Once we relook/redesign `Resubmitted`, we can flip it - if required.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1430656022


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
             tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)

Review Comment:
   Yes, what @JoshRosen [detailed for executorLost](https://github.com/apache/spark/pull/44321#pullrequestreview-1785137821) is what I am referring to - had missed that comment, thanks for the analysis Josh!
   
   Reducing memory utilization is definitely something we should aspire to, but IMO not at the cost of breaking interfaces and existing user code - in this case, `Resubmitted` has existed for 8+ years and has been exposed to developers.
   How commonly this is triggered depends on the stability of the cluster infra - which might be different for different users/deployments :-)
   
   In this specific case, unfortunately it impacts not just end user code (which we don't know how it is being used: I have seen fairly involved usage of these APIs, including writing some of my own in the past) - but also our Spark UI - for example `AppStatusListener.onTaskEnd` when `Resubmitted` is fired.
   
   If we want to relook at how `Resubmitted` is handled in Spark, we should do that decoupled from a performance discussion - it is possible that some of this has evolved in "interesting ways" since it was initially designed, and needs to be revisited (why users should care, etc., are discussions to be had there).
   For this PR, which is about improving memory utilization, it should not have unintended side effects IMO.
   
   As I mentioned before, at a minimum, this should not become the default.
   Once we relook/redesign `Resubmitted`, we can flip it - if required.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1429033491


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
             tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)

Review Comment:
   +CC @cloud-fan here, we will be firing with incorrect `accumulables`.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428629120


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
+      val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i }

Review Comment:
   The `else` branch should be hit for `taskAttempts(index).length - 1` total positions.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428631597


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
+      val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i }

Review Comment:
   Oh I misunderstood the code before.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #44321: [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()`
URL: https://github.com/apache/spark/pull/44321




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "utkarsh39 (via GitHub)" <gi...@apache.org>.
utkarsh39 commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1445001773


##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
   }
 
+  test("SPARK-46383: Track TaskInfo objects") {
+    val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "SparkListenerSuite", conf)
+    val listener = new SaveActiveTaskInfos
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+    sc.listenerBus.waitUntilEmpty()
+    listener.taskInfos.size should be { 0 }

Review Comment:
   I don't mind dropping it. I was just trying to assert one of the ways SparkListeners could be used. The test is more of a general test to ensure that we preserve the behavior of SparkListeners.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1443157668


##########
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala:
##########
@@ -289,6 +290,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     stageInfo.rddInfos.forall(_.numPartitions == 4) should be {true}
   }
 
+  test("SPARK-46383: Track TaskInfo objects") {
+    val conf = new SparkConf().set(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION, true)
+    sc = new SparkContext("local", "SparkListenerSuite", conf)
+    val listener = new SaveActiveTaskInfos
+    sc.addSparkListener(listener)
+    val rdd1 = sc.parallelize(1 to 100, 4)
+    sc.runJob(rdd1, (items: Iterator[Int]) => items.size, Seq(0, 1))
+    sc.listenerBus.waitUntilEmpty()
+    listener.taskInfos.size should be { 0 }

Review Comment:
   Isn't that simply an implementation detail?
   I am not sure what behavior we are testing for here - or how this test would help with some future change (and validation).





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428630951


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
+      val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i }

Review Comment:
   We don't know the position of this task attempt's info within the `taskAttempts(index)` list, so the logic above is doing a search-and-replace using object identity.
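   
   To make the identity-based replacement concrete, here is a standalone sketch (the types and names are illustrative, not Spark's):
   
       // Toy model of swapping exactly one attempt's info by reference identity.
       object IdentityReplaceSketch {
         final class Info(val attempt: Int)
   
         def replaceByIdentity(attempts: List[Info], old: Info, clone: Info): List[Info] =
           // `eq` compares references, not values, so only the completed attempt's
           // info is swapped; all other attempts keep their original objects.
           attempts.map(i => if (i eq old) clone else i)
   
         def main(args: Array[String]): Unit = {
           val first = new Info(0)
           val second = new Info(1)
           val cloned = new Info(1)
           val updated = replaceByIdentity(List(first, second), second, cloned)
           assert(updated(0) eq first)   // untouched attempt is preserved
           assert(updated(1) eq cloned)  // completed attempt now points at the clone
         }
       }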





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #44321:
URL: https://github.com/apache/spark/pull/44321#issuecomment-1859050296

   @cloud-fan thanks for checking!
   Note that this does not apply to the `executorLost` case though (I will call it out in the exact location) - since the `Resubmitted` event will now have invalid `accumulables`.
   
   At a minimum, this should be opt-in and not on by default.




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1430656022


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
             tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)

Review Comment:
   Yes, what @JoshRosen [detailed for executorLost](https://github.com/apache/spark/pull/44321#pullrequestreview-1785137821) is what I am referring to - had missed that comment, thanks for the analysis Josh!
   
   Reducing memory utilization is definitely something we should aspire to, but IMO not at the cost of breaking interfaces and existing user code - in this case, `Resubmitted` has existed for 8+ years and has been exposed to developers.
   How commonly this is triggered depends on the stability of the cluster infra - which might be different for different users/deployments :-)
   
   In this specific case, unfortunately it impacts not just end user code (which we don't know how it is being used: I have seen fairly involved usage of these APIs in general, including writing some of my own in the past) - but also our Spark UI - for example `AppStatusListener.onTaskEnd` when `Resubmitted` is fired.
   
   If we want to relook at how `Resubmitted` is handled in Spark, we should do that decoupled from a performance discussion - it is possible that some of this has evolved in "interesting ways" since it was initially designed, and needs to be revisited (why users should care, etc., are discussions to be had in that context).
   For this PR, which is about improving memory utilization, it should not have unintended side effects IMO.
   
   As I mentioned before, at a minimum, this should not become the default.
   Once we relook/redesign `Resubmitted`, we can flip it - if required.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "utkarsh39 (via GitHub)" <gi...@apache.org>.
utkarsh39 commented on PR #44321:
URL: https://github.com/apache/spark/pull/44321#issuecomment-1881845026

   @mridulm Can you PTAL?




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #44321:
URL: https://github.com/apache/spark/pull/44321#issuecomment-1888334658

   thanks, merging to master!




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428630112


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -787,6 +787,8 @@ private[spark] class TaskSetManager(
     // SPARK-37300: when the task was already finished state, just ignore it,
     // so that there won't cause successful and tasksSuccessful wrong result.

Review Comment:
   I think this branch is handling a rare corner-case where the same `TaskSetManager` can mark the same task as both succeeded and failed. There is some detailed prior discussion of this in https://issues.apache.org/jira/browse/SPARK-37300





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428689443


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -846,11 +850,49 @@ private[spark] class TaskSetManager(
     // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
     // Note: "result.value()" only deserializes the value when it's called at the first time, so
     // here "result.value()" just returns the value and won't block other threads.
-    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates,
-      result.metricPeaks, info)
+
+    emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid, tasks(index), Success, result.value(),
+      result.accumUpdates, result.metricPeaks, info)
     maybeFinishTaskSet()
   }
 
+  /**
+   * A wrapper around [[DAGScheduler.taskEnded()]] that empties out the accumulables for the
+   * TaskInfo object, corresponding to the completed task, referenced by this class.
+   *
+   * SPARK-46383: For the completed task, we ship the original TaskInfo to the DAGScheduler and only
+   * retain a cloned TaskInfo in this class. We then set the accumulables to Nil for the TaskInfo
+   * object that corresponds to the completed task.
+   * We do this to release references to `TaskInfo.accumulables()` as the TaskInfo
+   * objects held by this class are long-lived and have a heavy memory footprint on the driver.
+   *
+   * This is safe as the TaskInfo accumulables are not needed once they are shipped to the
+   * DAGScheduler where they are aggregated. Additionally, the original TaskInfo, and not a
+   * clone, must be sent to the DAGScheduler as this TaskInfo object is sent to the
+   * DAGScheduler on multiple events during the task's lifetime. Users can install
+   * SparkListeners that compare the TaskInfo objects across these SparkListener events and
+   * thus the TaskInfo object sent to the DAGScheduler must always reference the same TaskInfo
+   * object.
+   */
+  private def emptyTaskInfoAccumulablesAndNotifyDagScheduler(
+      taskId: Long,
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Seq[AccumulatorV2[_, _]],
+      metricPeaks: Array[Long],
+      taskInfo: TaskInfo): Unit = {
+    val index = taskInfo.index
+    if (conf.get(DROP_TASK_INFO_ACCUMULABLES_ON_TASK_COMPLETION)) {
+      val clonedTaskInfo = taskInfo.cloneWithEmptyAccumulables()
+      // Update this task's taskInfo while preserving its position in the list
+      taskAttempts(index) =
+        taskAttempts(index).map { i => if (i eq taskInfo) clonedTaskInfo else i }
+      taskInfos(taskId) = clonedTaskInfo
+    }
+    sched.dagScheduler.taskEnded(task, reason, result, accumUpdates, metricPeaks, taskInfo)

Review Comment:
   Here it sends the original `taskInfo`, not the clone.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #44321:
URL: https://github.com/apache/spark/pull/44321#issuecomment-1858720574

   @mridulm `LiveTask` gets the `TaskInfo` via the listener. This PR sends the original `TaskInfo` instance to the `DAGScheduler` and thus to the event bus, and keeps the cloned `TaskInfo` with empty accumulables in `TaskSetManager`, assuming the listener won't hold the original `TaskInfo` instance for a long time. Built-in listeners are fine: they just aggregate and throw the reference away. User listeners may still cause memory issues, but this is out of our control.
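   
   A standalone toy model of the flow described here (names are illustrative, not Spark's): the scheduler-side reference is swapped for a clone with no accumulables, while the event delivered to listeners carries the original, fully populated object.
   
       object LifecycleSketch {
         final case class Accum(name: String, value: Long)
         final class Info(val accumulables: Seq[Accum]) {
           def cloneWithEmptyAccumulables(): Info = new Info(Nil)
         }
   
         def main(args: Array[String]): Unit = {
           val original = new Info(Seq(Accum("recordsRead", 42L)))
           val retainedByScheduler = original.cloneWithEmptyAccumulables()
           val sentToListeners = original
           assert(retainedByScheduler.accumulables.isEmpty) // long-lived reference is now cheap
           assert(sentToListeners.accumulables.nonEmpty)    // listeners still see the metrics
         }
       }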




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #44321:
URL: https://github.com/apache/spark/pull/44321#issuecomment-1866052989

   Sounds good to me, thoughts @JoshRosen, @cloud-fan ?




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1429033491


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
             tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)

Review Comment:
   +CC @cloud-fan here, we will be firing with incorrect `accumulables`.
   Given `TaskInfo` is a `DeveloperApi`, this will impact user listeners - depending on how they handle the `Resubmitted` event.
   
   Thoughts ?





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1430850168


##########
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##########
@@ -1086,7 +1133,7 @@ private[spark] class TaskSetManager(
           addPendingTask(index)
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
-          sched.dagScheduler.taskEnded(
+          emptyTaskInfoAccumulablesAndNotifyDagScheduler(tid,
             tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)

Review Comment:
   I checked the SQL UI (`SQLAppStatusListener`) and the task metrics from `Resubmitted` are ignored because the partition is already marked as finished. Anyway, I'm fine with turning it off by default, but I don't really think the metrics of a `Resubmitted` event are useful. We can discuss it in a different thread though.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428629362


##########
core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala:
##########
@@ -75,14 +75,47 @@ class TaskInfo(
    * accumulable to be updated multiple times in a single task or for two accumulables with the
    * same name but different IDs to exist in a task.
    */
-  def accumulables: Seq[AccumulableInfo] = _accumulables
+  def accumulables: Seq[AccumulableInfo] = {
+    if (throwOnAccumulablesCall) {
+      throw new IllegalStateException("Accumulables for the TaskInfo have been cleared")

Review Comment:
   +1, good catch.





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "JoshRosen (via GitHub)" <gi...@apache.org>.
JoshRosen commented on PR #44321:
URL: https://github.com/apache/spark/pull/44321#issuecomment-1866794659

   The proposed "make the behavior change optional and off by default, with the option for users to opt in" approach sounds reasonable to me: users or platforms that don't rely on the hopefully-rare corner-case listener behavior can choose to opt in to address a major contributor to driver memory problems with large task sets 👍.




Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44321:
URL: https://github.com/apache/spark/pull/44321#discussion_r1428552502


##########
core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala:
##########
@@ -75,14 +75,47 @@ class TaskInfo(
    * accumulable to be updated multiple times in a single task or for two accumulables with the
    * same name but different IDs to exist in a task.
    */
-  def accumulables: Seq[AccumulableInfo] = _accumulables
+  def accumulables: Seq[AccumulableInfo] = {
+    if (throwOnAccumulablesCall) {
+      throw new IllegalStateException("Accumulables for the TaskInfo have been cleared")

Review Comment:
   let's use `SparkException.internalError`
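   
   A sketch of the accessor after applying this suggestion; the tail of the method and the field names (`throwOnAccumulablesCall`, `_accumulables`) are assumed from the diff above, and the message matches the test earlier in this thread:
   
       import org.apache.spark.SparkException
   
       def accumulables: Seq[AccumulableInfo] = {
         if (throwOnAccumulablesCall) {
           // Raised only after TaskSetManager has cleared the accumulables.
           throw SparkException.internalError("Accumulables for the TaskInfo have been cleared")
         }
         _accumulables
       }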





Re: [PR] [SPARK-46383] Reduce Driver Heap Usage by Reducing the Lifespan of `TaskInfo.accumulables()` [spark]

Posted by "utkarsh39 (via GitHub)" <gi...@apache.org>.
utkarsh39 commented on PR #44321:
URL: https://github.com/apache/spark/pull/44321#issuecomment-1874648077

   Disabled the changes by default, @JoshRosen @mridulm. Can you all PTAL?

