Posted to reviews@spark.apache.org by "Stove-hust (via GitHub)" <gi...@apache.org> on 2023/03/13 08:25:23 UTC

[GitHub] [spark] Stove-hust opened a new pull request, #40393: [SPARK-40082]

Stove-hust opened a new pull request, #40393:
URL: https://github.com/apache/spark/pull/40393

   ### What changes were proposed in this pull request?
   Copy the logic that handleTaskCompletion in DAGScheduler uses to process the last ShuffleMapTask into submitMissingTasks.
   
   
   ### Why are the changes needed?
   When push-based shuffle is enabled and speculative tasks exist, a ShuffleMapStage is resubmitted once a FetchFailed occurs. Its parent stages are resubmitted first, and recomputing them takes some time. Before the ShuffleMapStage itself is resubmitted, all of its speculative tasks succeed and register their map output, but these task-success events cannot trigger shuffleMergeFinalized (shuffleBlockPusher.notifyDriverAboutPushCompletion) because the stage has already been removed from runningStages.
   
   The stage is then resubmitted, but because the speculative tasks have already registered map output there are no missing tasks to compute, so the resubmission does not trigger shuffleMergeFinalized either. As a result, this stage's _shuffleMergedFinalized stays false.
   
   AQE then submits the next stages, which depend on the ShuffleMapStage that hit the FetchFailed. In getMissingParentStages, this stage is marked as missing and is resubmitted, but the next stages are added to waitingStages after this stage has finished, so they are not submitted even though the resubmission of this stage has finished.
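
The sequence described above can be pictured with a small toy model. This is only a sketch under stated assumptions: `ToyStage`, `ToyScheduler`, and all of their members are hypothetical names invented for illustration and are not Spark's DAGScheduler API. The rule that an unfinalized push-based shuffle stage is treated as missing is taken from the description above.

```scala
// Toy model only: hypothetical names, not Spark's DAGScheduler API.
final case class ToyStage(
    id: Int,
    numPartitions: Int,
    registeredOutputs: Set[Int], // partitions whose map output is already registered
    pushBasedShuffle: Boolean,
    mergeFinalized: Boolean) {

  // Partitions that would still need a task on resubmission.
  def missingPartitions: Seq[Int] =
    (0 until numPartitions).filterNot(registeredOutputs.contains)

  // Assumption from the description: a child stage treats this stage as missing
  // while outputs are incomplete or the push-based merge is not finalized.
  def treatedAsMissing: Boolean =
    missingPartitions.nonEmpty || (pushBasedShuffle && !mergeFinalized)
}

object ToyScheduler {
  // Reported behaviour: with no tasks to run, nothing schedules finalization.
  def resubmitBefore(stage: ToyStage): ToyStage = stage

  // Idea of the fix: with no tasks to run, still finalize an unfinalized merge.
  def resubmitAfter(stage: ToyStage): ToyStage =
    if (stage.missingPartitions.isEmpty && stage.pushBasedShuffle && !stage.mergeFinalized) {
      stage.copy(mergeFinalized = true)
    } else {
      stage
    }
}
```

In this model, a stage whose outputs are all registered but whose merge is not finalized stays "missing" after `resubmitBefore`, which is why its children keep waiting. After `resubmitAfter` it is no longer treated as missing.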
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   This extreme case is very difficult to construct, so we added logs in our production environment to count occurrences of the problem and to verify the stability of the job. I am happy to provide a timeline of the events in which the problem arose.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082]

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1467339346

   > @Stove-hust Thank you for reporting and the patch. Would you be able to share driver logs?
   
   sure.
   `# stage 10 failed
   22/10/15 10:55:58 WARN task-result-getter-1 TaskSetManager: Lost task 435.1 in stage 10.0 (TID 6822, zw02-data-hdp-dn21102.mt, executor 101): FetchFailed(null, shuffleId=3, mapIndex=-1, mapId=-1, reduceId=435, message=
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3 partition 435
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: ShuffleMapStage 10 (processCmd at CliDriver.java:386) failed in 601.792 s due to org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3 partition 435
   
   # resubmit stage 10 && parentStage 9
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Resubmitting ShuffleMapStage 9 (processCmd at CliDriver.java:386) and ShuffleMapStage 10 (processCmd at CliDriver.java:386) due to fetch failure
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Resubmitting failed stages
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Submitting ShuffleMapStage 9 (MapPartitionsRDD[22] at processCmd at CliDriver.java:386), which has no missing parents
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Push-based shuffle disabled for ShuffleMapStage 9 (processCmd at CliDriver.java:386) since it is already shuffle merge finalized
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 9 (MapPartitionsRDD[22] at processCmd at CliDriver.java:386) (first 15 tasks are for partitions Vector(98, 372, 690))
   22/10/15 10:55:58 INFO dag-scheduler-event-loop YarnClusterScheduler: Adding task set 9.1 with 3 tasks
   
   # The tasks of the first attempt of stage 10 complete one after another and call notifyDriverAboutPushCompletion to end stage 10 and set finalizeTask. Because the stage is not in runningStages, it cannot be marked shuffleMergeFinalized.
   22/10/15 10:55:58 INFO task-result-getter-0 TaskSetManager: Finished task 325.0 in stage 10.0 (TID 6166) in 154455 ms on zw02-data-hdp-dn25537.mt (executor 117) (494/500)
   22/10/15 10:55:59 WARN task-result-getter-1 TaskSetManager: Lost task 325.1 in stage 10.0 (TID 6671, zw02-data-hdp-dn23160.mt, executor 47): TaskKilled (another attempt succeeded)
   22/10/15 10:56:20 WARN task-result-getter-1 TaskSetManager: Lost task 358.1 in stage 10.0 (TID 6731, zw02-data-hdp-dn25537.mt, executor 95): TaskKilled (another attempt succeeded)
   22/10/15 10:56:20 INFO task-result-getter-1 TaskSetManager: Task 358.1 in stage 10.0 (TID 6731) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
   
   # Removed TaskSet 10.0, whose tasks have all completed
   22/10/15 10:56:22 INFO task-result-getter-1 TaskSetManager: Ignoring task-finished event for 435.0 in stage 10.0 because task 435 has already completed successfully
   22/10/15 10:56:22 INFO task-result-getter-1 YarnClusterScheduler: Removed TaskSet 10.0, whose tasks have all completed, from pool 
   
   # notifyDriverAboutPushCompletion stage 10
   22/10/15 10:56:23 INFO dag-scheduler-event-loop DAGScheduler: ShuffleMapStage 10 (processCmd at CliDriver.java:386) scheduled for finalizing shuffle merge in 0 s
   22/10/15 10:56:23 INFO shuffle-merge-finalizer-2 DAGScheduler: ShuffleMapStage 10 (processCmd at CliDriver.java:386) finalizing the shuffle merge with registering merge results set to true
   
   # stage 9 finished 
   22/10/15 10:57:51 INFO task-result-getter-1 TaskSetManager: Finished task 2.0 in stage 9.1 (TID 6825) in 112825 ms on zw02-data-hdp-dn25559.mt (executor 74) (3/3)
   22/10/15 10:57:51 INFO task-result-getter-1 YarnClusterScheduler: Removed TaskSet 9.1, whose tasks have all completed, from pool 
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: ShuffleMapStage 9 (processCmd at CliDriver.java:386) finished in 112.832 s
   
   # resubmit stage 10
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: looking for newly runnable stages
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: running: Set(ShuffleMapStage 11, ShuffleMapStage 8)
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: waiting: Set(ShuffleMapStage 12, ShuffleMapStage 10)
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: failed: Set()
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: Submitting ShuffleMapStage 10 (MapPartitionsRDD[36] at processCmd at CliDriver.java:386), which has no missing parents
   22/10/15 10:57:51 INFO dag-scheduler-event-loop OutputCommitCoordinator: Reusing state from previous attempt of stage 10.
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: Shuffle merge enabled before starting the stage for ShuffleMapStage 10 with shuffle 7 and shuffle merge 0 with 108 merger locations
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: Submitting 4 missing tasks from ShuffleMapStage 10 (MapPartitionsRDD[36] at processCmd at CliDriver.java:386) (first 15 tasks are for partitions Vector(105, 288, 447, 481))
   22/10/15 10:57:51 INFO dag-scheduler-event-loop YarnClusterScheduler: Adding task set 10.1 with 4 tasks
   
   # stage 10 cannot finish
   22/10/15 10:58:18 INFO task-result-getter-1 TaskSetManager: Finished task 2.0 in stage 10.1 (TID 6857) in 26644 ms on zw02-data-hdp-dn23767.mt (executor 139) (1/4)
   22/10/15 10:58:24 INFO task-result-getter-1 TaskSetManager: Finished task 3.0 in stage 10.1 (TID 6860) in 32551 ms on zw02-data-hdp-dn23729.mt (executor 42) (2/4)
   22/10/15 10:58:47 INFO task-result-getter-1 TaskSetManager: Finished task 0.0 in stage 10.1 (TID 6858) in 55524 ms on zw02-data-hdp-dn20640.mt (executor 134) (3/4)
   22/10/15 10:58:58 INFO task-result-getter-0 TaskSetManager: Finished task 1.0 in stage 10.1 (TID 6859) in 66911 ms on zw02-data-hdp-dn25862.mt (executor 57) (4/4)
   `
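
The symptom in the log above (every task of stage 10.1 finishes, yet no "finished in" line follows for stage 10) can be checked mechanically. The following is a rough sketch, not a Spark tool: `StageLogCheck` and `stagesWithoutFinish` are hypothetical names, the regular expressions are derived only from the log lines quoted above, and stage attempts are deliberately not distinguished.

```scala
// Sketch only: patterns are derived from the quoted driver log, not from a Spark API.
object StageLogCheck {
  // e.g. "... Finished task 1.0 in stage 10.1 (TID 6859) in 66911 ms on host (executor 57) (4/4)"
  private val TaskDone =
    """.*Finished task \S+ in stage (\d+)\.(\d+) .*\((\d+)/(\d+)\)\s*""".r
  // e.g. "... DAGScheduler: ShuffleMapStage 9 (processCmd at CliDriver.java:386) finished in 112.832 s"
  private val StageFinished = """.*ShuffleMapStage (\d+) \(.*\) finished in .*""".r

  /** Stage ids for which some attempt completed all tasks but no "finished in" line was logged. */
  def stagesWithoutFinish(lines: Seq[String]): Set[Int] = {
    val allTasksDone = lines.collect {
      case TaskDone(stage, _, done, total) if done == total => stage.toInt
    }.toSet
    val finished = lines.collect { case StageFinished(stage) => stage.toInt }.toSet
    allTasksDone -- finished
  }
}
```

Applied to the lines quoted above, stage 9 logs both "(3/3)" and "finished in", whereas stage 10 logs "(4/4)" without a matching "finished in" line, so only stage 10 should be reported.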




[GitHub] [spark] mridulm commented on a diff in pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #40393:
URL: https://github.com/apache/spark/pull/40393#discussion_r1142572022


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4595,6 +4595,184 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-40082: recomputation of shuffle map stage with no pending partitions" +
+    "should not hang") {
+
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3"))
+
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))
+
+    // Fetch failed
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostC"), shuffleIdA, 0L, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // long running task complete
+    completeShuffleMapStageSuccessfully(1, 0, 2, Seq("hostA", "hostA"))
+    assert(!shuffleDepB.shuffleMergeFinalized)
+
+    // stage1`s tasks have all completed
+    val shuffleStage1 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.pendingPartitions.isEmpty)
+
+    // resubmit
+    scheduler.resubmitFailedStages()
+
+    // complete parentStage0
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))
+
+    // stage1 should be shuffleMergeFinalized
+    assert(shuffleDepB.shuffleMergeFinalized)
+  }
+
+  for (pushBasedShuffleEnabled <- Seq(true, false)) {
+    test("SPARK-40082: recomputation of shuffle map stage with no pending partitions should not " +
+      s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled") {

Review Comment:
   Good question - I have not tried that :-)
   Does specifying pushBasedShuffleEnabled = true in that string work?
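
To make the question concrete, here is a minimal sketch of the two test names that the loop in the diff above registers, together with a plain substring filter. `TestNameDemo` and `select` are hypothetical helpers written for this illustration. ScalaTest's runner has a `-z` option that selects tests whose names contain a given substring; whether that is the most convenient route in Spark's build is an assumption here and has not been verified in this thread.

```scala
// Illustration only: mirrors how the loop builds one test name per flag value.
object TestNameDemo {
  val names: Seq[String] = Seq(true, false).map { pushBasedShuffleEnabled =>
    "SPARK-40082: recomputation of shuffle map stage with no pending partitions should not " +
      s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled"
  }

  // Plain substring match, in the spirit of a name-substring test filter.
  def select(substring: String): Seq[String] = names.filter(_.contains(substring))
}
```

With these names, the substring `pushBasedShuffleEnabled = true` matches exactly one of the two generated tests, while `SPARK-40082` matches both.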





[GitHub] [spark] otterc commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "otterc (via GitHub)" <gi...@apache.org>.
otterc commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1474477575

   One of the problems I see is that the successful completion of a speculative task will not trigger shuffle merge finalization of a stage that was marked failed but doesn't have any pending partitions. I think we need to address this as well with the fix.
   cc. @Stove-hust @mridulm 




[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1474162354

   > So this is an interesting coincidence, I literally encountered a production job which seems to be hitting this exact same issue :-) I was in the process of creating a test case, but my intuition was along the same lines as this PR.
   > 
   > Can you create a test case to validate this behavior @Stove-hust ? Essentially it should fail with current master, and succeed after this change.
   > 
   > Thanks for working on this fix
   
   Added UT




[GitHub] [spark] mridulm commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1478833621

   Merged to master.
   Thanks for working on this @Stove-hust !
   Thanks for the review @otterc :-)




[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1474918516

   > Instead of only testing specifically for the flag - which is subject to change as the implementation evolves, we should also test for behavior here.
   > 
   > This is the reproducible test I was using (with some changes) to test approaches for this bug - and it mimics the case I saw in our production reasonably well. (In DAGSchedulerSuite):
   > 
   > ```
   >   for (pushBasedShuffleEnabled <- Seq(true, false)) {
   >     test("SPARK-40082: recomputation of shuffle map stage with no pending partitions should not " +
   >         s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled") {
   > 
   >       if (pushBasedShuffleEnabled) {
   >         initPushBasedShuffleConfs(conf)
   >         DAGSchedulerSuite.clearMergerLocs()
   >         DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
   >       }
   > 
   >       var taskIdCount = 0
   > 
   >       var completedStage: List[Int] = Nil
   >       val listener = new SparkListener() {
   >         override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
   >           completedStage = completedStage :+ event.stageInfo.stageId
   >         }
   >       }
   >       sc.addSparkListener(listener)
   > 
   >       val fetchFailParentPartition = 0
   > 
   >       val shuffleMapRdd0 = new MyRDD(sc, 2, Nil)
   >       val shuffleDep0 = new ShuffleDependency(shuffleMapRdd0, new HashPartitioner(2))
   > 
   >       val shuffleMapRdd1 = new MyRDD(sc, 2, List(shuffleDep0), tracker = mapOutputTracker)
   >       val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
   > 
   >       val reduceRdd = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
   > 
   >       // submit the initial mapper stage, generate shuffle output for first reducer stage.
   >       submitMapStage(shuffleDep0)
   > 
   >       // Map stage completes successfully,
   >       completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostB"))
   >       taskIdCount += 2
   >       assert(completedStage === List(0))
   > 
   >       // Now submit the first reducer stage
   >       submitMapStage(shuffleDep1)
   > 
   >       def createTaskInfo(speculative: Boolean): TaskInfo = {
   >         val taskInfo = new TaskInfo(
   >           taskId = taskIdCount,
   >           index = 0,
   >           attemptNumber = 0,
   >           partitionId = 0,
   >           launchTime = 0L,
   >           executorId = "",
   >           host = "hostC",
   >           TaskLocality.ANY,
   >           speculative = speculative)
   >         taskIdCount += 1
   >         taskInfo
   >       }
   > 
   >       val normalTask = createTaskInfo(speculative = false)
   >       val speculativeTask = createTaskInfo(speculative = true)
   > 
   >       // fail task 1.0 due to FetchFailed, and make 1.1 succeed.
   >       runEvent(makeCompletionEvent(taskSets(1).tasks(0),
   >         FetchFailed(makeBlockManagerId("hostA"), shuffleDep0.shuffleId, normalTask.taskId,
   >           fetchFailParentPartition, normalTask.index, "ignored"),
   >         result = null,
   >         Seq.empty,
   >         Array.empty,
   >         normalTask))
   > 
   >       // Make the speculative task succeed after initial task has failed
   >       runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success,
   >         result = MapStatus(BlockManagerId("hostD-exec1", "hostD", 34512),
   >           Array.fill[Long](2)(2), mapTaskId = speculativeTask.taskId),
   >         taskInfo = speculativeTask))
   > 
   >       // The second task, for partition 1 succeeds as well.
   >       runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success,
   >         result = MapStatus(BlockManagerId("hostE-exec2", "hostE", 23456),
   >           Array.fill[Long](2)(2), mapTaskId = taskIdCount),
   >       ))
   >       taskIdCount += 1
   > 
   >       sc.listenerBus.waitUntilEmpty()
   >       assert(completedStage === List(0, 2))
   > 
   >       // the stages will now get resubmitted due to the failure
   >       Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
   > 
   >       // parent map stage resubmitted
   >       assert(scheduler.runningStages.size === 1)
   >       val mapStage = scheduler.runningStages.head
   > 
   >       // Stage 1 is same as Stage 0 - but created for the ShuffleMapTask 2, as it is a
   >       // different job
   >       assert(mapStage.id === 1)
   >       assert(mapStage.latestInfo.failureReason.isEmpty)
   >       // only the partition reported in fetch failure is resubmitted
   >       assert(mapStage.latestInfo.numTasks === 1)
   > 
   >       val stage0Retry = taskSets.filter(_.stageId == 1)
   >       assert(stage0Retry.size === 1)
   >       // make the original task succeed
   >       runEvent(makeCompletionEvent(stage0Retry.head.tasks(fetchFailParentPartition), Success,
   >         result = MapStatus(BlockManagerId("hostF-exec1", "hostF", 12345),
   >           Array.fill[Long](2)(2), mapTaskId = taskIdCount)))
   >       Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
   > 
   >       // The retries should succeed
   >       sc.listenerBus.waitUntilEmpty()
   >       assert(completedStage === List(0, 2, 1, 2))
   > 
   >       // Now submit the entire dag again
   >       // This will add 3 new stages.
   >       submit(reduceRdd, Array(0, 1))
   >       Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
   > 
   >       // Only the last stage needs to execute, and those tasks - so completed stages should not
   >       // change.
   >       sc.listenerBus.waitUntilEmpty()
   > 
   >       assert(completedStage === List(0, 2, 1, 2))
   > 
   >       // All other stages should be done, and only the final stage should be waiting
   >       assert(scheduler.runningStages.size === 1)
   >       assert(scheduler.runningStages.head.id === 5)
   >       assert(taskSets.count(_.stageId == 5) === 1)
   > 
   >       complete(taskSets.filter(_.stageId == 5).head, Seq((Success, 1), (Success, 2)))
   > 
   >       sc.listenerBus.waitUntilEmpty()
   >       assert(completedStage === List(0, 2, 1, 2, 5))
   >     }
   >   }
   > ```
   > 
   > Would be good to adapt/clean it up for your PR, in addition to the existing test - so that the observed bug does not recur.
   > 
   > (Good news is, this PR works against it :-) )
   
   Thank you for your advice on the UT I wrote; it was very important to me. I will delete my UT. Thanks again.




[GitHub] [spark] mridulm commented on a diff in pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #40393:
URL: https://github.com/apache/spark/pull/40393#discussion_r1141789869


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4595,6 +4595,183 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-40082: recomputation of shuffle map stage with no pending partitions should not hang") {

Review Comment:
   This line is exceeding 100 chars, and is probably responsible for the build failure.





[GitHub] [spark] otterc commented on a diff in pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "otterc (via GitHub)" <gi...@apache.org>.
otterc commented on code in PR #40393:
URL: https://github.com/apache/spark/pull/40393#discussion_r1142475865


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4595,6 +4595,184 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-40082: recomputation of shuffle map stage with no pending partitions" +
+    "should not hang") {
+
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3"))
+
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))
+
+    // Fetch failed
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostC"), shuffleIdA, 0L, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // long running task complete
+    completeShuffleMapStageSuccessfully(1, 0, 2, Seq("hostA", "hostA"))
+    assert(!shuffleDepB.shuffleMergeFinalized)
+
+    // stage1`s tasks have all completed
+    val shuffleStage1 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.pendingPartitions.isEmpty)
+
+    // resubmit
+    scheduler.resubmitFailedStages()
+
+    // complete parentStage0
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))
+
+    // stage1 should be shuffleMergeFinalized
+    assert(shuffleDepB.shuffleMergeFinalized)
+  }
+
+  for (pushBasedShuffleEnabled <- Seq(true, false)) {
+    test("SPARK-40082: recomputation of shuffle map stage with no pending partitions should not " +
+      s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled") {

Review Comment:
   How do we run just the individual tests? 



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4595,6 +4595,184 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-40082: recomputation of shuffle map stage with no pending partitions" +
+    "should not hang") {

Review Comment:
   nit: missing ` ` between `partitions` and `should`.
   Since this test is checking that the stage gets finalized, we should change the name to "SPARK-40082: re-computation of shuffle map stage with no pending partitions should finalize the stage"
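
The nit about the missing space is easy to see by evaluating the concatenation on its own. A minimal sketch follows; `TestNameSpacing` is a hypothetical object used only for this illustration.

```scala
// Illustration only: shows the effect of the missing space in the concatenated test name.
object TestNameSpacing {
  // As written in the diff: no space at the end of the first literal.
  val asWritten: String =
    "SPARK-40082: recomputation of shuffle map stage with no pending partitions" +
      "should not hang"

  // With a trailing space on the first literal.
  val withSpace: String =
    "SPARK-40082: recomputation of shuffle map stage with no pending partitions " +
      "should not hang"
}
```

`asWritten` ends in "partitionsshould not hang", whereas `withSpace` ends in "partitions should not hang".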





[GitHub] [spark] otterc commented on pull request #40393: [SPARK-40082]

Posted by "otterc (via GitHub)" <gi...@apache.org>.
otterc commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1467314593

   @Stove-hust Thank you for reporting and the patch. Would you be able to share driver logs?




[GitHub] [spark] mridulm commented on pull request #40393: [SPARK-40082]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1467025042

   +CC @otterc 




[GitHub] [spark] otterc commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "otterc (via GitHub)" <gi...@apache.org>.
otterc commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1470501033

   @akpatnam25 @shuwang21




[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1478924572

   @mridulm 
   Yep, it's me.
   Username: StoveM
   Full name: Fencheng Mei




[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1469402096

   > @Stove-hust Haven't had a chance to look at it yet. I'll take a look at it this week.
   
   tks




[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1475528785

   > @Stove-hust To clarify - I meant add this as well (after you had a chance to look at it and clean it up if required - this was from my test setup). We should keep the UT you had added - and it is important to test the specific code expectation as it stands today.
   
   Sorry, I misunderstood what you meant. 😂
   I think the UT you wrote is great. Can I include it in my PR? I will mark that part as written by you.
   One more question: this PR will then have two UTs, is that right?




[GitHub] [spark] otterc commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "otterc (via GitHub)" <gi...@apache.org>.
otterc commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1474749275

   @Stove-hust The main change in `DAGScheduler` looks good to me. Basically, [here](https://github.com/apache/spark/blob/11c9838283e98d5ebe6ce13b85e26217494feef2/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L762) we also check whether the parent stage is finalized, and if it is not, we submit it. The parent stage is not getting finalized here because it has no tasks.
   Will review the UT and take another look at the code next week. Thanks for addressing this.
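
The condition described in this comment can be illustrated with a minimal, self-contained sketch. It is a simplified model and not Spark's `DAGScheduler` code: the names `MapStage`, `ResubmitSketch`, and `finalizeWhenNoTasks` are hypothetical, and finalization is reduced to a single boolean flag. It assumes only what the thread states: a resubmitted stage whose map outputs are all already registered has no tasks to run, so no task-completion event can finalize it.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a shuffle map stage; not Spark's ShuffleMapStage.
final class MapStage(val id: Int, val numPartitions: Int) {
  // Partitions whose map output is already registered (e.g. by speculative attempts).
  val registeredOutputs: mutable.Set[Int] = mutable.Set.empty[Int]
  // Plays the role of the merge-finalized flag discussed in the thread.
  var mergeFinalized: Boolean = false

  // Partitions that still need a task on (re)submission.
  def missingPartitions: Seq[Int] =
    (0 until numPartitions).filterNot(registeredOutputs.contains)
}

object ResubmitSketch {
  // Models (re)submitting a stage and returns whether it is finalized afterwards.
  // finalizeWhenNoTasks = false models the behavior reported as the bug;
  // finalizeWhenNoTasks = true models the idea behind the fix.
  def submit(stage: MapStage, finalizeWhenNoTasks: Boolean): Boolean = {
    val toRun = stage.missingPartitions
    if (toRun.isEmpty && finalizeWhenNoTasks) {
      // No task will run, so finalization has to be triggered at submission time.
      stage.mergeFinalized = true
    }
    // When toRun is non-empty, finalization is left to the completion of the
    // last task, which this sketch does not model.
    stage.mergeFinalized
  }

  def main(args: Array[String]): Unit = {
    val withoutFix = new MapStage(id = 10, numPartitions = 2)
    withoutFix.registeredOutputs ++= Seq(0, 1)
    println(submit(withoutFix, finalizeWhenNoTasks = false)) // false: stage stays unfinalized

    val withFix = new MapStage(id = 10, numPartitions = 2)
    withFix.registeredOutputs ++= Seq(0, 1)
    println(submit(withFix, finalizeWhenNoTasks = true)) // true
  }
}
```

In the sketch, a child stage that waits for `mergeFinalized` would wait forever in the first case, which matches the hang reported in this PR.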




[GitHub] [spark] otterc commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "otterc (via GitHub)" <gi...@apache.org>.
otterc commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1469293356

   @Stove-hust  Haven't had a chance to look at it yet. I'll take a look at it this week.




[GitHub] [spark] mridulm commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1478833979

   I could not cherry pick this into 3.4 and 3.3 - we should fix for those branches as well IMO.
   Can you create a PR against those two branches as well @Stove-hust ? Thanks




[GitHub] [spark] mridulm commented on a diff in pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #40393:
URL: https://github.com/apache/spark/pull/40393#discussion_r1142362530


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4595,6 +4595,184 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-40082: recomputation of shuffle map stage with no pending partitions" +
+    "should not hang") {
+
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3"))
+
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))

Review Comment:
   nit: For consistency, let us make it `hostA` and `hostB` ... and have `hostA` (say) fail.



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4595,6 +4595,184 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-40082: recomputation of shuffle map stage with no pending partitions" +
+    "should not hang") {
+
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3"))
+
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))
+
+    // Fetch failed
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostC"), shuffleIdA, 0L, 0, 0,

Review Comment:
   nit: Since map tasks did not run on `hostC`, let us change it to `hostA` as per comment above.



##########
pom.xml:
##########
@@ -114,7 +114,7 @@
     <java.version>1.8</java.version>
     <maven.compiler.source>${java.version}</maven.compiler.source>
     <maven.compiler.target>${java.version}</maven.compiler.target>
-    <maven.version>3.8.7</maven.version>
+    <maven.version>3.6.3</maven.version>

Review Comment:
   Revert this ?



##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4595,6 +4595,184 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-40082: recomputation of shuffle map stage with no pending partitions" +
+    "should not hang") {
+
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3"))
+
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))
+
+    // Fetch failed
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostC"), shuffleIdA, 0L, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // long running task complete
+    completeShuffleMapStageSuccessfully(1, 0, 2, Seq("hostA", "hostA"))

Review Comment:
   nit: let us change the host to `hostB` for successes





[GitHub] [spark] mridulm commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1478905577

   Is your jira id `StoveM` @Stove-hust  ?




[GitHub] [spark] mridulm commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1474986959

   @Stove-hust To clarify - I meant add this as well.
   We should keep the UT you had added - and it is important to test the specific expectation as it stands today.




[GitHub] [spark] mridulm commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1474782673

   Instead of testing specifically for the flag - which is subject to change as the implementation evolves, we should test for behavior here.
   
   This is the reproducible test I was using (with some changes) to test approaches for this bug:
   
   ```
     for (pushBasedShuffleEnabled <- Seq(true, false)) {
       test("SPARK-40082: recomputation of shuffle map stage with no pending partitions should not " +
           s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled") {
   
         if (pushBasedShuffleEnabled) {
           initPushBasedShuffleConfs(conf)
           DAGSchedulerSuite.clearMergerLocs()
           DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
         }
   
         var taskIdCount = 0
   
         var completedStage: List[Int] = Nil
         val listener = new SparkListener() {
           override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
             completedStage = completedStage :+ event.stageInfo.stageId
           }
         }
         sc.addSparkListener(listener)
   
         val fetchFailParentPartition = 0
   
         val shuffleMapRdd0 = new MyRDD(sc, 2, Nil)
         val shuffleDep0 = new ShuffleDependency(shuffleMapRdd0, new HashPartitioner(2))
   
         val shuffleMapRdd1 = new MyRDD(sc, 2, List(shuffleDep0), tracker = mapOutputTracker)
         val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))
   
         val reduceRdd = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
   
         // submit the initial mapper stage, generate shuffle output for first reducer stage.
         submitMapStage(shuffleDep0)
   
         // Map stage completes successfully,
         completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostB"))
         taskIdCount += 2
         assert(completedStage === List(0))
   
         // Now submit the first reducer stage
         submitMapStage(shuffleDep1)
   
         def createTaskInfo(speculative: Boolean): TaskInfo = {
           val taskInfo = new TaskInfo(
             taskId = taskIdCount,
             index = 0,
             attemptNumber = 0,
             partitionId = 0,
             launchTime = 0L,
             executorId = "",
             host = "hostC",
             TaskLocality.ANY,
             speculative = speculative)
           taskIdCount += 1
           taskInfo
         }
   
         val normalTask = createTaskInfo(speculative = false)
         val speculativeTask = createTaskInfo(speculative = true)
   
         // fail task 1.0 due to FetchFailed, and make 1.1 succeed.
         runEvent(makeCompletionEvent(taskSets(1).tasks(0),
           FetchFailed(makeBlockManagerId("hostA"), shuffleDep0.shuffleId, normalTask.taskId,
             fetchFailParentPartition, normalTask.index, "ignored"),
           result = null,
           Seq.empty,
           Array.empty,
           normalTask))
   
         // Make the speculative task succeed after initial task has failed
         runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success,
           result = MapStatus(BlockManagerId("hostD-exec1", "hostD", 34512),
             Array.fill[Long](2)(2), mapTaskId = speculativeTask.taskId),
           taskInfo = speculativeTask))
   
         // The second task, for partition 1 succeeds as well.
         runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success,
           result = MapStatus(BlockManagerId("hostE-exec2", "hostE", 23456),
             Array.fill[Long](2)(2), mapTaskId = taskIdCount),
         ))
         taskIdCount += 1
   
         sc.listenerBus.waitUntilEmpty()
         assert(completedStage === List(0, 2))
   
         // the stages will now get resubmitted due to the failure
         Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
   
         // parent map stage resubmitted
         assert(scheduler.runningStages.size === 1)
         val mapStage = scheduler.runningStages.head
   
         // Stage 1 is same as Stage 0 - but created for the ShuffleMapTask 2, as it is a
         // different job
         assert(mapStage.id === 1)
         assert(mapStage.latestInfo.failureReason.isEmpty)
         // only the partition reported in fetch failure is resubmitted
         assert(mapStage.latestInfo.numTasks === 1)
   
         val stage0Retry = taskSets.filter(_.stageId == 1)
         assert(stage0Retry.size === 1)
         // make the original task succeed
         runEvent(makeCompletionEvent(stage0Retry.head.tasks(fetchFailParentPartition), Success,
           result = MapStatus(BlockManagerId("hostF-exec1", "hostF", 12345),
             Array.fill[Long](2)(2), mapTaskId = taskIdCount)))
         Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
   
         // The retries should succeed
         sc.listenerBus.waitUntilEmpty()
         assert(completedStage === List(0, 2, 1, 2))
   
         // Now submit the entire dag again
         // This will add 3 new stages.
         submit(reduceRdd, Array(0, 1))
         Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
   
         // Only the last stage needs to execute, and those tasks - so completed stages should not
         // change.
         sc.listenerBus.waitUntilEmpty()
   
         assert(completedStage === List(0, 2, 1, 2))
   
         // All other stages should be done, and only the final stage should be waiting
         assert(scheduler.runningStages.size === 1)
         assert(scheduler.runningStages.head.id === 5)
         assert(taskSets.count(_.stageId == 5) === 1)
   
         complete(taskSets.filter(_.stageId == 5).head, Seq((Success, 1), (Success, 2)))
   
         sc.listenerBus.waitUntilEmpty()
         assert(completedStage === List(0, 2, 1, 2, 5))
       }
     }
   ```
   
   Would be good to adapt/clean it up for your PR - so that the observed bug does not recur.
   
   (Good news is, this PR works against it :-) )
   
   




[GitHub] [spark] Stove-hust commented on a diff in pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on code in PR #40393:
URL: https://github.com/apache/spark/pull/40393#discussion_r1143279779


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4595,6 +4595,184 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-40082: recomputation of shuffle map stage with no pending partitions" +
+    "should not hang") {
+
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3"))
+
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))
+
+    // Fetch failed
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostC"), shuffleIdA, 0L, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // long running task complete
+    completeShuffleMapStageSuccessfully(1, 0, 2, Seq("hostA", "hostA"))
+    assert(!shuffleDepB.shuffleMergeFinalized)
+
+    // stage1`s tasks have all completed
+    val shuffleStage1 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.pendingPartitions.isEmpty)
+
+    // resubmit
+    scheduler.resubmitFailedStages()
+
+    // complete parentStage0
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))
+
+    // stage1 should be shuffleMergeFinalized
+    assert(shuffleDepB.shuffleMergeFinalized)
+  }
+
+  for (pushBasedShuffleEnabled <- Seq(true, false)) {
+    test("SPARK-40082: recomputation of shuffle map stage with no pending partitions should not " +
+      s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled") {

Review Comment:
   Hmm, I have tested it and it works.





[GitHub] [spark] Stove-hust commented on pull request #40393: []SPARK-40082]

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1467340408

   > @Stove-hust Thank you for reporting and the patch. Would you be able to share driver logs?
   
   Sure (with some comments added)
   --- stage 10 failed
   22/10/15 10:55:58 WARN task-result-getter-1 TaskSetManager: Lost task 435.1 in stage 10.0 (TID 6822, zw02-data-hdp-dn21102.mt, executor 101): FetchFailed(null, shuffleId=3, mapIndex=-1, mapId=-1, reduceId=435, message=
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3 partition 435
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: ShuffleMapStage 10 (processCmd at CliDriver.java:386) failed in 601.792 s due to org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3 partition 435
   
   -- resubmit stage 10 && parentStage 9
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Resubmitting ShuffleMapStage 9 (processCmd at CliDriver.java:386) and ShuffleMapStage 10 (processCmd at CliDriver.java:386) due to fetch failure
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Resubmitting failed stages
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Submitting ShuffleMapStage 9 (MapPartitionsRDD[22] at processCmd at CliDriver.java:386), which has no missing parents
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Push-based shuffle disabled for ShuffleMapStage 9 (processCmd at CliDriver.java:386) since it is already shuffle merge finalized
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 9 (MapPartitionsRDD[22] at processCmd at CliDriver.java:386) (first 15 tasks are for partitions Vector(98, 372, 690))
   22/10/15 10:55:58 INFO dag-scheduler-event-loop YarnClusterScheduler: Adding task set 9.1 with 3 tasks
   
   -- The tasks of the first attempt of stage 10 complete one after another and call notifyDriverAboutPushCompletion to end stage 10, and the finalize task is scheduled. Because the stage is no longer in runningStages, it cannot be marked shuffleMergeFinalized.
   22/10/15 10:55:58 INFO task-result-getter-0 TaskSetManager: Finished task 325.0 in stage 10.0 (TID 6166) in 154455 ms on zw02-data-hdp-dn25537.mt (executor 117) (494/500)
   22/10/15 10:55:59 WARN task-result-getter-1 TaskSetManager: Lost task 325.1 in stage 10.0 (TID 6671, zw02-data-hdp-dn23160.mt, executor 47): TaskKilled (another attempt succeeded)
   22/10/15 10:56:20 WARN task-result-getter-1 TaskSetManager: Lost task 358.1 in stage 10.0 (TID 6731, zw02-data-hdp-dn25537.mt, executor 95): TaskKilled (another attempt succeeded)
   22/10/15 10:56:20 INFO task-result-getter-1 TaskSetManager: Task 358.1 in stage 10.0 (TID 6731) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
   
   --- Removed TaskSet 10.0, whose tasks have all completed
   22/10/15 10:56:22 INFO task-result-getter-1 TaskSetManager: Ignoring task-finished event for 435.0 in stage 10.0 because task 435 has already completed successfully
   22/10/15 10:56:22 INFO task-result-getter-1 YarnClusterScheduler: Removed TaskSet 10.0, whose tasks have all completed, from pool 
   
   --- notifyDriverAboutPushCompletion stage 10
   22/10/15 10:56:23 INFO dag-scheduler-event-loop DAGScheduler: ShuffleMapStage 10 (processCmd at CliDriver.java:386) scheduled for finalizing shuffle merge in 0 s
   22/10/15 10:56:23 INFO shuffle-merge-finalizer-2 DAGScheduler: ShuffleMapStage 10 (processCmd at CliDriver.java:386) finalizing the shuffle merge with registering merge results set to true
   
   --- stage 9 finished 
   22/10/15 10:57:51 INFO task-result-getter-1 TaskSetManager: Finished task 2.0 in stage 9.1 (TID 6825) in 112825 ms on zw02-data-hdp-dn25559.mt (executor 74) (3/3)
   22/10/15 10:57:51 INFO task-result-getter-1 YarnClusterScheduler: Removed TaskSet 9.1, whose tasks have all completed, from pool 
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: ShuffleMapStage 9 (processCmd at CliDriver.java:386) finished in 112.832 s
   
   --- resubmit stage 10
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: looking for newly runnable stages
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: running: Set(ShuffleMapStage 11, ShuffleMapStage 8)
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: waiting: Set(ShuffleMapStage 12, ShuffleMapStage 10)
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: failed: Set()
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: Submitting ShuffleMapStage 10 (MapPartitionsRDD[36] at processCmd at CliDriver.java:386), which has no missing parents
   22/10/15 10:57:51 INFO dag-scheduler-event-loop OutputCommitCoordinator: Reusing state from previous attempt of stage 10.
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: Shuffle merge enabled before starting the stage for ShuffleMapStage 10 with shuffle 7 and shuffle merge 0 with 108 merger locations
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: Submitting 4 missing tasks from ShuffleMapStage 10 (MapPartitionsRDD[36] at processCmd at CliDriver.java:386) (first 15 tasks are for partitions Vector(105, 288, 447, 481))
   22/10/15 10:57:51 INFO dag-scheduler-event-loop YarnClusterScheduler: Adding task set 10.1 with 4 tasks
   
   --- stage 10 cannot finish
   22/10/15 10:58:18 INFO task-result-getter-1 TaskSetManager: Finished task 2.0 in stage 10.1 (TID 6857) in 26644 ms on zw02-data-hdp-dn23767.mt (executor 139) (1/4)
   22/10/15 10:58:24 INFO task-result-getter-1 TaskSetManager: Finished task 3.0 in stage 10.1 (TID 6860) in 32551 ms on zw02-data-hdp-dn23729.mt (executor 42) (2/4)
   22/10/15 10:58:47 INFO task-result-getter-1 TaskSetManager: Finished task 0.0 in stage 10.1 (TID 6858) in 55524 ms on zw02-data-hdp-dn20640.mt (executor 134) (3/4)
   22/10/15 10:58:58 INFO task-result-getter-0 TaskSetManager: Finished task 1.0 in stage 10.1 (TID 6859) in 66911 ms on zw02-data-hdp-dn25862.mt (executor 57) (4/4)
   




[GitHub] [spark] mridulm commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1473171295

   So this is an interesting coincidence, I literally encountered a production job which seems to be hitting this exact same issue :-)
   I was in the process of creating a test case, but my intuition was along the same lines as this PR.
   
   Can you create a test case to validate this behavior @Stove-hust ?
   Essentially it should fail with current master, and succeed after this change.
   
   Thanks for working on this fix




[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1469222238

   @otterc Hello, is there anything else I should add?




[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1475695975

   > Technically, 3 :-) The UT that I added will generate 2 tests - one for push based shuffle and one without. And we have the initial test you added.
   > 
   > You dont need to mark it as written by me ! We can include it in your PR - with any changes you make as part of the adding it.
   
   Thanks for your answer. I have added all three UTs (including the ones you wrote).




[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1478924279

   > 
   
   Yep, it's me.
   Username:	StoveM
   Full name:	Fencheng Mei




[GitHub] [spark] mridulm commented on a diff in pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #40393:
URL: https://github.com/apache/spark/pull/40393#discussion_r1142572022


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4595,6 +4595,184 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-40082: recomputation of shuffle map stage with no pending partitions " +
+    "should not hang") {
+
+    initPushBasedShuffleConfs(conf)
+    conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 3)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3"))
+
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))
+
+    // Fetch failed
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostC"), shuffleIdA, 0L, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // long running task complete
+    completeShuffleMapStageSuccessfully(1, 0, 2, Seq("hostA", "hostA"))
+    assert(!shuffleDepB.shuffleMergeFinalized)
+
+    // stage1's tasks have all completed
+    val shuffleStage1 = scheduler.stageIdToStage(1).asInstanceOf[ShuffleMapStage]
+    assert(shuffleStage1.pendingPartitions.isEmpty)
+
+    // resubmit
+    scheduler.resubmitFailedStages()
+
+    // complete parentStage0
+    completeShuffleMapStageSuccessfully(0, 0, 2, Seq("hostA", "hostA"))
+
+    // stage1 should be shuffleMergeFinalized
+    assert(shuffleDepB.shuffleMergeFinalized)
+  }
+
+  for (pushBasedShuffleEnabled <- Seq(true, false)) {
+    test("SPARK-40082: recomputation of shuffle map stage with no pending partitions should not " +
+      s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled") {

Review Comment:
   Good question - I have not tried that :-) This is a pattern used for other tests as well when we want to do a config sweep.
   Does specifying pushBasedShuffleEnabled = true in that string work?
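
   For reference, a minimal sketch of why the loop above yields two separately named tests. The `test` helper and the `registered` buffer below are hypothetical stand-ins for a test framework's registry, not ScalaTest or SparkFunSuite themselves; only the loop shape is taken from the diff.

```scala
import scala.collection.mutable.ArrayBuffer

object ConfigSweepSketch {
  // Hypothetical registry standing in for a test framework; not ScalaTest.
  private val registered = ArrayBuffer.empty[(String, () => Unit)]

  // Records a named test body without running it.
  def test(name: String)(body: => Unit): Unit = {
    registered += ((name, () => body))
    ()
  }

  def registeredNames: Seq[String] = registered.map(_._1).toSeq

  // The loop body runs once per value, so each iteration registers its own
  // test, and the interpolated value becomes part of that test's name.
  for (pushBasedShuffleEnabled <- Seq(true, false)) {
    test("recomputation of shuffle map stage with no pending partitions should not " +
      s"hang. pushBasedShuffleEnabled = $pushBasedShuffleEnabled") {
      // A real test would apply the config value and drive the scheduler here.
      assert(pushBasedShuffleEnabled || !pushBasedShuffleEnabled)
    }
  }

  def main(args: Array[String]): Unit = {
    registered.foreach { case (name, body) =>
      body()
      println(s"passed: $name")
    }
  }
}
```

   Because the value is interpolated into the name, the two registered tests differ only in their final `true` or `false`. Whether a given test runner can select one of them by that full string is not something this sketch establishes.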





[GitHub] [spark] mridulm commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1475538274

   Technically, 3 :-)
   The UT that I added will generate 2 tests - one for push based shuffle and one without.
   And we have the initial test you added.




[GitHub] [spark] Stove-hust commented on pull request #40393: []SPARK-40082]

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1467339828

   > @Stove-hust Thank you for reporting and the patch. Would you be able to share driver logs?
   
   **
   --- stage 10 failed
   22/10/15 10:55:58 WARN task-result-getter-1 TaskSetManager: Lost task 435.1 in stage 10.0 (TID 6822, zw02-data-hdp-dn21102.mt, executor 101): FetchFailed(null, shuffleId=3, mapIndex=-1, mapId=-1, reduceId=435, message=
   org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3 partition 435
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: ShuffleMapStage 10 (processCmd at CliDriver.java:386) failed in 601.792 s due to org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3 partition 435
   
   --- resubmit stage 10 && parentStage 9
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Resubmitting ShuffleMapStage 9 (processCmd at CliDriver.java:386) and ShuffleMapStage 10 (processCmd at CliDriver.java:386) due to fetch failure
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Resubmitting failed stages
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Submitting ShuffleMapStage 9 (MapPartitionsRDD[22] at processCmd at CliDriver.java:386), which has no missing parents
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Push-based shuffle disabled for ShuffleMapStage 9 (processCmd at CliDriver.java:386) since it is already shuffle merge finalized
   22/10/15 10:55:58 INFO dag-scheduler-event-loop DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 9 (MapPartitionsRDD[22] at processCmd at CliDriver.java:386) (first 15 tasks are for partitions Vector(98, 372, 690))
   22/10/15 10:55:58 INFO dag-scheduler-event-loop YarnClusterScheduler: Adding task set 9.1 with 3 tasks
   
   --- The tasks of the first attempt of stage 10 complete one after another and call notifyDriverAboutPushCompletion to end stage 10 and mark finalizeTask; because the stage is not in runningStages, it cannot be marked shuffleMergeFinalized.
   22/10/15 10:55:58 INFO task-result-getter-0 TaskSetManager: Finished task 325.0 in stage 10.0 (TID 6166) in 154455 ms on zw02-data-hdp-dn25537.mt (executor 117) (494/500)
   22/10/15 10:55:59 WARN task-result-getter-1 TaskSetManager: Lost task 325.1 in stage 10.0 (TID 6671, zw02-data-hdp-dn23160.mt, executor 47): TaskKilled (another attempt succeeded)
   22/10/15 10:56:20 WARN task-result-getter-1 TaskSetManager: Lost task 358.1 in stage 10.0 (TID 6731, zw02-data-hdp-dn25537.mt, executor 95): TaskKilled (another attempt succeeded)
   22/10/15 10:56:20 INFO task-result-getter-1 TaskSetManager: Task 358.1 in stage 10.0 (TID 6731) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
   
   --- Removed TaskSet 10.0, whose tasks have all completed
   22/10/15 10:56:22 INFO task-result-getter-1 TaskSetManager: Ignoring task-finished event for 435.0 in stage 10.0 because task 435 has already completed successfully
   22/10/15 10:56:22 INFO task-result-getter-1 YarnClusterScheduler: Removed TaskSet 10.0, whose tasks have all completed, from pool 
   
   --- notifyDriverAboutPushCompletion stage 10
   22/10/15 10:56:23 INFO dag-scheduler-event-loop DAGScheduler: ShuffleMapStage 10 (processCmd at CliDriver.java:386) scheduled for finalizing shuffle merge in 0 s
   22/10/15 10:56:23 INFO shuffle-merge-finalizer-2 DAGScheduler: ShuffleMapStage 10 (processCmd at CliDriver.java:386) finalizing the shuffle merge with registering merge results set to true
   
   --- stage 9 finished 
   22/10/15 10:57:51 INFO task-result-getter-1 TaskSetManager: Finished task 2.0 in stage 9.1 (TID 6825) in 112825 ms on zw02-data-hdp-dn25559.mt (executor 74) (3/3)
   22/10/15 10:57:51 INFO task-result-getter-1 YarnClusterScheduler: Removed TaskSet 9.1, whose tasks have all completed, from pool 
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: ShuffleMapStage 9 (processCmd at CliDriver.java:386) finished in 112.832 s
   
   --- resubmit stage 10
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: looking for newly runnable stages
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: running: Set(ShuffleMapStage 11, ShuffleMapStage 8)
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: waiting: Set(ShuffleMapStage 12, ShuffleMapStage 10)
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: failed: Set()
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: Submitting ShuffleMapStage 10 (MapPartitionsRDD[36] at processCmd at CliDriver.java:386), which has no missing parents
   22/10/15 10:57:51 INFO dag-scheduler-event-loop OutputCommitCoordinator: Reusing state from previous attempt of stage 10.
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: Shuffle merge enabled before starting the stage for ShuffleMapStage 10 with shuffle 7 and shuffle merge 0 with 108 merger locations
   22/10/15 10:57:51 INFO dag-scheduler-event-loop DAGScheduler: Submitting 4 missing tasks from ShuffleMapStage 10 (MapPartitionsRDD[36] at processCmd at CliDriver.java:386) (first 15 tasks are for partitions Vector(105, 288, 447, 481))
   22/10/15 10:57:51 INFO dag-scheduler-event-loop YarnClusterScheduler: Adding task set 10.1 with 4 tasks
   
   --- stage 10 cannot finish
   22/10/15 10:58:18 INFO task-result-getter-1 TaskSetManager: Finished task 2.0 in stage 10.1 (TID 6857) in 26644 ms on zw02-data-hdp-dn23767.mt (executor 139) (1/4)
   22/10/15 10:58:24 INFO task-result-getter-1 TaskSetManager: Finished task 3.0 in stage 10.1 (TID 6860) in 32551 ms on zw02-data-hdp-dn23729.mt (executor 42) (2/4)
   22/10/15 10:58:47 INFO task-result-getter-1 TaskSetManager: Finished task 0.0 in stage 10.1 (TID 6858) in 55524 ms on zw02-data-hdp-dn20640.mt (executor 134) (3/4)
   22/10/15 10:58:58 INFO task-result-getter-0 TaskSetManager: Finished task 1.0 in stage 10.1 (TID 6859) in 66911 ms on zw02-data-hdp-dn25862.mt (executor 57) (4/4)
   **
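
   The ordering problem in this timeline (task completions arriving while the stage is out of runningStages) can be illustrated with a toy model. This is only a sketch under stated assumptions: `Stage`, `ToyScheduler`, and the `finalizeOnEmptyRetry` flag are hypothetical names, none of this is Spark's DAGScheduler code, and the retry here has no missing tasks (the case named in the PR title), whereas the retry in the log above still had 4 tasks.

```scala
import scala.collection.mutable

object MergeFinalizeSketch {
  // Toy stage: tracks which partitions registered map output and a finalized flag.
  final class Stage(val id: Int, val numPartitions: Int) {
    val registeredOutputs = mutable.Set.empty[Int]
    var shuffleMergeFinalized = false
    def missingPartitions: Seq[Int] =
      (0 until numPartitions).filterNot(registeredOutputs.contains)
  }

  // Toy scheduler (hypothetical). finalizeOnEmptyRetry switches on the idea
  // of the fix: finalize at submit time when the retry has nothing to run.
  final class ToyScheduler(finalizeOnEmptyRetry: Boolean) {
    val runningStages = mutable.Set.empty[Stage]

    // A fetch failure takes the stage out of the running set.
    def onFetchFailed(stage: Stage): Unit = runningStages -= stage

    // A success always registers output, but only finalizes a running stage.
    def onTaskSuccess(stage: Stage, partition: Int): Unit = {
      stage.registeredOutputs += partition
      if (runningStages.contains(stage) && stage.missingPartitions.isEmpty) {
        stage.shuffleMergeFinalized = true
      }
    }

    // Returns the partitions that still need tasks.
    def submitMissingTasks(stage: Stage): Seq[Int] = {
      runningStages += stage
      val missing = stage.missingPartitions
      if (missing.isEmpty) {
        if (finalizeOnEmptyRetry && !stage.shuffleMergeFinalized) {
          stage.shuffleMergeFinalized = true
        }
        runningStages -= stage
      }
      missing
    }
  }

  // Replays: submit, one success, fetch failure, late success, resubmit.
  def run(finalizeOnEmptyRetry: Boolean): Boolean = {
    val scheduler = new ToyScheduler(finalizeOnEmptyRetry)
    val stage = new Stage(id = 10, numPartitions = 2)
    scheduler.submitMissingTasks(stage)
    scheduler.onTaskSuccess(stage, 0)
    scheduler.onFetchFailed(stage)
    scheduler.onTaskSuccess(stage, 1) // arrives after the stage left runningStages
    scheduler.submitMissingTasks(stage) // retry finds nothing left to run
    stage.shuffleMergeFinalized
  }

  def main(args: Array[String]): Unit = {
    println(s"finalized without the change: ${run(finalizeOnEmptyRetry = false)}")
    println(s"finalized with the change: ${run(finalizeOnEmptyRetry = true)}")
  }
}
```

   In this model the flag stays false without the submit-time check, because the only place that sets it is guarded by membership in runningStages.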




[GitHub] [spark] Stove-hust commented on a diff in pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on code in PR #40393:
URL: https://github.com/apache/spark/pull/40393#discussion_r1142853303


##########
pom.xml:
##########
@@ -114,7 +114,7 @@
     <java.version>1.8</java.version>
     <maven.compiler.source>${java.version}</maven.compiler.source>
     <maven.compiler.target>${java.version}</maven.compiler.target>
-    <maven.version>3.8.7</maven.version>
+    <maven.version>3.6.3</maven.version>

Review Comment:
   😂 Forgot to change it back after local compilation





[GitHub] [spark] mridulm closed pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm closed pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks
URL: https://github.com/apache/spark/pull/40393




[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1478849048

   > I could not cherry pick this into 3.4 and 3.3 - we should fix for those branches as well IMO. Can you create a PR against those two branches as well @Stove-hust ? Thanks
   
   No problem




[GitHub] [spark] Stove-hust commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1473303194

   > So this is an interesting coincidence, I literally encountered a production job which seems to be hitting this exact same issue :-) I was in the process of creating a test case, but my intuition was along the same lines as this PR.
   > 
   > Can you create a test case to validate this behavior @Stove-hust ? Essentially it should fail with current master, and succeed after this change.
   > 
   > Thanks for working on this fix
   
   No problem




[GitHub] [spark] mridulm commented on pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #40393:
URL: https://github.com/apache/spark/pull/40393#issuecomment-1476585572

   The test failure is unrelated to this PR - once the changes above are made, the reexecution should pass




[GitHub] [spark] Stove-hust commented on a diff in pull request #40393: [SPARK-40082] Schedule mergeFinalize when push merge shuffleMapStage retry but no running tasks

Posted by "Stove-hust (via GitHub)" <gi...@apache.org>.
Stove-hust commented on code in PR #40393:
URL: https://github.com/apache/spark/pull/40393#discussion_r1141790998


##########
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##########
@@ -4595,6 +4595,183 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-40082: recomputation of shuffle map stage with no pending partitions should not hang") {

Review Comment:
   fixed it


