You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/19 19:37:12 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #37924: [SPARK-40481][CORE] Ignore stage fetch failure caused by decommissioned executor

mridulm commented on code in PR #37924:
URL: https://github.com/apache/spark/pull/37924#discussion_r974600104


##########
docs/configuration.md:
##########
@@ -2605,6 +2605,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>2.2.0</td>
 </tr>
+<tr>
+  <td><code>spark.stage.attempt.ignoreOnDecommissionFetchFailure</code></td>

Review Comment:
   `spark.stage.attempt.ignoreOnDecommissionFetchFailure` -> `spark.stage.ignoreOnDecommissionFetchFailure`



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -1860,8 +1867,18 @@ private[spark] class DAGScheduler(
             s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
         } else {
           failedStage.failedAttemptIds.add(task.stageAttemptId)
+          val ignoreStageFailure = ignoreDecommissionFetchFailure &&
+            isExecutorDecommissioned(taskScheduler, bmAddress)
+          if (ignoreStageFailure) {
+            logInfo("Ignoring fetch failure from $task of $failedStage attempt " +
+              s"${task.stageAttemptId} when count spark.stage.maxConsecutiveAttempts " +
+              "as executor ${bmAddress.executorId} is decommissioned and " +
+              s" ${config.STAGE_IGNORE_DECOMMISSION_FETCH_FAILURE.key}=true")
+          }
+
           val shouldAbortStage =
-            failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts ||
+            (!ignoreStageFailure &&
+              failedStage.failedAttemptIds.size >= maxConsecutiveStageAttempts) ||
             disallowStageRetryForTest

Review Comment:
   QQ: We are preventing the immediate failure from aborting the stage, but might be effectively reducing the number of stage failures which can be tolerated ?
   
   For example:
   attempt 0, attempt 1, attempt 2 failed due to decommission
   attempt 3 failed for other reasons -> job failed (assuming maxConsecutiveStageAttempts = 4)
   
   Is this the behavior we will now exhibit ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org