Posted to commits@spark.apache.org by ir...@apache.org on 2017/03/17 14:35:07 UTC

spark git commit: [SPARK-13369] Add config for number of consecutive fetch failures

Repository: spark
Updated Branches:
  refs/heads/master 13538cf3d -> 7b5d873ae


[SPARK-13369] Add config for number of consecutive fetch failures

The previously hardcoded maximum of 4 retries per stage is not suitable for all cluster configurations. Since Spark retries a stage at the first sign of a fetch failure, each retry may surface only one bad executor at a time, so it can take many stage retries to discover all the failures. Two scenarios in particular call for a different value: (1) more than 4 executors per node, where more than 4 retries may be needed to discover the problem with every executor on the node; and (2) maintenance on large clusters, where multiple machines are serviced at once but total cluster downtime cannot be afforded. Making this value configurable lets cluster operators tune it to something appropriate for their cluster configuration.
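
For example (illustrative usage, not part of this diff; the config key and its default of 4 come from this patch, the value 8 is made up), an operator running more than four executors per node could raise the limit at submit time:

  spark-submit --conf spark.stage.maxConsecutiveAttempts=8 ...

or in application code:

  import org.apache.spark.SparkConf
  val conf = new SparkConf().set("spark.stage.maxConsecutiveAttempts", "8")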

Unit tests

Author: Sital Kedia <sk...@fb.com>

Closes #17307 from sitalkedia/SPARK-13369.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b5d873a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b5d873a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b5d873a

Branch: refs/heads/master
Commit: 7b5d873aef672aa0aee41e338bab7428101e1ad3
Parents: 13538cf
Author: Sital Kedia <sk...@fb.com>
Authored: Fri Mar 17 09:33:45 2017 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Fri Mar 17 09:33:58 2017 -0500

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/DAGScheduler.scala | 15 +++++++++++++--
 .../scala/org/apache/spark/scheduler/Stage.scala  | 18 +-----------------
 .../spark/scheduler/DAGSchedulerSuite.scala       | 16 ++++++++--------
 docs/configuration.md                             |  7 +++++++
 4 files changed, 29 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7b5d873a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 692ed80..d944f26 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -187,6 +187,13 @@ class DAGScheduler(
   /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
   private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
 
+  /**
+   * Number of consecutive stage attempts allowed before a stage is aborted.
+   */
+  private[scheduler] val maxConsecutiveStageAttempts =
+    sc.getConf.getInt("spark.stage.maxConsecutiveAttempts",
+      DAGScheduler.DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS)
+
   private val messageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
 
@@ -1282,8 +1289,9 @@ class DAGScheduler(
               s"longer running")
           }
 
+          failedStage.fetchFailedAttemptIds.add(task.stageAttemptId)
           val shouldAbortStage =
-            failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId) ||
+            failedStage.fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts ||
             disallowStageRetryForTest
 
           if (shouldAbortStage) {
@@ -1292,7 +1300,7 @@ class DAGScheduler(
             } else {
               s"""$failedStage (${failedStage.name})
                  |has failed the maximum allowable number of
-                 |times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}.
+                 |times: $maxConsecutiveStageAttempts.
                  |Most recent failure reason: $failureMessage""".stripMargin.replaceAll("\n", " ")
             }
             abortStage(failedStage, abortMessage, None)
@@ -1726,4 +1734,7 @@ private[spark] object DAGScheduler {
   // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
   // as more failure events come in
   val RESUBMIT_TIMEOUT = 200
+
+  // Number of consecutive stage attempts allowed before a stage is aborted
+  val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4
 }
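
To make the new control flow concrete, here is a minimal, self-contained sketch of the bookkeeping above (illustrative names only; this is not Spark's actual DAGScheduler): each stage attempt that hits a FetchFailed is recorded in a set, a successful run clears the set, and the stage is aborted once the number of distinct failed attempts reaches the configured maximum.

  import scala.collection.mutable.HashSet

  object FetchFailureTrackingSketch {
    // default mirrors spark.stage.maxConsecutiveAttempts
    val maxConsecutiveStageAttempts = 4
    // one entry per stage attempt that saw a fetch failure
    val fetchFailedAttemptIds = new HashSet[Int]

    // Record a FetchFailed from `stageAttemptId`; returns true if the stage should abort.
    def onFetchFailure(stageAttemptId: Int): Boolean = {
      fetchFailedAttemptIds.add(stageAttemptId)
      fetchFailedAttemptIds.size >= maxConsecutiveStageAttempts
    }

    // A successful stage run breaks the consecutive-failure streak.
    def clearFailures(): Unit = fetchFailedAttemptIds.clear()

    def main(args: Array[String]): Unit = {
      (0 until 4).foreach { attempt =>
        // prints abort = false for attempts 0-2, true for attempt 3
        println(s"attempt $attempt -> abort = ${onFetchFailure(attempt)}")
      }
    }
  }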

http://git-wip-us.apache.org/repos/asf/spark/blob/7b5d873a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 32e5df6..290fd07 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -87,23 +87,12 @@ private[scheduler] abstract class Stage(
    * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
    * multiple tasks from the same stage attempt fail (SPARK-5945).
    */
-  private val fetchFailedAttemptIds = new HashSet[Int]
+  val fetchFailedAttemptIds = new HashSet[Int]
 
   private[scheduler] def clearFailures() : Unit = {
     fetchFailedAttemptIds.clear()
   }
 
-  /**
-   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
-   *
-   * This method updates the running set of failed stage attempts and returns
-   * true if the number of failures exceeds the allowable number of failures.
-   */
-  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
-    fetchFailedAttemptIds.add(stageAttemptId)
-    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
-  }
-
   /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
   def makeNewStageAttempt(
       numPartitionsToCompute: Int,
@@ -128,8 +117,3 @@ private[scheduler] abstract class Stage(
   /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
   def findMissingPartitions(): Seq[Int]
 }
-
-private[scheduler] object Stage {
-  // The number of consecutive failures allowed before a stage is aborted
-  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
-}
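
Note that fetchFailedAttemptIds is a HashSet keyed by attempt ID, so multiple task failures within one attempt count as a single consecutive failure (SPARK-5945). A small illustration (hypothetical, not part of the diff):

  val ids = new scala.collection.mutable.HashSet[Int]
  ids.add(0); ids.add(0)  // two FetchFailed tasks from the same attempt
  ids.add(1)              // one FetchFailed task from the next attempt
  assert(ids.size == 2)   // two consecutive failed attempts, not three failures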

http://git-wip-us.apache.org/repos/asf/spark/blob/7b5d873a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8eaf9df..dfad5db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -801,7 +801,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
     submit(reduceRdd, Array(0, 1))
 
-    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+    for (attempt <- 0 until scheduler.maxConsecutiveStageAttempts) {
       // Complete all the tasks for the current attempt of stage 0 successfully
       completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
 
@@ -813,7 +813,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       // map output, for the next iteration through the loop
       scheduler.resubmitFailedStages()
 
-      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+      if (attempt < scheduler.maxConsecutiveStageAttempts - 1) {
         assert(scheduler.runningStages.nonEmpty)
         assert(!ended)
       } else {
@@ -847,11 +847,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 
     // In the first two iterations, Stage 0 succeeds and stage 1 fails. In the next two iterations,
     // stage 2 fails.
-    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+    for (attempt <- 0 until scheduler.maxConsecutiveStageAttempts) {
       // Complete all the tasks for the current attempt of stage 0 successfully
       completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
 
-      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2) {
+      if (attempt < scheduler.maxConsecutiveStageAttempts / 2) {
         // Now we should have a new taskSet, for a new attempt of stage 1.
         // Fail all these tasks with FetchFailure
         completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
@@ -859,8 +859,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         completeShuffleMapStageSuccessfully(1, attempt, numShufflePartitions = 1)
 
         // Fail stage 2
-        completeNextStageWithFetchFailure(2, attempt - Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2,
-          shuffleDepTwo)
+        completeNextStageWithFetchFailure(2,
+          attempt - scheduler.maxConsecutiveStageAttempts / 2, shuffleDepTwo)
       }
 
       // this will trigger a resubmission of stage 0, since we've lost some of its
@@ -872,7 +872,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     completeShuffleMapStageSuccessfully(1, 4, numShufflePartitions = 1)
 
     // Succeed stage2 with a "42"
-    completeNextResultStageWithSuccess(2, Stage.MAX_CONSECUTIVE_FETCH_FAILURES/2)
+    completeNextResultStageWithSuccess(2, scheduler.maxConsecutiveStageAttempts / 2)
 
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
@@ -895,7 +895,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     submit(finalRdd, Array(0))
 
     // First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
-    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+    for (attempt <- 0 until scheduler.maxConsecutiveStageAttempts - 1) {
       // Make each task in stage 0 success
       completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
 

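Because the limit is now read from the scheduler's SparkConf, a suite could in principle lower it to make the abort path cheap to exercise (hypothetical; this patch keeps the default in its tests):

  import org.apache.spark.SparkConf
  val conf = new SparkConf().set("spark.stage.maxConsecutiveAttempts", "2")
  // a DAGScheduler built with this conf aborts a stage after two
  // distinct fetch-failed attempts
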
http://git-wip-us.apache.org/repos/asf/spark/blob/7b5d873a/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 63392a7..4729f1b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1506,6 +1506,13 @@ Apart from these, the following properties are also available, and may be useful
     of this setting is to act as a safety-net to prevent runaway uncancellable tasks from rendering
     an executor unusable.
   </td>
 </tr>
+<tr>
+  <td><code>spark.stage.maxConsecutiveAttempts</code></td>
+  <td>4</td>
+  <td>
+    Number of consecutive stage attempts allowed before a stage is aborted.
+  </td>
+</tr>
 </table>
 

