Posted to commits@spark.apache.org by we...@apache.org on 2023/03/24 05:52:03 UTC
[spark] branch master updated: [SPARK-42101][SQL][FOLLOWUP] Make QueryStageExec.resultOption and isMaterialized consistent
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8982cee315f [SPARK-42101][SQL][FOLLOWUP] Make QueryStageExec.resultOption and isMaterialized consistent
8982cee315f is described below
commit 8982cee315f4a940c340b2f55646fbde78e6231e
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Mar 24 13:51:46 2023 +0800
[SPARK-42101][SQL][FOLLOWUP] Make QueryStageExec.resultOption and isMaterialized consistent
### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/39624. `QueryStageExec.isMaterialized` should return true only if `resultOption` has been assigned; letting the two diverge is a potential source of bugs.
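To make the new invariant concrete, here is a minimal, self-contained sketch of the pattern this followup settles on; the class name is illustrative, not the real Spark class:

    import java.util.concurrent.atomic.AtomicReference

    // Materialization state is derived solely from resultOption: the reference
    // is set exactly once, when the stage finishes materializing.
    abstract class QueryStageSketch {
      protected val _resultOption = new AtomicReference[Option[Any]](None)
      def resultOption: AtomicReference[Option[Any]] = _resultOption

      // `final` so subclasses (e.g. a table-cache stage) cannot override this
      // with a definition that disagrees with resultOption.
      final def isMaterialized: Boolean = resultOption.get().isDefined
    }

A subclass that wants to report materialization must therefore go through `resultOption` rather than shadow `isMaterialized`, which is exactly the inconsistency the patch removes from `TableCacheQueryStageExec`.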
### Why are the changes needed?
Fix a potential bug caused by `isMaterialized` disagreeing with `resultOption`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #40522 from cloud-fan/follow.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 53 ++++++++++------------
.../adaptive/InsertAdaptiveSparkPlan.scala | 4 ++
.../sql/execution/adaptive/QueryStageExec.scala | 26 ++++++-----
3 files changed, 43 insertions(+), 40 deletions(-)
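Before the diff, a condensed sketch of the key behavioral change in `createQueryStages` (all names below are illustrative stand-ins, not the real Spark types): a freshly created table-cache stage is now always reported as unmaterialized, since a brand-new stage cannot have its `resultOption` assigned yet.

    object StageCreationSketch {
      // Stand-in for Spark's CreateStageResult; a String stands in for SparkPlan.
      case class StageResultSketch(
          newPlan: String,
          allChildStagesMaterialized: Boolean,
          newStages: Seq[String])

      // On the first encounter of a table cache scan, always create and
      // schedule a new stage instead of consulting isMaterialized.
      def onFirstTableCacheScan(newStage: String): StageResultSketch =
        StageResultSketch(
          newPlan = newStage,
          allChildStagesMaterialized = false, // never true for a brand-new stage
          newStages = Seq(newStage))          // always hand it back for materialization
    }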
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 69998fdf7e6..b3f3b74019e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -537,12 +537,13 @@ case class AdaptiveSparkPlanExec(
}
case i: InMemoryTableScanExec =>
+ // Unlike `Exchange`, `InMemoryTableScanExec` is never reused. If we hit one
+ // for the first time, we should always create a new query stage.
val newStage = newQueryStage(i)
- val isMaterialized = newStage.isMaterialized
CreateStageResult(
newPlan = newStage,
- allChildStagesMaterialized = isMaterialized,
- newStages = if (isMaterialized) Seq.empty else Seq(newStage))
+ allChildStagesMaterialized = false,
+ newStages = Seq(newStage))
case q: QueryStageExec =>
CreateStageResult(newPlan = q,
@@ -561,34 +562,30 @@ case class AdaptiveSparkPlanExec(
}
private def newQueryStage(plan: SparkPlan): QueryStageExec = {
- val optimizedPlan = plan match {
- case e: Exchange =>
- e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false)))
- case _ => plan
- }
- val newPlan = applyPhysicalRules(
- optimizedPlan,
- postStageCreationRules(outputsColumnar = plan.supportsColumnar),
- Some((planChangeLogger, "AQE Post Stage Creation")))
val queryStage = plan match {
- case s: ShuffleExchangeLike =>
- if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
- throw SparkException.internalError(
- "Custom columnar rules cannot transform shuffle node to something else.")
- }
- ShuffleQueryStageExec(currentStageId, newPlan, s.canonicalized)
- case b: BroadcastExchangeLike =>
- if (!newPlan.isInstanceOf[BroadcastExchangeLike]) {
- throw SparkException.internalError(
- "Custom columnar rules cannot transform broadcast node to something else.")
+ case e: Exchange =>
+ val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false)))
+ val newPlan = applyPhysicalRules(
+ optimized,
+ postStageCreationRules(outputsColumnar = plan.supportsColumnar),
+ Some((planChangeLogger, "AQE Post Stage Creation")))
+ if (e.isInstanceOf[ShuffleExchangeLike]) {
+ if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
+ throw SparkException.internalError(
+ "Custom columnar rules cannot transform shuffle node to something else.")
+ }
+ ShuffleQueryStageExec(currentStageId, newPlan, e.canonicalized)
+ } else {
+ assert(e.isInstanceOf[BroadcastExchangeLike])
+ if (!newPlan.isInstanceOf[BroadcastExchangeLike]) {
+ throw SparkException.internalError(
+ "Custom columnar rules cannot transform broadcast node to something else.")
+ }
+ BroadcastQueryStageExec(currentStageId, newPlan, e.canonicalized)
}
- BroadcastQueryStageExec(currentStageId, newPlan, b.canonicalized)
case i: InMemoryTableScanExec =>
- if (!newPlan.isInstanceOf[InMemoryTableScanExec]) {
- throw SparkException.internalError("Custom columnar rules cannot transform " +
- "`InMemoryTableScanExec` node to something else.")
- }
- TableCacheQueryStageExec(currentStageId, newPlan.asInstanceOf[InMemoryTableScanExec])
+ // No need to optimize `InMemoryTableScanExec` as it's a leaf node.
+ TableCacheQueryStageExec(currentStageId, i)
}
currentStageId += 1
setLogicalLinkForNewQueryStage(queryStage, plan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 633142170e1..a0635255706 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -96,6 +96,10 @@ case class InsertAdaptiveSparkPlan(
plan.exists {
case _: Exchange => true
case p if !p.requiredChildDistribution.forall(_ == UnspecifiedDistribution) => true
+ // The AQE framework updates the query plan in the UI differently: it updates
+ // the plan at the end of execution, while non-AQE updates it before execution.
+ // If the cached plan has already gone through AQE, the current plan must go
+ // through AQE as well so that the UI receives plan updates correctly.
case i: InMemoryTableScanExec
if i.relation.cachedPlan.isInstanceOf[AdaptiveSparkPlanExec] => true
case p => p.expressions.exists(_.exists {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index a27f783215e..d48b4fe1751 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -86,7 +86,7 @@ abstract class QueryStageExec extends LeafExecNode {
protected var _resultOption = new AtomicReference[Option[Any]](None)
private[adaptive] def resultOption: AtomicReference[Option[Any]] = _resultOption
- def isMaterialized: Boolean = resultOption.get().isDefined
+ final def isMaterialized: Boolean = resultOption.get().isDefined
override def output: Seq[Attribute] = plan.output
override def outputPartitioning: Partitioning = plan.outputPartitioning
@@ -275,20 +275,22 @@ case class TableCacheQueryStageExec(
}
@transient
- private lazy val future: FutureAction[Unit] = {
- val rdd = inMemoryTableScan.baseCacheRDD()
- sparkContext.submitJob(
- rdd,
- (_: Iterator[CachedBatch]) => (),
- (0 until rdd.getNumPartitions).toSeq,
- (_: Int, _: Unit) => (),
- ()
- )
+ private lazy val future: Future[Unit] = {
+ if (inMemoryTableScan.isMaterialized) {
+ Future.successful(())
+ } else {
+ val rdd = inMemoryTableScan.baseCacheRDD()
+ sparkContext.submitJob(
+ rdd,
+ (_: Iterator[CachedBatch]) => (),
+ (0 until rdd.getNumPartitions).toSeq,
+ (_: Int, _: Unit) => (),
+ ()
+ )
+ }
}
override protected def doMaterialize(): Future[Any] = future
- override def isMaterialized: Boolean = super.isMaterialized || inMemoryTableScan.isMaterialized
-
override def getRuntimeStatistics: Statistics = inMemoryTableScan.relation.computeStats()
}
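For readers skimming that last hunk, a minimal sketch of the resulting control flow; the class and the `submitCacheJob` parameter are hypothetical stand-ins for `TableCacheQueryStageExec` and `sparkContext.submitJob`:

    import scala.concurrent.Future

    class TableCacheStageSketch(alreadyCached: Boolean, submitCacheJob: () => Future[Unit]) {
      // The lazy val guarantees the caching job is submitted at most once.
      private lazy val future: Future[Unit] =
        if (alreadyCached) Future.successful(()) // cache already built, nothing to run
        else submitCacheJob()                    // kick off the cache build

      def doMaterialize(): Future[Any] = future
    }

Note that the removed `override def isMaterialized` is what the `final` modifier in `QueryStageExec` now forbids: the stage reports materialization only once its own `resultOption` is assigned, even if the underlying cache was already built.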