You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2020/03/03 21:42:19 UTC

[spark] branch branch-3.0 updated: [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2732980  [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize
2732980 is described below

commit 27329806c36d0b403153fe1ad0077acb72d92606
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue Mar 3 13:40:51 2020 -0800

    [SPARK-30999][SQL] Don't cancel a QueryStageExec which failed before call doMaterialize
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to not cancel a `QueryStageExec` which failed before calling `doMaterialize`.
    
    Besides, this PR also includes 2 minor improvements:
    
    * fail fast when a stage fails before calling `doMaterialize`
    
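    * format exceptions with their cause: the underlying error is attached as the exception's cause instead of being appended to the message as "and the cause is ..."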
    
    ### Why are the changes needed?
    
    For a stage which failed before materializing its lazy value (e.g. `inputRDD`), calling `cancel` on it could re-trigger the same failure, e.g. by executing the child node again (see the test `SPARK-30291: AQE should catch the exceptions when doing materialize` in `AdaptiveQueryExecSuite` for an example). As a result, the same failure would be counted twice: once as a materialization error and once as a cancellation error.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated test.
    
    Closes #27752 from Ngone51/avoid_cancel_finished_stage.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
    (cherry picked from commit 380e8876316d6ef5a74358be2a04ab20e8b6e7ca)
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 23 +++++++++++++---------
 .../adaptive/AdaptiveQueryExecSuite.scala          |  3 ++-
 2 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 4036424..c018ca4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -165,7 +165,7 @@ case class AdaptiveSparkPlanExec(
           stagesToReplace = result.newStages ++ stagesToReplace
           executionId.foreach(onUpdatePlan)
 
-          // Start materialization of all new stages.
+          // Start materialization of all new stages and fail fast if any stages failed eagerly
           result.newStages.foreach { stage =>
             try {
               stage.materialize().onComplete { res =>
@@ -176,7 +176,10 @@ case class AdaptiveSparkPlanExec(
                 }
               }(AdaptiveSparkPlanExec.executionContext)
             } catch {
-              case e: Throwable => events.offer(StageFailure(stage, e))
+              case e: Throwable =>
+                val ex = new SparkException(
+                  s"Early failed query stage found: ${stage.treeString}", e)
+                cleanUpAndThrowException(Seq(ex), Some(stage.id))
             }
           }
         }
@@ -192,13 +195,12 @@ case class AdaptiveSparkPlanExec(
             stage.resultOption = Some(res)
           case StageFailure(stage, ex) =>
             errors.append(
-              new SparkException(s"Failed to materialize query stage: ${stage.treeString}." +
-                s" and the cause is ${ex.getMessage}", ex))
+              new SparkException(s"Failed to materialize query stage: ${stage.treeString}.", ex))
         }
 
         // In case of errors, we cancel all running stages and throw exception.
         if (errors.nonEmpty) {
-          cleanUpAndThrowException(errors)
+          cleanUpAndThrowException(errors, None)
         }
 
         // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less
@@ -522,9 +524,13 @@ case class AdaptiveSparkPlanExec(
    * Cancel all running stages with best effort and throw an Exception containing all stage
    * materialization errors and stage cancellation errors.
    */
-  private def cleanUpAndThrowException(errors: Seq[SparkException]): Unit = {
+  private def cleanUpAndThrowException(
+       errors: Seq[SparkException],
+       earlyFailedStage: Option[Int]): Unit = {
     val runningStages = currentPhysicalPlan.collect {
-      case s: QueryStageExec => s
+      // earlyFailedStage is the stage which failed before calling doMaterialize,
+      // so we should avoid calling cancel on it to re-trigger the failure again.
+      case s: QueryStageExec if !earlyFailedStage.contains(s.id) => s
     }
     val cancelErrors = new mutable.ArrayBuffer[SparkException]()
     try {
@@ -539,8 +545,7 @@ case class AdaptiveSparkPlanExec(
       }
     } finally {
       val ex = new SparkException(
-        "Adaptive execution failed due to stage materialization failures." +
-          s" and the cause is ${errors.head.getMessage}", errors.head)
+        "Adaptive execution failed due to stage materialization failures.", errors.head)
       errors.tail.foreach(ex.addSuppressed)
       cancelErrors.foreach(ex.addSuppressed)
       throw ex
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 17f6b29..500b6cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -692,7 +692,8 @@ class AdaptiveQueryExecSuite
         val error = intercept[Exception] {
           agged.count()
         }
-        assert(error.getCause().toString contains "Failed to materialize query stage")
+        assert(error.getCause().toString contains "Early failed query stage found")
+        assert(error.getSuppressed.size === 0)
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org