Posted to commits@spark.apache.org by we...@apache.org on 2023/03/24 05:52:03 UTC

[spark] branch master updated: [SPARK-42101][SQL][FOLLOWUP] Make QueryStageExec.resultOption and isMaterialized consistent

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8982cee315f [SPARK-42101][SQL][FOLLOWUP] Make QueryStageExec.resultOption and isMaterialized consistent
8982cee315f is described below

commit 8982cee315f4a940c340b2f55646fbde78e6231e
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Mar 24 13:51:46 2023 +0800

    [SPARK-42101][SQL][FOLLOWUP] Make QueryStageExec.resultOption and isMaterialized consistent
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/39624 . `QueryStageExec.isMaterialized` should only return true if `resultOption` is assigned; letting the two diverge is a potential source of bugs.
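    A minimal sketch of the invariant this followup enforces (the `Stage` class and demo below are hypothetical stand-ins, not the Spark source): `isMaterialized` is derived from `resultOption`, and marking it `final` means no subclass can reintroduce the inconsistency.
    
    ```scala
    import java.util.concurrent.atomic.AtomicReference
    
    // Simplified stand-in for QueryStageExec: isMaterialized is computed from
    // resultOption, and `final` stops subclasses from overriding it with a
    // definition that could drift out of sync.
    abstract class Stage {
      protected val _resultOption = new AtomicReference[Option[Any]](None)
      def resultOption: AtomicReference[Option[Any]] = _resultOption
      final def isMaterialized: Boolean = resultOption.get().isDefined
    }
    
    object StageDemo extends App {
      val s = new Stage {}
      assert(!s.isMaterialized)    // no result assigned yet
      s.resultOption.set(Some(42)) // materialization completes
      assert(s.isMaterialized)     // consistent by construction
    }
    ```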
    
    ### Why are the changes needed?
    
    Fix a potential bug.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #40522 from cloud-fan/follow.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 53 ++++++++++------------
 .../adaptive/InsertAdaptiveSparkPlan.scala         |  4 ++
 .../sql/execution/adaptive/QueryStageExec.scala    | 26 ++++++-----
 3 files changed, 43 insertions(+), 40 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 69998fdf7e6..b3f3b74019e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -537,12 +537,13 @@ case class AdaptiveSparkPlanExec(
       }
 
     case i: InMemoryTableScanExec =>
+      // Unlike `Exchange`, there is no reuse for `InMemoryTableScanExec`, so when we hit
+      // one for the first time, we always create a new query stage.
       val newStage = newQueryStage(i)
-      val isMaterialized = newStage.isMaterialized
       CreateStageResult(
         newPlan = newStage,
-        allChildStagesMaterialized = isMaterialized,
-        newStages = if (isMaterialized) Seq.empty else Seq(newStage))
+        allChildStagesMaterialized = false,
+        newStages = Seq(newStage))
 
     case q: QueryStageExec =>
       CreateStageResult(newPlan = q,
@@ -561,34 +562,30 @@ case class AdaptiveSparkPlanExec(
   }
 
   private def newQueryStage(plan: SparkPlan): QueryStageExec = {
-    val optimizedPlan = plan match {
-      case e: Exchange =>
-        e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false)))
-      case _ => plan
-    }
-    val newPlan = applyPhysicalRules(
-      optimizedPlan,
-      postStageCreationRules(outputsColumnar = plan.supportsColumnar),
-      Some((planChangeLogger, "AQE Post Stage Creation")))
     val queryStage = plan match {
-      case s: ShuffleExchangeLike =>
-        if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
-          throw SparkException.internalError(
-            "Custom columnar rules cannot transform shuffle node to something else.")
-        }
-        ShuffleQueryStageExec(currentStageId, newPlan, s.canonicalized)
-      case b: BroadcastExchangeLike =>
-        if (!newPlan.isInstanceOf[BroadcastExchangeLike]) {
-          throw SparkException.internalError(
-            "Custom columnar rules cannot transform broadcast node to something else.")
+      case e: Exchange =>
+        val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false)))
+        val newPlan = applyPhysicalRules(
+          optimized,
+          postStageCreationRules(outputsColumnar = plan.supportsColumnar),
+          Some((planChangeLogger, "AQE Post Stage Creation")))
+        if (e.isInstanceOf[ShuffleExchangeLike]) {
+          if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
+            throw SparkException.internalError(
+              "Custom columnar rules cannot transform shuffle node to something else.")
+          }
+          ShuffleQueryStageExec(currentStageId, newPlan, e.canonicalized)
+        } else {
+          assert(e.isInstanceOf[BroadcastExchangeLike])
+          if (!newPlan.isInstanceOf[BroadcastExchangeLike]) {
+            throw SparkException.internalError(
+              "Custom columnar rules cannot transform broadcast node to something else.")
+          }
+          BroadcastQueryStageExec(currentStageId, newPlan, e.canonicalized)
         }
-        BroadcastQueryStageExec(currentStageId, newPlan, b.canonicalized)
       case i: InMemoryTableScanExec =>
-        if (!newPlan.isInstanceOf[InMemoryTableScanExec]) {
-          throw SparkException.internalError("Custom columnar rules cannot transform " +
-            "`InMemoryTableScanExec` node to something else.")
-        }
-        TableCacheQueryStageExec(currentStageId, newPlan.asInstanceOf[InMemoryTableScanExec])
+        // No need to optimize `InMemoryTableScanExec` as it's a leaf node.
+        TableCacheQueryStageExec(currentStageId, i)
     }
     currentStageId += 1
     setLogicalLinkForNewQueryStage(queryStage, plan)
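
The hunk above restructures `newQueryStage` so that only `Exchange` nodes go through stage-level optimization and the post-stage-creation rules, while `InMemoryTableScanExec`, being a leaf, is wrapped directly. A simplified, hypothetical sketch of that dispatch shape (the toy types and `optimizeAndValidate` placeholder are stand-ins, not Spark APIs):

```scala
// Toy plan/stage ADTs standing in for SparkPlan and the QueryStageExec subclasses.
sealed trait Plan
case class Shuffle(child: Plan) extends Plan
case class Broadcast(child: Plan) extends Plan
case object CachedScan extends Plan // stands in for InMemoryTableScanExec

sealed trait Stage
case class ShuffleStage(id: Int, plan: Plan) extends Stage
case class BroadcastStage(id: Int, plan: Plan) extends Stage
case class TableCacheStage(id: Int, plan: Plan) extends Stage

object NewQueryStageSketch {
  // Placeholder for optimizeQueryStage + applyPhysicalRules; identity here.
  private def optimizeAndValidate(p: Plan): Plan = p

  def newQueryStage(id: Int, plan: Plan): Stage = plan match {
    // Exchanges: optimize the child, re-apply physical rules, then wrap.
    case s: Shuffle   => ShuffleStage(id, optimizeAndValidate(s))
    case b: Broadcast => BroadcastStage(id, optimizeAndValidate(b))
    // Leaf cached scan: nothing to optimize, wrap as-is.
    case CachedScan   => TableCacheStage(id, CachedScan)
  }
}
```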
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 633142170e1..a0635255706 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -96,6 +96,10 @@ case class InsertAdaptiveSparkPlan(
       plan.exists {
         case _: Exchange => true
         case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
+        // The AQE framework updates the query plan in the UI differently from non-AQE:
+        // it updates the plan at the end of execution, while non-AQE updates it before
+        // execution. If the cached plan is already AQE'd, the current plan must be AQE'd
+        // as well so that the UI gets the plan update correctly.
         case i: InMemoryTableScanExec
             if i.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] => true
         case p => p.expressions.exists(_.exists {
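
This hunk adds a match arm that forces AQE compilation whenever a cached relation's plan was itself compiled with AQE, keeping the UI plan-update timing consistent. A hedged sketch of just that predicate (the types below are minimal stand-ins for the Spark classes):

```scala
// Minimal stand-ins; not the real Spark class hierarchy.
trait SparkPlan
class AdaptiveSparkPlanExec extends SparkPlan
case class CachedRelation(cachedPlan: SparkPlan)
case class InMemoryTableScanExec(relation: CachedRelation) extends SparkPlan

object RequiresAqeSketch {
  // If the cached plan is already AQE'd, the current plan must use AQE too.
  def cachedPlanRequiresAqe(p: SparkPlan): Boolean = p match {
    case i: InMemoryTableScanExec =>
      i.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec]
    case _ => false // other arms (Exchange, distribution requirements, ...) elided
  }
}
```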
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index a27f783215e..d48b4fe1751 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -86,7 +86,7 @@ abstract class QueryStageExec extends LeafExecNode {
   protected var _resultOption = new AtomicReference[Option[Any]](None)
 
   private[adaptive] def resultOption: AtomicReference[Option[Any]] = _resultOption
-  def isMaterialized: Boolean = resultOption.get().isDefined
+  final def isMaterialized: Boolean = resultOption.get().isDefined
 
   override def output: Seq[Attribute] = plan.output
   override def outputPartitioning: Partitioning = plan.outputPartitioning
@@ -275,20 +275,22 @@ case class TableCacheQueryStageExec(
   }
 
   @transient
-  private lazy val future: FutureAction[Unit] = {
-    val rdd = inMemoryTableScan.baseCacheRDD()
-    sparkContext.submitJob(
-      rdd,
-      (_: Iterator[CachedBatch]) => (),
-      (0 until rdd.getNumPartitions).toSeq,
-      (_: Int, _: Unit) => (),
-      ()
-    )
+  private lazy val future: Future[Unit] = {
+    if (inMemoryTableScan.isMaterialized) {
+      Future.successful(())
+    } else {
+      val rdd = inMemoryTableScan.baseCacheRDD()
+      sparkContext.submitJob(
+        rdd,
+        (_: Iterator[CachedBatch]) => (),
+        (0 until rdd.getNumPartitions).toSeq,
+        (_: Int, _: Unit) => (),
+        ()
+      )
+    }
   }
 
   override protected def doMaterialize(): Future[Any] = future
 
-  override def isMaterialized: Boolean = super.isMaterialized || inMemoryTableScan.isMaterialized
-
   override def getRuntimeStatistics: Statistics = inMemoryTableScan.relation.computeStats()
 }
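
The last hunk widens `future` from `FutureAction[Unit]` to `Future[Unit]` so it can short-circuit when the cache is already populated, and removes the `isMaterialized` override, which is safe because the earlier hunk makes the base-class definition `final`. A small sketch of the short-circuit idiom under assumed names (`materialized` and `submitCacheJob` are placeholders, not Spark APIs):

```scala
import scala.concurrent.Future

// Lazily start (at most once) the job that populates a cache; if it is
// already populated, hand back an already-completed Future instead.
class CacheStageSketch(materialized: => Boolean, submitCacheJob: () => Future[Unit]) {
  private lazy val future: Future[Unit] =
    if (materialized) Future.successful(()) // nothing to run, complete immediately
    else submitCacheJob()                   // submit the caching job once

  def doMaterialize(): Future[Any] = future // lazy val memoizes the job
}
```

With this shape the stage reports materialized only once `resultOption` is assigned by the AQE framework when the returned future completes, which is exactly the consistency the followup is after.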

