Posted to commits@spark.apache.org by mr...@apache.org on 2023/03/13 07:23:06 UTC
[spark] branch master updated: [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9c7aa90c771 [SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
9c7aa90c771 is described below
commit 9c7aa90c771868da727073f9941b8b2c4b856946
Author: Tengfei Huang <te...@gmail.com>
AuthorDate: Mon Mar 13 02:22:49 2023 -0500
[SPARK-42577][CORE] Add max attempts limitation for stages to avoid potential infinite retry
### What changes were proposed in this pull request?
Currently a stage will be resubmitted in a few scenarios:
1. A task fails with `FetchFailed`, which triggers a stage re-submit;
2. A barrier task fails;
3. Shuffle data is lost because an executor/host was decommissioned.
For the first 2 scenarios, the config `spark.stage.maxConsecutiveAttempts` limits the number of retries. For the 3rd scenario, there is a risk of infinite retry: if the executors hosting the shuffle data produced by successful tasks keep getting killed or lost, the stage will be re-run again and again.
To avoid this risk, this PR adds a new config `spark.stage.maxAttempts` to limit the overall number of attempts for each stage; a stage is aborted once its attempts exceed the limit. The effective limit is the maximum of `spark.stage.maxAttempts` and `spark.stage.maxConsecutiveAttempts`.
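For illustration, here is a minimal sketch (not the patched DAGScheduler code) of how the effective ceiling combines the two settings; the config keys come from the diff below, while the concrete values and the default of 4 for `spark.stage.maxConsecutiveAttempts` are assumptions used only for this example:

```scala
// Illustrative sketch, not the patched DAGScheduler: the effective per-stage
// attempt ceiling is the larger of the two settings.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.stage.maxConsecutiveAttempts", "4")  // existing config
  .set("spark.stage.maxAttempts", "10")            // new config added here

// With these example values a stage may be attempted at most
// max(4, 10) = 10 times before the job is aborted.
val maxStageAttempts = math.max(
  conf.getInt("spark.stage.maxConsecutiveAttempts", 4),
  conf.getInt("spark.stage.maxAttempts", Int.MaxValue))
```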
### Why are the changes needed?
To avoid the risk of a stage being retried infinitely.
### Does this PR introduce _any_ user-facing change?
Yes. A limit on stage retries is added, so a job may now fail if one of its stages has to be resubmitted more times than the limit allows.
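As a usage illustration (not part of the patch), a user who wants a finite ceiling could set the new key explicitly; the value and application name below are made-up examples:

```scala
// Hypothetical user-side configuration: cap every stage at 10 total attempts.
// The config key comes from this PR; the value and app name are examples.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("stage-retry-limit-example")
  .config("spark.stage.maxAttempts", "10")
  .getOrCreate()
```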
### How was this patch tested?
Added new UT.
Closes #40286 from ivoson/SPARK-42577.
Authored-by: Tengfei Huang <te...@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../org/apache/spark/internal/config/package.scala | 10 ++++
.../org/apache/spark/scheduler/DAGScheduler.scala | 30 ++++++++----
.../scala/org/apache/spark/scheduler/Stage.scala | 1 +
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 56 +++++++++++++++++++++-
4 files changed, 87 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 99f9b78c09b..7f93bf76216 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2479,4 +2479,14 @@ package object config {
.version("3.5.0")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val STAGE_MAX_ATTEMPTS =
+ ConfigBuilder("spark.stage.maxAttempts")
+ .doc("Specify the max attempts for a stage - the spark job will be aborted if any of its " +
+ "stages is resubmitted multiple times beyond the max retries limitation. The maximum " +
+ "number of stage retries is the maximum of `spark.stage.maxAttempts` and " +
+ "`spark.stage.maxConsecutiveAttempts`.")
+ .version("3.5.0")
+ .intConf
+ .createWithDefault(Int.MaxValue)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 1a1f0cbba7f..cc018ac6aec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -232,6 +232,13 @@ private[spark] class DAGScheduler(
sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
+ /**
+ * Max stage attempts allowed before a stage is aborted.
+ */
+ private[scheduler] val maxStageAttempts: Int = {
+ Math.max(maxConsecutiveStageAttempts, sc.getConf.get(config.STAGE_MAX_ATTEMPTS))
+ }
+
/**
* Whether ignore stage fetch failure caused by executor decommission when
* count spark.stage.maxConsecutiveAttempts
@@ -1355,16 +1362,23 @@ private[spark] class DAGScheduler(
logDebug(s"submitStage($stage (name=${stage.name};" +
s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
- val missing = getMissingParentStages(stage).sortBy(_.id)
- logDebug("missing: " + missing)
- if (missing.isEmpty) {
- logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
- submitMissingTasks(stage, jobId.get)
+ if (stage.getNextAttemptId >= maxStageAttempts) {
+ val reason = s"$stage (name=${stage.name}) has been resubmitted for the maximum " +
+ s"allowable number of times: ${maxStageAttempts}, which is the max value of " +
+ s"config `spark.stage.maxAttempts` and `spark.stage.maxConsecutiveAttempts`."
+ abortStage(stage, reason, None)
} else {
- for (parent <- missing) {
- submitStage(parent)
+ val missing = getMissingParentStages(stage).sortBy(_.id)
+ logDebug("missing: " + missing)
+ if (missing.isEmpty) {
+ logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
+ submitMissingTasks(stage, jobId.get)
+ } else {
+ for (parent <- missing) {
+ submitStage(parent)
+ }
+ waitingStages += stage
}
- waitingStages += stage
}
}
} else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 97072115ff8..f35beafd874 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -70,6 +70,7 @@ private[scheduler] abstract class Stage(
/** The ID to use for the next new attempt for this stage. */
private var nextAttemptId: Int = 0
+ private[scheduler] def getNextAttemptId: Int = nextAttemptId
val name: String = callSite.shortForm
val details: String = callSite.longForm
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index cc6562ef017..d441abe2233 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -405,13 +405,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
mapOutputTracker = spy(new MyMapOutputTrackerMaster(sc.getConf, broadcastManager))
blockManagerMaster = spy(new MyBlockManagerMaster(sc.getConf))
doNothing().when(blockManagerMaster).updateRDDBlockVisibility(any(), any())
- scheduler = new MyDAGScheduler(
+ scheduler = spy(new MyDAGScheduler(
sc,
taskScheduler,
sc.listenerBus,
mapOutputTracker,
blockManagerMaster,
- sc.env)
+ sc.env))
dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
}
@@ -4595,6 +4595,58 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
}
}
+ test("SPARK-42577: fail the job if a shuffle map stage attempts beyond the limitation") {
+ setupStageAbortTest(sc)
+ doAnswer(_ => 2).when(scheduler).maxStageAttempts
+
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
+ submit(reduceRdd, Array(0))
+
+ // Stage 0 got scheduled with 2 tasks.
+ assert(taskSets.size === 1 && taskSets(0).tasks.length === 2)
+ assert(taskSets(0).stageId === 0)
+ val stage0 = scheduler.stageIdToStage(0)
+
+ // Task 0 of stage 0 finished successfully on hostA and then executor on hostA got killed and
+ // shuffle data got lost. Then task 1 of stage 0 finished successfully on hostB. Stage 0 will
+ // be resubmitted due to shuffle data lost.
+ runEvent(makeCompletionEvent(taskSets(0).tasks(0), Success,
+ makeMapStatus("hostA", reduces = 1, mapTaskId = 0),
+ Seq.empty, Array.empty, createFakeTaskInfoWithId(0)))
+ runEvent(ExecutorLost("hostA-exec", ExecutorKilled))
+ runEvent(makeCompletionEvent(taskSets(0).tasks(1), Success,
+ makeMapStatus("hostB", reduces = 1, mapTaskId = 1),
+ Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
+ assert(taskSets.size === 2 && taskSets(1).tasks.length === 1)
+ assert(taskSets(1).stageId === 0 && taskSets(1).stageAttemptId === 1)
+
+ // Executor on hostB got killed so that shuffle data from task 1 will be lost, after the
+ // resubmitted task completes stage 0 will be resubmitted again due to shuffle data missing.
+ // While because of the 2 times stage max attempts limitation, the job should be aborted.
+ runEvent(ExecutorLost("hostB-exec", ExecutorKilled))
+ runEvent(makeCompletionEvent(taskSets(1).tasks(0), Success,
+ makeMapStatus("hostC", reduces = 1, mapTaskId = 2),
+ Seq.empty, Array.empty, createFakeTaskInfoWithId(2)))
+
+ // Stage should have been aborted and removed from running stages
+ assertDataStructuresEmpty()
+ sc.listenerBus.waitUntilEmpty()
+ assert(ended)
+
+ val expectedMsg = s"$stage0 (name=${stage0.name}) has been resubmitted for the maximum " +
+ s"allowable number of times: 2, which is the max value of " +
+ s"config `spark.stage.maxAttempts` and `spark.stage.maxConsecutiveAttempts`."
+
+ jobResult match {
+ case JobFailed(reason) =>
+ assert(reason.getMessage.contains(expectedMsg))
+ case other => fail(s"expected JobFailed, not $other")
+ }
+ }
+
/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.