Posted to commits@spark.apache.org by ge...@apache.org on 2023/03/30 22:48:53 UTC

[spark] branch master updated: [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a6b1770c85 [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled
1a6b1770c85 is described below

commit 1a6b1770c85f37982b15d261abf9cc6e4be740f4
Author: Xingbo Jiang <xi...@databricks.com>
AuthorDate: Thu Mar 30 15:48:04 2023 -0700

    [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled
    
    ### What changes were proposed in this pull request?
    This PR fixes a bug where `SparkListenerTaskStart` can carry `stageAttemptId = -1` when a task is launched after its stage is cancelled. Instead of looking the stage up in `stageIdToStage`, which no longer holds a cancelled stage, the scheduler now takes the value from the `Task` itself (`task.stageAttemptId`).
    
    ### Why are the changes needed?
    -1 is not a legal `stageAttemptId` value, so it can cause unexpected problems if a subscriber tries to parse the stage information from the `SparkListenerTaskStart` event.
    
    ### Does this PR introduce _any_ user-facing change?
    No, it's a bugfix.
    
    ### How was this patch tested?
    Manually verified.
    
    Closes #40592 from jiangxb1987/SPARK-42967.
    
    Authored-by: Xingbo Jiang <xi...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 065bf88ab16..c78a26d91eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1183,11 +1183,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
-    // Note that there is a chance that this task is launched after the stage is cancelled.
-    // In that case, we wouldn't have the stage anymore in stageIdToStage.
-    val stageAttemptId =
-      stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
-    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
+    listenerBus.post(SparkListenerTaskStart(task.stageId, task.stageAttemptId, taskInfo))
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
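
The behavioural difference in the hunk above can be sketched outside Spark. This is a minimal illustration, not the scheduler's actual code: `Task` and `TaskStart` below are hypothetical, simplified stand-ins for Spark's `Task` and `SparkListenerTaskStart`, and `stageIdToLatestAttempt` is an assumed stand-in for `stageIdToStage` that maps a stage id straight to its latest attempt number.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's Task and
// SparkListenerTaskStart; the real classes carry many more fields.
final case class Task(stageId: Int, stageAttemptId: Int)
final case class TaskStart(stageId: Int, stageAttemptId: Int)

object StageAttemptIdSketch {
  // Assumed stand-in for DAGScheduler.stageIdToStage:
  // stage id -> latest attempt number of that stage.
  val stageIdToLatestAttempt: mutable.HashMap[Int, Int] = mutable.HashMap.empty

  // Old behaviour: look the stage up and fall back to -1 when it is gone.
  def oldTaskStart(task: Task): TaskStart =
    TaskStart(task.stageId, stageIdToLatestAttempt.getOrElse(task.stageId, -1))

  // New behaviour: read the attempt id directly from the task.
  def newTaskStart(task: Task): TaskStart =
    TaskStart(task.stageId, task.stageAttemptId)

  def main(args: Array[String]): Unit = {
    val task = Task(stageId = 3, stageAttemptId = 1)

    // While the stage is still registered, both variants agree.
    stageIdToLatestAttempt(3) = 1
    println(oldTaskStart(task) == newTaskStart(task))

    // Simulate cancellation: the stage is removed before the task starts.
    stageIdToLatestAttempt.remove(3)
    println(oldTaskStart(task)) // the lookup misses, so the attempt id is -1
    println(newTaskStart(task)) // the attempt id still comes from the task
  }
}
```

Because the new variant never consults the stage map, the posted event no longer depends on whether the stage is still registered when the task starts.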

