Posted to reviews@spark.apache.org by "JoshRosen (via GitHub)" <gi...@apache.org> on 2023/09/28 22:17:50 UTC

[GitHub] [spark] JoshRosen commented on a diff in pull request #39767: [SPARK-42205][CORE] Don't log accumulator values in stage / task start event logs

JoshRosen commented on code in PR #39767:
URL: https://github.com/apache/spark/pull/39767#discussion_r1340709730


##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -163,7 +165,7 @@ private[spark] object JsonProtocol {
     g.writeStartObject()
     g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult)
     g.writeFieldName("Task Info")
-    taskInfoToJson(taskInfo, g)
+    taskInfoToJson(taskInfo, g, includeAccumulables = true)

Review Comment:
   Good catch:
   
   I think that this should actually be `false` because the presence of non-empty accumulables in `SparkListenerTaskGettingResult` is prone to the same non-deterministic race as in `SparkListenerTaskStart`.
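   
   To make the race concrete, here is a minimal self-contained sketch (hypothetical names and shapes, not Spark code): the event holds a reference to the live, mutable `TaskInfo`, and the event-log writer serializes it later, so the logged accumulables depend on whether the task has already finished by then.
   
   ```scala
   import scala.collection.mutable.ListBuffer
   
   // Hypothetical stand-ins for TaskInfo / AccumulableInfo:
   case class Accum(name: String, value: Long)
   class MutableTaskInfo { val accumulables: ListBuffer[Accum] = ListBuffer.empty }
   
   object AccumulablesRace extends App {
     val taskInfo = new MutableTaskInfo
     val event = taskInfo // the event captures the live reference, not a snapshot
     // In real Spark the task can complete on another thread and mutate the
     // same TaskInfo before the listener bus drains:
     taskInfo.accumulables += Accum("internal.metrics.resultSize", 1234L)
     // Whatever the event-log writer serializes now depends on that interleaving:
     println(event.accumulables) // non-empty here, empty under a different schedule
   }
   ```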
   
   Tracing through the call chain that leads to the event being posted:
   
   - [In TaskResultGetter](https://github.com/apache/spark/blob/b3d5bc0c10908aa66510844eaabc43b6764dd7c0/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L87), `TaskScheduler.handleTaskGettingResult` is called when an `IndirectTaskResult` fetch begins.
   - [The TaskSchedulerImpl calls the handler on the TaskSetManager](https://github.com/apache/spark/blob/b3d5bc0c10908aa66510844eaabc43b6764dd7c0/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L924-L926)
   - [The TaskSetManager enqueues an event on the DAGScheduler event loop](https://github.com/apache/spark/blob/b3d5bc0c10908aa66510844eaabc43b6764dd7c0/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L753-L760)
   - [The DAGScheduler loop enqueues the listener event](https://github.com/apache/spark/blob/b3d5bc0c10908aa66510844eaabc43b6764dd7c0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1262-L1264)
   
   These steps always take place before the `SparkListenerTaskEnd` event is posted, because that event is only posted after `TaskScheduler.handleSuccessfulTask` or `TaskScheduler.handleFailedTask` is called, and both of those happen after the indirect task result fetch has started.
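   
   As a sanity check on that ordering, here is a runnable toy model (condensed, hypothetical method names; the real logic is spread across the four files linked above) in which the getting-result event is necessarily posted before the task-end event:
   
   ```scala
   object GettingResultOrdering extends App {
     def post(event: String): Unit = println(s"listenerBus.post($event)")
   
     // Step 4: the DAGScheduler event loop posts the listener event
     def dagSchedulerTaskGettingResult(): Unit = post("SparkListenerTaskGettingResult")
     // Step 3: the TaskSetManager enqueues on the DAGScheduler event loop
     def taskSetManagerHandleTaskGettingResult(): Unit = dagSchedulerTaskGettingResult()
     // Step 2: the TaskSchedulerImpl delegates to the TaskSetManager
     def taskSchedulerHandleTaskGettingResult(): Unit = taskSetManagerHandleTaskGettingResult()
   
     // Step 1: TaskResultGetter starts fetching an IndirectTaskResult...
     taskSchedulerHandleTaskGettingResult()
     // ...and only after the fetch completes do handleSuccessfulTask /
     // handleFailedTask run, which is what leads to the task-end event:
     post("SparkListenerTaskEnd")
   }
   ```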
   
   Given this, I think we should exclude accumulables for `SparkListenerTaskGettingResult` as well.
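   
   Concretely, the line in question would then mirror the task-start handling from this PR:
   
   ```scala
   // Skip per-task accumulable values here too, matching SparkListenerTaskStart:
   taskInfoToJson(taskInfo, g, includeAccumulables = false)
   ```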
   
   `SparkListenerTaskGettingResult` is a somewhat rare event compared to task starts and ends because most tasks don't use the indirect task result path. However, there are rare cases where tasks have such large accumulator updates that many (or even most) tasks wind up emitting IndirectTaskResults. In those cases, dropping the accumulables could be a meaningful size win in the event logs.
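   
   For reference, the executor-side choice between direct and indirect results boils down to a size comparison; this is a minimal runnable simplification (hypothetical names, not Spark's actual code) of why large accumulator updates push tasks onto the indirect path:
   
   ```scala
   object ResultPathSketch extends App {
     sealed trait ResultPath
     case object DirectResult extends ResultPath    // sent inline with the status update
     case object IndirectResult extends ResultPath  // fetched later -> TaskGettingResult event
     case object DroppedResult extends ResultPath   // over the driver's max result size
   
     // Hypothetical simplification of the executor's decision:
     def resultPath(resultSize: Long, maxDirect: Long, maxTotal: Long): ResultPath =
       if (maxTotal > 0 && resultSize > maxTotal) DroppedResult
       else if (resultSize > maxDirect) IndirectResult
       else DirectResult
   
     // A result inflated by large accumulator updates exceeds the direct
     // threshold and must be fetched indirectly:
     assert(resultPath(resultSize = 10L << 20, maxDirect = 1L << 20, maxTotal = 1L << 30) == IndirectResult)
   }
   ```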



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

