You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2020/03/03 21:42:19 UTC
[spark] branch branch-3.0 updated: [SPARK-30999][SQL] Don't cancel
a QueryStageExec which failed before call doMaterialize
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2732980 [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize
2732980 is described below
commit 27329806c36d0b403153fe1ad0077acb72d92606
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue Mar 3 13:40:51 2020 -0800
[SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize
### What changes were proposed in this pull request?
This PR proposes not to cancel a `QueryStageExec` that failed before `doMaterialize` was called.
In addition, this PR includes two minor improvements:
* fail fast when a stage fails before `doMaterialize` is called
* attach the underlying exception as the cause instead of appending its message to the error text
### Why are the changes needed?
For a stage that failed before materializing its lazy value (e.g. `inputRDD`), calling `cancel` on it could re-trigger the same failure, e.g. by executing the child node again (see `AdaptiveQueryExecSuite`.`SPARK-30291: AQE should catch the exceptions when doing materialize` for an example). As a result, the same failure would be counted twice: once as a materialization error and once as a cancellation error.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Updated test.
Closes #27752 from Ngone51/avoid_cancel_finished_stage.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
(cherry picked from commit 380e8876316d6ef5a74358be2a04ab20e8b6e7ca)
Signed-off-by: gatorsmile <ga...@gmail.com>
---
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 23 +++++++++++++---------
.../adaptive/AdaptiveQueryExecSuite.scala | 3 ++-
2 files changed, 16 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 4036424..c018ca4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -165,7 +165,7 @@ case class AdaptiveSparkPlanExec(
stagesToReplace = result.newStages ++ stagesToReplace
executionId.foreach(onUpdatePlan)
- // Start materialization of all new stages.
+ // Start materialization of all new stages and fail fast if any stages failed eagerly
result.newStages.foreach { stage =>
try {
stage.materialize().onComplete { res =>
@@ -176,7 +176,10 @@ case class AdaptiveSparkPlanExec(
}
}(AdaptiveSparkPlanExec.executionContext)
} catch {
- case e: Throwable => events.offer(StageFailure(stage, e))
+ case e: Throwable =>
+ val ex = new SparkException(
+ s"Early failed query stage found: ${stage.treeString}", e)
+ cleanUpAndThrowException(Seq(ex), Some(stage.id))
}
}
}
@@ -192,13 +195,12 @@ case class AdaptiveSparkPlanExec(
stage.resultOption = Some(res)
case StageFailure(stage, ex) =>
errors.append(
- new SparkException(s"Failed to materialize query stage: ${stage.treeString}." +
- s" and the cause is ${ex.getMessage}", ex))
+ new SparkException(s"Failed to materialize query stage: ${stage.treeString}.", ex))
}
// In case of errors, we cancel all running stages and throw exception.
if (errors.nonEmpty) {
- cleanUpAndThrowException(errors)
+ cleanUpAndThrowException(errors, None)
}
// Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less
@@ -522,9 +524,13 @@ case class AdaptiveSparkPlanExec(
* Cancel all running stages with best effort and throw an Exception containing all stage
* materialization errors and stage cancellation errors.
*/
- private def cleanUpAndThrowException(errors: Seq[SparkException]): Unit = {
+ private def cleanUpAndThrowException(
+ errors: Seq[SparkException],
+ earlyFailedStage: Option[Int]): Unit = {
val runningStages = currentPhysicalPlan.collect {
- case s: QueryStageExec => s
+ // earlyFailedStage is the stage which failed before calling doMaterialize,
+ // so we should avoid calling cancel on it to re-trigger the failure again.
+ case s: QueryStageExec if !earlyFailedStage.contains(s.id) => s
}
val cancelErrors = new mutable.ArrayBuffer[SparkException]()
try {
@@ -539,8 +545,7 @@ case class AdaptiveSparkPlanExec(
}
} finally {
val ex = new SparkException(
- "Adaptive execution failed due to stage materialization failures." +
- s" and the cause is ${errors.head.getMessage}", errors.head)
+ "Adaptive execution failed due to stage materialization failures.", errors.head)
errors.tail.foreach(ex.addSuppressed)
cancelErrors.foreach(ex.addSuppressed)
throw ex
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 17f6b29..500b6cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -692,7 +692,8 @@ class AdaptiveQueryExecSuite
val error = intercept[Exception] {
agged.count()
}
- assert(error.getCause().toString contains "Failed to materialize query stage")
+ assert(error.getCause().toString contains "Early failed query stage found")
+ assert(error.getSuppressed.size === 0)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org