Posted to commits@spark.apache.org by tg...@apache.org on 2020/03/03 15:27:34 UTC

[spark] branch master updated: [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d22498  [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks
2d22498 is described below

commit 2d22498d6a1f290f9ca54404a6e83ed4b61431d2
Author: xuesenliang <xu...@tencent.com>
AuthorDate: Tue Mar 3 09:27:07 2020 -0600

    [SPARK-30388][CORE] Mark running map stages of finished job as finished, and cancel running tasks
    
    ### What changes were proposed in this pull request?
    
    When a job finishes, any of its running (re-submitted) map stages that are not used by other jobs should be marked as finished, and the running tasks of those stages should be cancelled.
    
    The ListenerBus should be notified as well; otherwise, these map stages stay on the "Active Stages" page of the web UI and never go away.
    
    For example:
    
    Suppose job 0 has two stages: map stage 0 and result stage 1. Map stage 0 has two partitions, and its result stage 1 has two partitions too.
    
    **Steps to reproduce the bug:**
    1. map stage 0: task 0 (```TID 0```) and task 1 (```TID 1```) start, and both finish successfully.
    2. result stage 1: task 0 (```TID 2```) and task 1 (```TID 3```) start.
    3. result stage 1: task 0 (```TID 2```) finishes successfully.
    4. result stage 1: speculative task 1.1 (```TID 4```) is launched, but then fails with a FetchFailedException.
    5. The driver re-submits map stage 0 and result stage 1.
    6. map stage 0 (retry 1): task 0 (```TID 5```) is launched.
    7. result stage 1: task 1 (```TID 3```) finishes successfully, so job 0 finishes.
    8. map stage 0 is removed from ```runningStages``` and ```stageIdToStage```, because it no longer belongs to any job.
    ```
      private def DAGScheduler#cleanupStateForJobAndIndependentStages(job: ActiveJob): HashSet[Stage] = {
       ...
          stageIdToStage.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {
            case (stageId, stage) =>
                ...
                def removeStage(stageId: Int): Unit = {
                  for (stage <- stageIdToStage.get(stageId)) {
                    if (runningStages.contains(stage)) {
                      logDebug("Removing running stage %d".format(stageId))
                      runningStages -= stage
                    }
                    ...
                  }
                  stageIdToStage -= stageId
                }
    
                jobSet -= job.jobId
                if (jobSet.isEmpty) { // no other job needs this stage
                  removeStage(stageId)
                }
              }
      ...
      }
    
    ```
    9. map stage 0 (retry 1): task 0 (```TID 5```) finishes successfully, but stage 0 is no longer in ```stageIdToStage```, so ```markStageAsFinished``` is never called for it.
    ```
      private[scheduler] def DAGScheduler#handleTaskCompletion(event: CompletionEvent): Unit = {
        val task = event.task
        val stageId = task.stageId
        ...
        if (!stageIdToStage.contains(task.stageId)) {
          postTaskEnd(event)
          // Skip all the actions if the stage has been cancelled.
          return
        }
        ...
    ```
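    
    The condition in step 8 (a running stage that no other job uses) is also the condition the change uses to decide which stages to cancel and mark as finished. Below is a minimal sketch of that selection, assuming a toy model in which stage and job IDs are plain integers. The object name, the function name and the ```stageToJobs``` map are hypothetical and are not part of ```DAGScheduler```.
    
    ```scala
    // Toy model only: this is not Spark's DAGScheduler, and all names are hypothetical.
    object IndependentStages {
      /**
       * Returns the running stages that are used by `jobId` and by no other job.
       * `stageToJobs` maps each stage ID to the IDs of the jobs that use it.
       */
      def runningIndependentStages(
          jobId: Int,
          stageToJobs: Map[Int, Set[Int]],
          runningStages: Set[Int]): Set[Int] = {
        runningStages.filter { stageId =>
          // Keep the stage only if its job set is exactly { jobId }.
          stageToJobs.get(stageId).contains(Set(jobId))
        }
      }
    
      def main(args: Array[String]): Unit = {
        // Stages 0 and 1 belong only to job 0; stage 2 is shared with job 1.
        val stageToJobs = Map(0 -> Set(0), 1 -> Set(0), 2 -> Set(0, 1))
        val running = Set(0, 2)
        // Only stage 0 is both running and exclusive to job 0.
        println(runningIndependentStages(0, stageToJobs, running))
      }
    }
    ```
    
    In this sketch a stage shared with another job is left alone, which mirrors the "if not used by other jobs" condition described above.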
    
    #### Relevant Spark driver logs:
    
    ```
    20/01/02 11:21:45 INFO DAGScheduler: Got job 0 (main at NativeMethodAccessorImpl.java:0) with 2 output partitions
    20/01/02 11:21:45 INFO DAGScheduler: Final stage: ResultStage 1 (main at NativeMethodAccessorImpl.java:0)
    20/01/02 11:21:45 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
    20/01/02 11:21:45 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
    
    20/01/02 11:21:45 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
    20/01/02 11:21:45 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
    20/01/02 11:21:45 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks
    20/01/02 11:21:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
    20/01/02 11:21:45 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 9.76.13.26, executor 2, partition 1, PROCESS_LOCAL, 7705 bytes)
    20/01/02 11:22:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 32491 ms on 9.179.143.4 (executor 1) (1/2)
    20/01/02 11:22:26 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 40544 ms on 9.76.13.26 (executor 2) (2/2)
    20/01/02 11:22:26 INFO DAGScheduler: ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) finished in 40.854 s
    20/01/02 11:22:26 INFO YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
    
    20/01/02 11:22:26 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
    20/01/02 11:22:26 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0, 1))
    20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 1.0 with 2 tasks
    20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, 9.179.143.4, executor 1, partition 0, NODE_LOCAL, 7929 bytes)
    20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 9.76.13.26, executor 2, partition 1, NODE_LOCAL, 7929 bytes)
    20/01/02 11:22:26 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 79 ms on 9.179.143.4 (executor 1) (1/2)
    
    20/01/02 11:22:26 INFO TaskSetManager: Marking task 1 in stage 1.0 (on 9.76.13.26) as speculatable because it ran more than 158 ms
    20/01/02 11:22:26 INFO TaskSetManager: Starting task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3, partition 1, ANY, 7929 bytes)
    20/01/02 11:22:26 WARN TaskSetManager: Lost task 1.1 in stage 1.0 (TID 4, 9.179.143.52, executor 3): FetchFailed(BlockManagerId(1, 9.179.143.4, 7337, None), shuffleId=0, mapId=0, reduceId=1, message=org.apache.spark.shuffle.FetchFailedException: Connection reset by peer)
    20/01/02 11:22:26 INFO TaskSetManager: Task 1.1 in stage 1.0 (TID 4) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
    20/01/02 11:22:26 INFO DAGScheduler: Marking ResultStage 1 (main at NativeMethodAccessorImpl.java:0) as failed due to a fetch failure from ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0)
    20/01/02 11:22:26 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) failed in 0.261 s due to org.apache.spark.shuffle.FetchFailedException: Connection reset by peer
    20/01/02 11:22:26 INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (main at NativeMethodAccessorImpl.java:0) and ResultStage 1 (main at NativeMethodAccessorImpl.java:0) due to fetch failure
    20/01/02 11:22:26 INFO DAGScheduler: Resubmitting failed stages
    
    20/01/02 11:22:26 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0), which has no missing parents
    20/01/02 11:22:26 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[2] at main at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
    20/01/02 11:22:26 INFO YarnClusterScheduler: Adding task set 0.1 with 1 tasks
    20/01/02 11:22:26 INFO TaskSetManager: Starting task 0.0 in stage 0.1 (TID 5, 9.179.143.4, executor 1, partition 0, PROCESS_LOCAL, 7704 bytes)
    
    // NOTE: The line "INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 10000 ms on 9.76.13.26 (executor 2) (2/2)" should appear here.
    // Its absence is a separate bug that is being fixed in https://issues.apache.org/jira/browse/SPARK-30404
    
    20/01/02 11:22:36 INFO TaskSetManager: Ignoring task-finished event for 1.0 in stage 1.0 because task 1 has already completed successfully
    
    20/01/02 11:22:36 INFO YarnClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
    20/01/02 11:22:36 INFO DAGScheduler: ResultStage 1 (main at NativeMethodAccessorImpl.java:0) finished in 10.131 s
    20/01/02 11:22:36 INFO DAGScheduler: Job 0 finished: main at NativeMethodAccessorImpl.java:0, took 51.031212 s
    
    20/01/02 11:22:58 INFO TaskSetManager: Finished task 0.0 in stage 0.1 (TID 5) in 32029 ms on 9.179.143.4 (executor 1) (1/1)
    20/01/02 11:22:58 INFO YarnClusterScheduler: Removed TaskSet 0.1, whose tasks have all completed, from pool
    ```
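    
    The stage lifecycle in these logs can be replayed against a toy model of how an "Active Stages" view might be derived from submit and complete events. This is a sketch under stated assumptions: the event types and the ```activeStages``` function are hypothetical simplifications, not Spark's listener API, and the event order only loosely follows the reproduction steps.
    
    ```scala
    // Toy model only: hypothetical event types, far simpler than Spark's listener events.
    object ActiveStagesModel {
      sealed trait Event
      final case class StageSubmitted(stageId: Int, attempt: Int) extends Event
      final case class StageCompleted(stageId: Int, attempt: Int) extends Event
    
      /** Replays events and returns the (stageId, attempt) pairs still considered active. */
      def activeStages(events: Seq[Event]): Set[(Int, Int)] =
        events.foldLeft(Set.empty[(Int, Int)]) {
          case (active, StageSubmitted(id, attempt)) => active + ((id, attempt))
          case (active, StageCompleted(id, attempt)) => active - ((id, attempt))
        }
    
      // Without the change: no completion event is ever posted for map stage 0 (retry 1).
      val beforeFix: Seq[Event] = Seq(
        StageSubmitted(0, 0), StageCompleted(0, 0), // map stage 0 runs and finishes
        StageSubmitted(1, 0), StageCompleted(1, 0), // result stage 1 fails on the fetch failure
        StageSubmitted(0, 1),                       // map stage 0 (retry 1) is resubmitted
        StageCompleted(1, 0)                        // original task 1.0 succeeds; job 0 ends
      )
    
      // With the change: finishing the job also completes the still-running map stage.
      val afterFix: Seq[Event] = beforeFix :+ StageCompleted(0, 1)
    
      def main(args: Array[String]): Unit = {
        println(activeStages(beforeFix)) // stage 0 (retry 1) is still listed as active
        println(activeStages(afterFix))  // nothing is left active
      }
    }
    ```
    
    In this model the only difference between the two runs is the final completion event for stage 0 (retry 1), which corresponds to the listener notification described above.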
    
    ### Why are the changes needed?
    
    The web UI is incorrect: ```stage 0 (retry 1)``` has finished, but it stays on the ```Active Stages``` page.
    
    ![active_stage](https://user-images.githubusercontent.com/4401756/71656718-71185680-2d77-11ea-8dbc-fd8085ab3dfb.png)
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    A new test case is added.
    
    It was also tested manually on a cluster, with the following result:
    ![cancel_stage](https://user-images.githubusercontent.com/4401756/71658434-04a15580-2d7f-11ea-952b-dd8dd685f37d.png)
    
    Closes #27050 from liangxs/master.
    
    Authored-by: xuesenliang <xu...@tencent.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 27 +++++++------
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 44 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fd5c3e0..a226b65 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1434,6 +1434,7 @@ private[spark] class DAGScheduler(
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
                     markStageAsFinished(resultStage)
+                    cancelRunningIndependentStages(job, s"Job ${job.jobId} is finished.")
                     cleanupStateForJobAndIndependentStages(job)
                     try {
                       // killAllTaskAttempts will fail if a SchedulerBackend does not implement
@@ -1978,18 +1979,12 @@ private[spark] class DAGScheduler(
     }
   }
 
-  /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
-  private def failJobAndIndependentStages(
-      job: ActiveJob,
-      failureReason: String,
-      exception: Option[Throwable] = None): Unit = {
-    val error = new SparkException(failureReason, exception.orNull)
+  /** Cancel all independent, running stages that are only used by this job. */
+  private def cancelRunningIndependentStages(job: ActiveJob, reason: String): Boolean = {
     var ableToCancelStages = true
-
-    // Cancel all independent, running stages.
     val stages = jobIdToStageIds(job.jobId)
     if (stages.isEmpty) {
-      logError("No stages registered for job " + job.jobId)
+      logError(s"No stages registered for job ${job.jobId}")
     }
     stages.foreach { stageId =>
       val jobsForStage: Option[HashSet[Int]] = stageIdToStage.get(stageId).map(_.jobIds)
@@ -2001,12 +1996,12 @@ private[spark] class DAGScheduler(
         if (!stageIdToStage.contains(stageId)) {
           logError(s"Missing Stage for stage with id $stageId")
         } else {
-          // This is the only job that uses this stage, so fail the stage if it is running.
+          // This stage is only used by the job, so finish the stage if it is running.
           val stage = stageIdToStage(stageId)
           if (runningStages.contains(stage)) {
             try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
               taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
-              markStageAsFinished(stage, Some(failureReason))
+              markStageAsFinished(stage, Some(reason))
             } catch {
               case e: UnsupportedOperationException =>
                 logWarning(s"Could not cancel tasks for stage $stageId", e)
@@ -2016,11 +2011,19 @@ private[spark] class DAGScheduler(
         }
       }
     }
+    ableToCancelStages
+  }
 
-    if (ableToCancelStages) {
+  /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
+  private def failJobAndIndependentStages(
+      job: ActiveJob,
+      failureReason: String,
+      exception: Option[Throwable] = None): Unit = {
+    if (cancelRunningIndependentStages(job, failureReason)) {
       // SPARK-15783 important to cleanup state first, just for tests where we have some asserts
       // against the state.  Otherwise we have a *little* bit of flakiness in the tests.
       cleanupStateForJobAndIndependentStages(job)
+      val error = new SparkException(failureReason, exception.orNull)
       job.listener.jobFailed(error)
       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e40b63f..2b2fd32 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1933,6 +1933,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("shuffle fetch failed on speculative task, but original task succeed (SPARK-30388)") {
+    var completedStage: List[Int] = Nil
+    val listener = new SparkListener() {
+      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+        completedStage = completedStage :+ event.stageInfo.stageId
+      }
+    }
+    sc.addSparkListener(listener)
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    assert(completedStage === List(0))
+
+    // result task 0.0 succeed
+    runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success, 42))
+    // speculative result task 1.1 fetch failed
+    val info = new TaskInfo(4, index = 1, attemptNumber = 1, 0L, "", "", TaskLocality.ANY, true)
+    runEvent(makeCompletionEvent(
+        taskSets(1).tasks(1),
+        FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, 1, "ignored"),
+        null,
+        Seq.empty,
+        Array.empty,
+        info
+      )
+    )
+    assert(completedStage === List(0, 1))
+
+    Thread.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
+    // map stage resubmitted
+    assert(scheduler.runningStages.size === 1)
+    val mapStage = scheduler.runningStages.head
+    assert(mapStage.id === 0)
+    assert(mapStage.latestInfo.failureReason.isEmpty)
+
+    // original result task 1.0 succeed
+    runEvent(makeCompletionEvent(taskSets(1).tasks(1), Success, 42))
+    assert(completedStage === List(0, 1, 1, 0))
+    assert(scheduler.activeJobs.isEmpty)
+  }
+
   test("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
     val acc = new LongAccumulator {
       override def add(v: java.lang.Long): Unit = throw new DAGSchedulerSuiteDummyException

