Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/18 03:37:26 UTC

[GitHub] [spark] wineternity opened a new pull request, #38702: SPARK-41187 [Core] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

wineternity opened a new pull request, #38702:
URL: https://github.com/apache/spark/pull/38702

   ### What changes were proposed in this pull request?
   Ignore the SparkListenerTaskEnd event with reason "Resubmitted" to avoid a memory leak.
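   A minimal sketch of the proposed guard, written here as a standalone `SparkListener` rather than the real `AppStatusListener` (the actual diff is shown in the review discussion below):
   ```scala
   import org.apache.spark.Resubmitted
   import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

   // Sketch only: drop Resubmitted task-end events so they cannot unbalance the
   // per-stage activeTasks bookkeeping (a Resubmitted end has no matching task start).
   class IgnoreResubmittedTaskEnds extends SparkListener {
     override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
       if (event.taskInfo == null || event.reason == Resubmitted) {
         return
       }
       // ... normal per-stage / per-executor accounting would go here ...
     }
   }
   ```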
   
   
   ### Why are the changes needed?
   For a long-running Spark Thrift Server, LiveExecutor entries accumulate in the deadExecutors HashMap, which makes event queue processing slow.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   New unit test added.
   Tested in a Thrift Server environment.
   
   ### The way to reproduce
   I reproduced it in spark-shell, though it takes a few manual steps:
   1. Start spark-shell and set spark.dynamicAllocation.maxExecutors=2 for convenience:
   ` bin/spark-shell  --driver-java-options "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8006"`
   2. Run a job with a shuffle:
    `sc.parallelize(1 to 1000, 10).map { x => Thread.sleep(1000) ; (x % 3, x) }.reduceByKey((a, b) => a + b).collect()`
   3. After some ShuffleMapTasks have finished, kill one or two executors so their tasks get resubmitted (see the sketch after this list for a shortcut).
   4. Check with a heap dump or a debugger.
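   Optionally, instead of killing the executor process by hand in step 3, you can ask Spark to kill it from the driver. A sketch, assuming the executor IDs are "1" and "2" (typical with spark.dynamicAllocation.maxExecutors=2):
   ```scala
   // Run in the same spark-shell while the shuffle job above is still executing.
   // SparkContext.killExecutors is a developer API; the executor id "1" is an assumption.
   sc.killExecutors(Seq("1"))
   ```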
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1035584393


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   > Couple of things to clarify (the pr description/comments confused me a bit).
   > 
   > a) There was a pair of task start and task end event which were fired for the task (let us call it Tr) b) When executor which ran Tr was lost, while stage is still running, a `Resubmitted` is fired for Tr. c) Subsequently, a new task start and task end will be fired for the retry of Tr.
   > 
   > The issue here is, (b) is associated with (a) - which was a success earlier, but has now been marked as a failure.
   > 
   > We should not be ignoring this event, but rather deal with it properly. For example, update counters, number of failed tasks, etc.
   
   The steps are right, and in my opinion the task end event in (b) with the `Resubmitted` reason can be ignored in AppStatusListener; ignoring it also keeps the logic clear.
   
   In (a), the counters and metrics are already recorded for Tr, e.g. stage.completedTasks, stage.job.completedTasks, stage.executorSummary.taskTime, and so on.
   In (b), we get an extra task end with the "Resubmitted" reason for Tr, so Tr has one start event but two end events. The activeTasks counter in the stage then becomes wrong, which is the root cause of this JIRA.
   In (c), the counters and metrics are recorded for the retry of Tr.
   
   For the metrics, if we handled the task end message in (b), they would be counted twice and produce wrong values; for example stage.executorSummary.taskTime would be added again, so they would need to be excluded in the `Resubmitted` case.
   
   As for the counters, the one that needs discussion is `failedTasks`. I think either of the following two interpretations makes sense:
   1. (completedTasks, failedTasks, killedTasks) = (2, 0, 0), which means Tr succeeded twice with no failure; Resubmitted is really just a signal that a successful task is rerun, so no task actually failed,
   or
   2. (completedTasks, failedTasks, killedTasks) = (2, 1, 0), which means Tr succeeded twice and we count the resubmission as a failure. But that gets conflated with the tasks that really did fail, such as the tasks that were still running when the executor was lost.
   
   Am I making sense?
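   To make the bookkeeping concrete, here is a self-contained toy model (not the real AppStatusListener code) of the sequence above. One start event for Tr but two end events (Success in (a), then Resubmitted in (b)) leaves the active counter negative unless Resubmitted is special-cased:
   ```scala
   object ResubmittedBookkeeping {
     sealed trait EndReason
     case object TaskSuccess extends EndReason
     case object TaskResubmitted extends EndReason

     // Toy stand-in for the per-stage counters kept by the listener.
     case class StageCounters(active: Int = 0, completed: Int = 0, failed: Int = 0)

     def onTaskStart(c: StageCounters): StageCounters = c.copy(active = c.active + 1)

     def onTaskEnd(c: StageCounters, reason: EndReason): StageCounters = reason match {
       case TaskSuccess     => c.copy(active = c.active - 1, completed = c.completed + 1)
       // Current behaviour: the extra Resubmitted end also decrements active.
       case TaskResubmitted => c.copy(active = c.active - 1, failed = c.failed + 1)
     }

     def main(args: Array[String]): Unit = {
       val events: Seq[StageCounters => StageCounters] = Seq(
         onTaskStart,                   // (a) Tr starts
         onTaskEnd(_, TaskSuccess),     // (a) Tr succeeds
         onTaskEnd(_, TaskResubmitted), // (b) executor lost: extra end event for Tr
         onTaskStart,                   // (c) retry of Tr starts
         onTaskEnd(_, TaskSuccess)      // (c) retry succeeds
       )
       val finalCounters = events.foldLeft(StageCounters())((c, f) => f(c))
       // Prints StageCounters(-1,2,1): active never returns to 0, so the stage "leaks".
       println(finalCounters)
     }
   }
   ```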
   
   
   
   





[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1041671057


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   OK, so let's focus on fixing the active tasks first and let the failed counters go up. I have changed the PR; could you help check it @mridulm @Ngone51?





[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044064944


##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"SPARK-41187: Stage should be removed from liveStages to avoid deadExecutors accumulated") {

Review Comment:
   ```suggestion
     test("SPARK-41187: Stage should be removed from liveStages to avoid deadExecutors accumulated") {
   ```





[GitHub] [spark] AmplabJenkins commented on pull request #38702: [SPARK-41187][Core] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1320614168

   Can one of the admins verify this patch?





[GitHub] [spark] mridulm commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1040439989


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   It has been a while since I looked at it, but this is what I expect to happen.
   
   - When a task is marked as `Resubmitted`, the stage metrics from the earlier (successful) task completion are reverted (`metricsDelta`).
   - Failed tasks counter should go up - for both stage and executor.
   - Active tasks MUST not go down - this is the issue we have to fix.
   
   The rest of the method should not need much change, imo.
   
   Can the change not simply be limited to `if (event.reason != Resubmitted) <foo>.activeTasks -= 1` ?
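   A sketch of that minimal option in context (a hypothetical simplification; `stage` and `exec` here stand for the LiveStage and LiveExecutor entries being updated):
   ```scala
   // Leave metricsDelta and the completed/failed/killed counters as they are, and only
   // skip the active-task decrements for Resubmitted so they can never go negative.
   if (event.reason != Resubmitted) {
     stage.activeTasks -= 1
     exec.activeTasks -= 1
   }
   ```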







[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034798225


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors

Review Comment:
   You made it clearer, great.





[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034824042


##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
   > Could you copy-paste your analysis in JIRA to the PR description to elaborate the issue more clearly?
   
   Done, I used the description from your suggestion; I think it's clearer. By the way, I committed the suggestions one by one, which created too many commits. Could I rebase them into one and force push?





[GitHub] [spark] wineternity commented on pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1340463701

   > The change looks good to me. +CC @Ngone51
   > 
   > Btw, do you also want to remove the `if (event.taskInfo == null) {` check in beginning of `onTaskEnd` ?
   > 
   > Make it a precondition check ? `Preconditions.checkNotNull(event.taskInfo)`
   
   Yes, it can be changed to a precondition check. Maybe I can change it in a new PR after testing.
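   For reference, a sketch of what that check could look like (assuming Guava's `Preconditions`, as in the quoted suggestion; plain Scala `require` would be the more idiomatic equivalent):
   ```scala
   import com.google.common.base.Preconditions

   // Either form turns the silent early-return into a hard precondition.
   Preconditions.checkNotNull(event.taskInfo)
   // or, idiomatic Scala:
   require(event.taskInfo != null, "SparkListenerTaskEnd.taskInfo should never be null")
   ```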




[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044071110


##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"SPARK-41187: Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted
+    time += 1
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Resubmitted, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, running task will be failed and rerun
+    time += 1
+    tasks(1).markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(1), new ExecutorMetrics,
+      null))
+
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))

Review Comment:
   Shouldn't it be `tasks(1)` here?





[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044064601


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -689,7 +689,15 @@ private[spark] class AppStatusListener(
       if (metricsDelta != null) {
         stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, metricsDelta)
       }
-      stage.activeTasks -= 1
+      // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by
+      // executor lost, it can lead to negative `LiveStage.activeTasks` since there's no
+      // corresponding `SparkListenerTaskStart` event for each of them. The negative activeTasks
+      // will make the stage always remains in the live stage list as it can never meet the
+      // condition activeTasks == 0. This in turn causes the dead executor to never be retained
+      // if that live stage's submissionTime is less than the dead executor's removeTime.
+      if (event.reason != Resubmitted) {

Review Comment:
   Can we extend the xxxDelta pattern above to include the `Resubmitted` reason (e.g., activeTaskDelta) so we don't have to check `event.reason` every time below?
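   A sketch of that idea (a simplification, not the merged code; the branch values and the `stage.*` field names are illustrative):
   ```scala
   // Fold the Resubmitted special case into the same per-reason match that already
   // produces the completed/failed/killed deltas, so event.reason is inspected once.
   val (completedDelta, failedDelta, killedDelta, activeDelta) = event.reason match {
     case Success =>
       (1, 0, 0, 1)
     case Resubmitted =>
       // No matching task-start event was sent for this end, so leave activeTasks alone.
       (0, 1, 0, 0)
     case _: TaskKilled =>
       (0, 0, 1, 1)
     case _ =>
       (0, 1, 0, 1)
   }
   stage.completedTasks += completedDelta
   stage.failedTasks += failedDelta
   stage.killedTasks += killedDelta
   stage.activeTasks -= activeDelta
   ```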





[GitHub] [spark] mridulm commented on pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1345872533

   Merged to master and branch-3.3
   Thanks for working on this @wineternity !
   Thanks for the reviews @cloud-fan, @Ngone51, @dongjoon-hyun :-)




[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044581470


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -674,22 +674,30 @@ private[spark] class AppStatusListener(
       delta
     }.orNull
 
-    val (completedDelta, failedDelta, killedDelta) = event.reason match {
+    // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by
+    // executor lost, it can lead to negative `LiveStage.activeTasks` since there's no
+    // corresponding `SparkListenerTaskStart` event for each of them. The negative activeTasks
+    // will make the stage always remains in the live stage list as it can never meet the
+    // condition activeTasks == 0. This in turn causes the dead executor to never be retained
+    // if that live stage's submissionTime is less than the dead executor's removeTime.
+    val (completedDelta, failedDelta, killedDelta, activeTasksDelta) = event.reason match {

Review Comment:
   nit:
   ```suggestion
       val (completedDelta, failedDelta, killedDelta, activeDelta) = event.reason match {
   ```





[GitHub] [spark] wineternity commented on pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1345497906

   My pleasure.




[GitHub] [spark] cloud-fan commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1032056919


##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
   why would Spark resubmit a successful task?





[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034721151


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   > to help understand the code more, do you know when/where the scheduler handles resubmitted tasks?
   1. First, in TaskSetManager.scala, we handle the executor loss:
   ``` scala
     override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
       // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage,
       // and we are not using an external shuffle server which could serve the shuffle outputs.
       // The reason is the next stage wouldn't be able to fetch the data from this dead executor
       // so we would need to rerun these tasks on other executors.
       if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) {
         for ((tid, info) <- taskInfos if info.executorId == execId) {
           val index = info.index
           // We may have a running task whose partition has been marked as successful,
           // this partition has another task completed in another stage attempt.
           // We treat it as a running task and will call handleFailedTask later.
           if (successful(index) && !info.running && !killedByOtherAttempt.contains(tid)) {
             successful(index) = false
             copiesRunning(index) -= 1
             tasksSuccessful -= 1
             addPendingTask(index)
             // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
             // stage finishes when a total of tasks.size tasks finish.
             sched.dagScheduler.taskEnded(
               tasks(index), Resubmitted, null, Seq.empty, Array.empty, info)
           }
         }
       }
   ```
   2. The successful task is added back to the pending queue to be re-executed via `addPendingTask`.
   3. `sched.dagScheduler.taskEnded` is called to tell the DAGScheduler to handle this; it posts a `CompletionEvent` to the eventProcessLoop.
   4. The function `handleTaskCompletion` in DAGScheduler.scala handles this message; since the reason `Resubmitted` is treated as a kind of failure, it calls `handleResubmittedFailure`, which adds the task's partitionId to the ShuffleMapStage's pendingPartitions. Thus the stage will wait for this resubmitted task to finish again.
   ``` scala
     private def handleResubmittedFailure(task: Task[_], stage: Stage): Unit = {
       logInfo(s"Resubmitted $task, so marking it as still running.")
       stage match {
         case sms: ShuffleMapStage =>
           sms.pendingPartitions += task.partitionId
   
         case _ =>
           throw SparkCoreErrors.sendResubmittedTaskStatusForShuffleMapStagesOnlyError()
       }
     }
   ```
   





[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1035883778


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,15 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?

Review Comment:
   Yea. I checked. Null taskInfo is not possible.





[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1040532427


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   > Can the change not simply be limited to...
   
   OK... if our target is only to be able to remove the dead executors, I think this is good enough. But if we also want to correct the metrics, it's not enough.





[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1044596644


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -674,22 +674,30 @@ private[spark] class AppStatusListener(
       delta
     }.orNull
 
-    val (completedDelta, failedDelta, killedDelta) = event.reason match {
+    // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by
+    // executor lost, it can lead to negative `LiveStage.activeTasks` since there's no
+    // corresponding `SparkListenerTaskStart` event for each of them. The negative activeTasks
+    // will make the stage always remains in the live stage list as it can never meet the
+    // condition activeTasks == 0. This in turn causes the dead executor to never be retained
+    // if that live stage's submissionTime is less than the dead executor's removeTime.
+    val (completedDelta, failedDelta, killedDelta, activeTasksDelta) = event.reason match {

Review Comment:
   fixed





[GitHub] [spark] itholic commented on pull request #38702: SPARK-41187 [Core] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
itholic commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1319566712

   Can we change the JIRA format in the title to something like "[SPARK-41187][CORE] ..."?
   
   Checking the [Spark contribution guide](https://spark.apache.org/contributing.html) would also be helpful!




[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034309933


##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
    The reason the resubmitted event is sent is explained in this comment:
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
   The successful task will be rerun, and the DAGScheduler needs this event to know the map stage is not finished.






[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1035472239


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,15 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by executor lost,
+    // it can lead to negative `LiveStage.activeTasks` since there's no corresponding `SparkListenerTaskStart`
+    // event for each of them. The negative activeTasks will make the stage always remains in the live stage list
+    // as it can never meet the condition activeTasks == 0. This in turn causes the dead executor to never be
+    // retained if that live stage's submissionTime is less than the dead executor's removeTime( see

Review Comment:
   nit:
   ```suggestion
       // cleaned up if that live stage's submissionTime is less than the dead executor's removeTime( see
   ```





[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034309587


##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
   Please check the JIRA https://issues.apache.org/jira/browse/SPARK-41187. For shuffle map stage tasks, if an executor loss happens, the finished tasks are resubmitted and TaskSetManager.scala sends out a taskEnd message with reason "Resubmitted"; this causes the activeTasks counter in AppStatusListener's liveStage to become negative.





[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034713383


##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
   OK~ 





[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034309933


##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
    The reason the resubmitted event is sent is explained in a comment in TaskSetManager.scala:
           // Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
           // stage finishes when a total of tasks.size tasks finish.
   The successful task will be rerun, and the DAGScheduler needs this event to know the map stage is not finished.





[GitHub] [spark] mridulm commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1040439989


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   It has been a while since I looked at it, but this is what I expect to happen.
   
   - When a task is marked as `Resubmitted`, the stage metrics are reverted from the earlier (successful) task completion (`metricsDelta`).
   - The failed tasks counter should go up - for both the stage and the executor.
   - Active tasks MUST not go down - this is the issue we have to fix.
   
   The rest of the method should not need much change, imo.
   
   Can the change not simply be limited to `if (event.reason != Resubmitted) <foo>.activeTasks -= 1`?
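   
   A rough, self-contained sketch of what that guarded decrement could look like (an illustrative model of this suggestion only, not the actual onTaskEnd body or the merged fix):
   
   ```scala
   // Hypothetical guard: only the activeTasks decrement is skipped for Resubmitted;
   // other bookkeeping (e.g. failed-task counts) can still run for that event.
   object GuardedActiveTasksSketch {
     sealed trait TaskEndReason
     case object Success extends TaskEndReason
     case object Resubmitted extends TaskEndReason

     final class LiveStage { var activeTasks: Int = 0; var failedTasks: Int = 0 }

     def onTaskStart(stage: LiveStage): Unit = stage.activeTasks += 1

     def onTaskEnd(stage: LiveStage, reason: TaskEndReason): Unit = {
       if (reason != Resubmitted) {
         stage.activeTasks -= 1
       }
       if (reason != Success) {
         stage.failedTasks += 1
       }
     }

     def main(args: Array[String]): Unit = {
       val stage = new LiveStage
       onTaskStart(stage)
       onTaskEnd(stage, Success)        // activeTasks == 0
       onTaskEnd(stage, Resubmitted)    // decrement skipped, activeTasks stays at 0
       assert(stage.activeTasks == 0 && stage.failedTasks == 1)
     }
   }
   ```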





[GitHub] [spark] wineternity commented on pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1345501343

   > +1, LGTM. Thank you, @wineternity and all.
   
   my pleasure




[GitHub] [spark] wineternity commented on pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1327055076

   cc @cloud-fan 




[GitHub] [spark] mridulm commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1035507465


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,15 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?

Review Comment:
   To answer this question: no, `taskInfo` cannot be `null` IMO - can you also take a look at it @Ngone51?
   We can remove this check and the TODO once we are sure.



##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   A couple of things to clarify (the PR description/comments confused me a bit).
   
   a) A pair of task start and task end events was fired for the task (let us call it Tr).
   b) When the executor which ran Tr was lost, while the stage was still running, a `Resubmitted` event was fired for Tr.
   c) Subsequently, a new task start and task end will be fired for the retry of Tr.
   
   The issue here is that (b) is associated with (a) - which was a success earlier, but has now been marked as a failure.
   
   We should not ignore this event, but rather deal with it properly.
   For example, update counters, the number of failed tasks, etc.
   





[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034655159


##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted
+    time += 1
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Resubmitted, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, running task will be failed and rerun
+    time += 1
+    tasks(1).markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(1), new ExecutorMetrics,
+      null))
+
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    listener.onStageCompleted(SparkListenerStageCompleted(stage))
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded ))
+
+    time += 1
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "1", "Test"))
+    time += 1
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "2", "Test"))
+
+    assert( listener.deadExecutors.size === 0 )

Review Comment:
   ```suggestion
       assert(listener.deadExecutors.size === 0)
   ```



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {

Review Comment:
   ```suggestion
     test(s"SPARK-41187: Stage should be removed from liveStages to avoid deadExecutors accumulated") {
   ```



##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors

Review Comment:
   ```suggestion
       // SPARK-41187: For `SparkListenerTaskEnd` with `Resubmitted` reason, which is raised by executor lost,
       // it can lead to negative `LiveStage.activeTasks` since there's no corresponding `SparkListenerTaskStart`
       // event for each of them. The negative activeTasks will make the stage always remains in the live stage list
       // as it can never meet the condition activeTasks == 0. This in turn causes the dead executor to never be
       // retained if that live stage's submissionTime is less than the dead executor's removeTime( see
       // isExecutorActiveForLiveStages). Since this kind of `SparkListenerTaskEnd` is useless here, we simply
       // ignore it.
   ```



##########
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala:
##########
@@ -1849,6 +1849,68 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     checkInfoPopulated(listener, logUrlMap, processId)
   }
 
+  test(s"Stage should be removed from liveStages to avoid deadExecutors accumulated") {
+
+    val listener = new AppStatusListener(store, conf, true)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+
+    time += 1
+    stage.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    val tasks = createTasks(2, Array("1", "2"))
+    tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+    }
+
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+      Success, tasks(0), new ExecutorMetrics, null))
+
+    // executor lost, success task will be resubmitted

Review Comment:
   Could you copy-paste your analysis in JIRA to the PR description to elaborate the issue more clearly?





[GitHub] [spark] cloud-fan commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1034461758


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   To help understand the code better, do you know when/where the scheduler handles resubmitted tasks?





[GitHub] [spark] cloud-fan commented on pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1330292664

   cc @jiangxb1987 @Ngone51 




[GitHub] [spark] Ngone51 commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1040350789


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   @wineternity Thanks for the detailed analysis. It seems the only metrics that the `Resubmitted` task end event could affect are the failed/completed task counters, and other metrics, like the executor summary and shuffle read/write metrics, would just be duplicated. So I tend to agree with treating the `Resubmitted` task end event as a signal rather than a real task end event.
   
   @mridulm WDYT?





[GitHub] [spark] wineternity commented on a diff in pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on code in PR #38702:
URL: https://github.com/apache/spark/pull/38702#discussion_r1039103663


##########
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala:
##########
@@ -645,8 +645,11 @@ private[spark] class AppStatusListener(
   }
 
   override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
-    // TODO: can this really happen?
-    if (event.taskInfo == null) {
+    // TODO: can taskInfo null really happen?
+    // For resubmitted tasks caused by ExecutorLost, the SparkListenerTaskEnd is useless and
+    // will make activeTask in stage to be negative, this will cause stage not be removed in
+    // liveStages, and finally cause executor not removed in deadExecutors
+    if (event.taskInfo == null ||  event.reason == Resubmitted) {

Review Comment:
   @cloud-fan @Ngone51 @mridulm any advice on the metrics and counters?





[GitHub] [spark] Ngone51 commented on pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1343811023

   > Btw, do you also want to remove the if (event.taskInfo == null) { check in beginning of onTaskEnd ?
   
   @mridulm Since the latest PR fix doesn't involve the metrics, I think we can skip this removal to keep the current changes as simple as possible. We can come back to it when working on the metrics.
   




[GitHub] [spark] wineternity commented on pull request #38702: [SPARK-41187][Core] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
wineternity commented on PR #38702:
URL: https://github.com/apache/spark/pull/38702#issuecomment-1321774634

   @itholic @vanzin Could you help review this patch? Thanks very much.




[GitHub] [spark] mridulm closed pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen

Posted by GitBox <gi...@apache.org>.
mridulm closed pull request #38702: [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happen
URL: https://github.com/apache/spark/pull/38702

