Posted to commits@spark.apache.org by ka...@apache.org on 2017/02/18 14:56:42 UTC

spark git commit: [SPARK-19263] DAGScheduler should avoid sending conflicting task set.

Repository: spark
Updated Branches:
  refs/heads/master 21c7d3c31 -> 729ce3703


[SPARK-19263] DAGScheduler should avoid sending conflicting task set.

In the current `DAGScheduler.handleTaskCompletion` code, when `event.reason` is `Success`, the first step is `stage.pendingPartitions -= task.partitionId`, which may be a bug when a `FetchFailed` happens.

**Consider the following scenario**

1. Stage 0 runs and generates shuffle output data.
2. Stage 1 reads the output from stage 0 and generates more shuffle data. It has two tasks: ShuffleMapTask1 and ShuffleMapTask2, and these tasks are launched on executorA.
3. ShuffleMapTask1 fails to fetch blocks locally and sends a FetchFailed to the driver. The driver marks executorA as lost and updates failedEpoch.
4. The driver resubmits stage 0 so the missing output can be re-generated, and then once it completes, resubmits stage 1 with ShuffleMapTask1x and ShuffleMapTask2x.
5. ShuffleMapTask2 (from the original attempt of stage 1) successfully finishes on executorA and sends Success back to the driver. This causes DAGScheduler::handleTaskCompletion to remove partition 2 from stage.pendingPartitions (line 1149), but it does not add the partition to the set of output locations (line 1192), because the task's epoch is less than the failure epoch for the executor (because of the earlier failure on executorA).
6. ShuffleMapTask1x successfully finishes on executorB, causing the driver to remove partition 1 from stage.pendingPartitions. Combined with the previous step, this means that there are no more pending partitions for the stage, so the DAGScheduler marks the stage as finished (line 1196). However, the shuffle stage is not available (line 1215) because the completion for ShuffleMapTask2 was ignored because of its epoch, so the DAGScheduler resubmits the stage.
7. ShuffleMapTask2x is still running, so when TaskSchedulerImpl::submitTasks is called for the re-submitted stage, it throws an error, because there's an existing active task set.
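
The bookkeeping in steps 5 and 6 can be reproduced with a toy model. This is only a sketch: `PreFixSketch`, `Completion`, `handleSuccess`, and the concrete epoch values are hypothetical and stand in for the real DAGScheduler state; none of them are Spark APIs.

```scala
import scala.collection.mutable

// Toy model of the pre-fix bookkeeping. All names are illustrative, not Spark APIs.
object PreFixSketch {
  final case class Completion(partitionId: Int, execId: String, epoch: Long)

  // Stage 1 has two partitions still pending after it was resubmitted.
  val pendingPartitions = mutable.HashSet(1, 2)
  val outputLocs = mutable.HashMap.empty[Int, String]
  // Assumed epoch at which executorA was marked as lost (step 3).
  val failedEpoch = mutable.HashMap("executorA" -> 1L)

  // Pre-fix behavior: the partition is always dropped from pending,
  // even when its output is then ignored by the epoch check.
  def handleSuccess(c: Completion): Unit = {
    pendingPartitions -= c.partitionId
    val bogus = failedEpoch.get(c.execId).exists(c.epoch <= _)
    if (!bogus) outputLocs(c.partitionId) = c.execId
  }

  def main(args: Array[String]): Unit = {
    handleSuccess(Completion(2, "executorA", epoch = 0L)) // step 5: late success, ignored
    handleSuccess(Completion(1, "executorB", epoch = 2L)) // step 6: accepted
    val looksFinished = pendingPartitions.isEmpty
    val available = outputLocs.size == 2
    // The stage looks finished but is not available, which triggers the resubmission in step 7.
    println(s"looksFinished=$looksFinished available=$available")
  }
}
```

In this model the stage ends up with no pending partitions but only one registered output location, which matches the state that leads to the conflicting task set in step 7.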

**In this fix**

If a task completion is from a previous stage attempt and the epoch is too low
(i.e., it was from a failed executor), don't remove the corresponding partition
from pendingPartitions.
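
A minimal sketch of the fixed rule, under the same kind of toy assumptions: `PostFixSketch`, `Completion`, `handleSuccess`, `currentAttemptId`, and the epoch values are hypothetical names and numbers chosen for illustration, not Spark APIs. The real change is in the DAGScheduler.scala diff in this commit.

```scala
import scala.collection.mutable

// Toy model of the post-fix bookkeeping. All names are illustrative, not Spark APIs.
object PostFixSketch {
  final case class Completion(partitionId: Int, stageAttemptId: Int, execId: String, epoch: Long)

  // Assumed: attempt 1 of the stage is the one currently running.
  val currentAttemptId = 1
  val pendingPartitions = mutable.HashSet(1, 2)
  val outputLocs = mutable.HashMap.empty[Int, String]
  // Assumed epoch at which executorA was marked as lost.
  val failedEpoch = mutable.HashMap("executorA" -> 1L)

  def handleSuccess(c: Completion): Unit = {
    // Completions from the currently running attempt are no longer pending.
    if (c.stageAttemptId == currentAttemptId) pendingPartitions -= c.partitionId
    val bogus = failedEpoch.get(c.execId).exists(c.epoch <= _)
    if (!bogus) {
      // Acceptable epoch: register the output and clear the partition,
      // even if the completion came from an earlier attempt.
      outputLocs(c.partitionId) = c.execId
      pendingPartitions -= c.partitionId
    }
  }

  def main(args: Array[String]): Unit = {
    // Late success from the original attempt on the failed executor: left pending.
    handleSuccess(Completion(partitionId = 2, stageAttemptId = 0, execId = "executorA", epoch = 0L))
    // Success from the current attempt on a healthy executor: cleared and registered.
    handleSuccess(Completion(partitionId = 1, stageAttemptId = 1, execId = "executorB", epoch = 2L))
    println(s"partition2StillPending=${pendingPartitions.contains(2)}")
  }
}
```

With this rule, partition 2 stays pending until its rerun task completes, so in this model the stage is not marked as finished while a task of the active attempt is still running.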

Author: jinxing <ji...@meituan.com>
Author: jinxing <ji...@126.com>

Closes #16620 from jinxing64/SPARK-19263.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/729ce370
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/729ce370
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/729ce370

Branch: refs/heads/master
Commit: 729ce3703257aa34c00c5c8253e6971faf6a0c8d
Parents: 21c7d3c
Author: jinxing <ji...@meituan.com>
Authored: Sat Feb 18 10:49:40 2017 -0400
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Sat Feb 18 10:55:18 2017 -0400

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 22 +++++-
 .../org/apache/spark/scheduler/Stage.scala      |  2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     | 70 ++++++++++++++++++++
 3 files changed, 91 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/729ce370/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 69101ac..0b7d371 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1181,15 +1181,33 @@ class DAGScheduler(
 
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
-            shuffleStage.pendingPartitions -= task.partitionId
             updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
+            if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
+              // This task was for the currently running attempt of the stage. Since the task
+              // completed successfully from the perspective of the TaskSetManager, mark it as
+              // no longer pending (the TaskSetManager may consider the task complete even
+              // when the output needs to be ignored because the task's epoch is too small below.
+              // In this case, when pending partitions is empty, there will still be missing
+              // output locations, which will cause the DAGScheduler to resubmit the stage below.)
+              shuffleStage.pendingPartitions -= task.partitionId
+            }
             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
             } else {
+              // The epoch of the task is acceptable (i.e., the task was launched after the most
+              // recent failure we're aware of for the executor), so mark the task's output as
+              // available.
               shuffleStage.addOutputLoc(smt.partitionId, status)
+              // Remove the task's partition from pending partitions. This may have already been
+              // done above, but will not have been done yet in cases where the task attempt was
+              // from an earlier attempt of the stage (i.e., not the attempt that's currently
+              // running).  This allows the DAGScheduler to mark the stage as complete when one
+              // copy of each task has finished successfully, even if the currently active stage
+              // still has tasks running.
+              shuffleStage.pendingPartitions -= task.partitionId
             }
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
@@ -1213,7 +1231,7 @@ class DAGScheduler(
               clearCacheLocs()
 
               if (!shuffleStage.isAvailable) {
-                // Some tasks had failed; let's resubmit this shuffleStage
+                // Some tasks had failed; let's resubmit this shuffleStage.
                 // TODO: Lower-level scheduler should also deal with this
                 logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                   ") because some of its tasks had failed: " +

http://git-wip-us.apache.org/repos/asf/spark/blob/729ce370/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c6fc038..32e5df6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -74,7 +74,7 @@ private[scheduler] abstract class Stage(
   val details: String = callSite.longForm
 
   /**
-   * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
+   * Pointer to the [[StageInfo]] object for the most recent attempt. This needs to be initialized
    * here, before any attempts have actually been created, because the DAGScheduler uses this
    * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
    * have been created).

http://git-wip-us.apache.org/repos/asf/spark/blob/729ce370/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4e5f267..c735220 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2161,6 +2161,76 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     }
   }
 
+  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
+      " even with late completions from earlier stage attempts") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostA", 2))))
+
+    // Fetch failed for task(stageId=1, stageAttemptId=0, partitionId=0) running on hostA
+    // and task(stageId=1, stageAttemptId=0, partitionId=1) is still running.
+    assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0)
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
+        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
+      result = null))
+
+    // Both original tasks in rddA should be marked as failed, because they ran on the
+    // failed hostA, so both should be resubmitted. Complete them on hostB successfully.
+    scheduler.resubmitFailedStages()
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1
+      && taskSets(2).tasks.size === 2)
+    complete(taskSets(2), Seq(
+      (Success, makeMapStatus("hostB", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // Complete task(stageId=1, stageAttemptId=0, partitionId=1) running on failed hostA
+    // successfully. The success should be ignored because the task started before the
+    // executor failed, so the output may have been lost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
+
+    // Both tasks in rddB should be resubmitted, because none of them has succeeded truly.
+    // Complete the task(stageId=1, stageAttemptId=1, partitionId=0) successfully.
+    // Task(stageId=1, stageAttemptId=1, partitionId=1) of this new active stage attempt
+    // is still running.
+    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1
+      && taskSets(3).tasks.size === 2)
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // There should be no new attempt of stage submitted,
+    // because task(stageId=1, stageAttempt=1, partitionId=1) is still running in
+    // the current attempt (and hasn't completed successfully in any earlier attempts).
+    assert(taskSets.size === 4)
+
+    // Complete task(stageId=1, stageAttempt=1, partitionId=1) successfully.
+    runEvent(makeCompletionEvent(
+      taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
+
+    // Now the ResultStage should be submitted, because all of the tasks of rddB have
+    // completed successfully on alive executors.
+    assert(taskSets.size === 5 && taskSets(4).tasks(0).isInstanceOf[ResultTask[_, _]])
+    complete(taskSets(4), Seq(
+      (Success, 1),
+      (Success, 1)))
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.

