Posted to commits@spark.apache.org by mr...@apache.org on 2023/03/13 07:23:06 UTC

[spark] branch master updated: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c7aa90c771 [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
9c7aa90c771 is described below

commit 9c7aa90c771868da727073f9941b8b2c4b856946
Author: Tengfei Huang <te...@gmail.com>
AuthorDate: Mon Mar 13 02:22:49 2023 -0500

    [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
    
    ### What changes were proposed in this pull request?
    Currently, a stage can be resubmitted in a few scenarios:
    1. A task fails with `FetchFailed`, which triggers a stage re-submit;
    2. A barrier task fails;
    3. Shuffle data is lost due to an executor/host being decommissioned.
    
    For the first two scenarios, the config `spark.stage.maxConsecutiveAttempts` limits the number of retries. For the third scenario, however, there is a risk of infinite retry: if the executors hosting the shuffle data from successful tasks keep getting killed or lost, the stage will be re-run again and again.
    
    To avoid this risk, this PR adds a new config `spark.stage.maxAttempts` to limit the overall number of attempts for each stage; the stage will be aborted once its retry count exceeds the limit.
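    
    For example (illustrative values, not part of this change), the new cap could be set at submit time:
    
        spark-submit --conf spark.stage.maxAttempts=10 ...
    
    or programmatically, before creating the SparkContext:
    
        import org.apache.spark.SparkConf
        val conf = new SparkConf().set("spark.stage.maxAttempts", "10")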
    
    ### Why are the changes needed?
    To avoid the potential risk of infinite stage retry.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. A limit on stage retry attempts is added, so jobs may now fail if a stage has to be retried more times than the limit allows.
    
    ### How was this patch tested?
    Added new UT.
    
    Closes #40286 from ivoson/SPARK-42577.
    
    Authored-by: Tengfei Huang <te...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/internal/config/package.scala | 10 ++++
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 30 ++++++++----
 .../scala/org/apache/spark/scheduler/Stage.scala   |  1 +
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 56 +++++++++++++++++++++-
 4 files changed, 87 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 99f9b78c09b..7f93bf76216 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2479,4 +2479,14 @@ package object config {
       .version("3.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val STAGE_MAX_ATTEMPTS =
+    ConfigBuilder("spark.stage.maxAttempts")
+      .doc("Specify the max attempts for a stage - the spark job will be aborted if any of its " +
+        "stages is resubmitted multiple times beyond the max retries limitation. The maximum " +
+        "number of stage retries is the maximum of `spark.stage.maxAttempts` and " +
+        "`spark.stage.maxConsecutiveAttempts`.")
+      .version("3.5.0")
+      .intConf
+      .createWithDefault(Int.MaxValue)
 }
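
A note on the effective limit (a minimal sketch with illustrative values, not part of
the patch): since the scheduler takes the maximum of the two configs, setting
`spark.stage.maxAttempts` below `spark.stage.maxConsecutiveAttempts` has no effect,
and the `Int.MaxValue` default leaves existing behavior unchanged:

    // Assuming the default of 4 consecutive attempts and a hypothetical
    // spark.stage.maxAttempts of 10; the scheduler computes the cap the same way.
    val maxConsecutiveStageAttempts = 4
    val stageMaxAttempts = 10
    val effectiveCap = math.max(maxConsecutiveStageAttempts, stageMaxAttempts)  // = 10
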
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 1a1f0cbba7f..cc018ac6aec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -232,6 +232,13 @@ private[spark] class DAGScheduler(
     sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
       DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
 
+  /**
+   * Max stage attempts allowed before a stage is aborted.
+   */
+  private[scheduler] val maxStageAttempts: Int = {
+    Math.max(maxConsecutiveStageAttempts, sc.getConf.get(config.STAGE_MAX_ATTEMPTS))
+  }
+
   /**
    * Whether ignore stage fetch failure caused by executor decommission when
    * count spark.stage.maxConsecutiveAttempts
@@ -1355,16 +1362,23 @@ private[spark] class DAGScheduler(
       logDebug(s"submitStage($stage (name=${stage.name};" +
         s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
       if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
-        val missing = getMissingParentStages(stage).sortBy(_.id)
-        logDebug("missing: " + missing)
-        if (missing.isEmpty) {
-          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
-          submitMissingTasks(stage, jobId.get)
+        if (stage.getNextAttemptId >= maxStageAttempts) {
+          val reason = s"$stage (name=${stage.name}) has been resubmitted for the maximum " +
+            s"allowable number of times: ${maxStageAttempts}, which is the max value of " +
+            s"config `spark.stage.maxAttempts` and `spark.stage.maxConsecutiveAttempts`."
+          abortStage(stage, reason, None)
         } else {
-          for (parent <- missing) {
-            submitStage(parent)
+          val missing = getMissingParentStages(stage).sortBy(_.id)
+          logDebug("missing: " + missing)
+          if (missing.isEmpty) {
+            logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
+            submitMissingTasks(stage, jobId.get)
+          } else {
+            for (parent <- missing) {
+              submitStage(parent)
+            }
+            waitingStages += stage
           }
-          waitingStages += stage
         }
       }
     } else {
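
From the application's point of view (a hedged sketch, assuming `rdd` is an RDD whose
job hits the new cap): when `abortStage` fires above, the resulting job failure
surfaces on the driver as a SparkException carrying the abort reason:

    // Hypothetical driver-side handling of a job aborted by the new limit.
    try {
      rdd.count()
    } catch {
      case e: org.apache.spark.SparkException =>
        // e.getMessage contains "has been resubmitted for the maximum
        // allowable number of times" when this cap aborted the stage.
        throw e
    }
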
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 97072115ff8..f35beafd874 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -70,6 +70,7 @@ private[scheduler] abstract class Stage(
 
   /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0
+  private[scheduler] def getNextAttemptId: Int = nextAttemptId
 
   val name: String = callSite.shortForm
   val details: String = callSite.longForm
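
Why comparing `getNextAttemptId` against the cap works (a reasoning sketch based on
the counter semantics above): `nextAttemptId` starts at 0 and is incremented each
time a new attempt is created, so it always equals the number of attempts so far:

    // Hypothetical trace for a stage with maxStageAttempts = 2:
    // attempt #0 created -> nextAttemptId becomes 1
    // attempt #1 created -> nextAttemptId becomes 2
    // next submitStage(): getNextAttemptId (2) >= 2 -> abortStage(...)
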
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index cc6562ef017..d441abe2233 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -405,13 +405,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     mapOutputTracker = spy(new MyMapOutputTrackerMaster(sc.getConf, broadcastManager))
     blockManagerMaster = spy(new MyBlockManagerMaster(sc.getConf))
     doNothing().when(blockManagerMaster).updateRDDBlockVisibility(any(), any())
-    scheduler = new MyDAGScheduler(
+    scheduler = spy(new MyDAGScheduler(
       sc,
       taskScheduler,
       sc.listenerBus,
       mapOutputTracker,
       blockManagerMaster,
-      sc.env)
+      sc.env))
 
     dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
   }
@@ -4595,6 +4595,58 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     }
   }
 
+  test("SPARK-42577: fail the job if a shuffle map stage attempts beyond the limitation") {
+    setupStageAbortTest(sc)
+    doAnswer(_ => 2).when(scheduler).maxStageAttempts
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+    submit(reduceRdd, Array(0))
+
+    // Stage 0 got scheduled with 2 tasks.
+    assert(taskSets.size === 1 && taskSets(0).tasks.length === 2)
+    assert(taskSets(0).stageId === 0)
+    val stage0 = scheduler.stageIdToStage(0)
+
+    // Task 0 of stage 0 finished successfully on hostA, then the executor on hostA got killed
+    // and its shuffle data was lost. Task 1 of stage 0 then finished successfully on hostB.
+    // Stage 0 will be resubmitted due to the lost shuffle data.
+    runEvent(makeCompletionEvent(taskSets(0).tasks(0), Success,
+      makeMapStatus("hostA", reduces = 1, mapTaskId = 0),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+    runEvent(ExecutorLost("hostA-exec", ExecutorKilled))
+    runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+      makeMapStatus("hostB", reduces = 1, mapTaskId = 1),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+    assert(taskSets.size === 2 && taskSets(1).tasks.length === 1)
+    assert(taskSets(1).stageId === 0 && taskSets(1).stageAttemptId === 1)
+
+    // The executor on hostB got killed, so the shuffle data from task 1 is lost. After the
+    // resubmitted task completes, stage 0 would be resubmitted again due to the missing
+    // shuffle data, but because of the 2-attempt stage limit the job should be aborted instead.
+    runEvent(ExecutorLost("hostB-exec", ExecutorKilled))
+    runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success,
+      makeMapStatus("hostC", reduces = 1, mapTaskId = 2),
+      Seq.empty, Array.empty, createFakeTaskInfoWithId(2)))
+
+    // Stage should have been aborted and removed from running stages
+    assertDataStructuresEmpty()
+    sc.listenerBus.waitUntilEmpty()
+    assert(ended)
+
+    val expectedMsg = s"$stage0 (name=${stage0.name}) has been resubmitted for the maximum " +
+      s"allowable number of times: 2, which is the max value of " +
+      s"config `spark.stage.maxAttempts` and `spark.stage.maxConsecutiveAttempts`."
+
+    jobResult match {
+      case JobFailed(reason) =>
+        assert(reason.getMessage.contains(expectedMsg))
+      case other => fail(s"expected JobFailed, not $other")
+    }
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.

