You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/08/12 13:36:21 UTC
[spark] branch master updated: [SPARK-35881][SQL][FOLLOWUP] Remove
the AQE post stage creation extension
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 124d011 [SPARK-35881][SQL][FOLLOWUP] Remove the AQE post stage creation extension
124d011 is described below
commit 124d011ee73f9805ac840aa5a6eddc27cd09b2e1
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Aug 12 21:35:28 2021 +0800
[SPARK-35881][SQL][FOLLOWUP] Remove the AQE post stage creation extension
### What changes were proposed in this pull request?
This is a follow-up of #33140.
It turns out that we may be able to complete the AQE and columnar execution integration without the AQE post stage creation extension. The rule `ApplyColumnarRulesAndInsertTransitions` can add a to-columnar transition if the shuffle/broadcast supports columnar execution.
### Why are the changes needed?
Remove APIs that are not needed.
### Does this PR introduce _any_ user-facing change?
No, the APIs are not released yet.
### How was this patch tested?
Existing and manual tests.
Closes #33701 from cloud-fan/aqe.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/SparkSessionExtensions.scala | 20 --------------------
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 16 ++++++++++------
.../spark/sql/internal/BaseSessionStateBuilder.scala | 7 +------
.../org/apache/spark/sql/internal/SessionState.scala | 3 +--
4 files changed, 12 insertions(+), 34 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
index 18ebae5..a4ec481 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
@@ -47,7 +47,6 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
* <li>(External) Catalog listeners.</li>
* <li>Columnar Rules.</li>
* <li>Adaptive Query Stage Preparation Rules.</li>
- * <li>Adaptive Query Post Stage Preparation Rules.</li>
* </ul>
*
* The extensions can be used by calling `withExtensions` on the [[SparkSession.Builder]], for
@@ -111,12 +110,9 @@ class SparkSessionExtensions {
type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
type ColumnarRuleBuilder = SparkSession => ColumnarRule
type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]
- type PostStageCreationRuleBuilder = SparkSession => Rule[SparkPlan]
private[this] val columnarRuleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
private[this] val queryStagePrepRuleBuilders = mutable.Buffer.empty[QueryStagePrepRuleBuilder]
- private[this] val postStageCreationRuleBuilders =
- mutable.Buffer.empty[PostStageCreationRuleBuilder]
/**
* Build the override rules for columnar execution.
@@ -133,14 +129,6 @@ class SparkSessionExtensions {
}
/**
- * Build the override rules for the final query stage preparation phase of adaptive query
- * execution.
- */
- private[sql] def buildPostStageCreationRules(session: SparkSession): Seq[Rule[SparkPlan]] = {
- postStageCreationRuleBuilders.map(_.apply(session)).toSeq
- }
-
- /**
* Inject a rule that can override the columnar execution of an executor.
*/
def injectColumnar(builder: ColumnarRuleBuilder): Unit = {
@@ -155,14 +143,6 @@ class SparkSessionExtensions {
queryStagePrepRuleBuilders += builder
}
- /**
- * Inject a rule that can override the final query stage preparation phase of adaptive query
- * execution.
- */
- def injectPostStageCreationRule(builder: PostStageCreationRuleBuilder): Unit = {
- postStageCreationRuleBuilders += builder
- }
-
private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 2c242d1..cd47fd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -125,14 +125,18 @@ case class AdaptiveSparkPlanExec(
OptimizeShuffleWithLocalRead
)
- @transient private val staticPostStageCreationRules: Seq[Rule[SparkPlan]] =
- CollapseCodegenStages() +: context.session.sessionState.postStageCreationRules
+ // This rule is stateful as it maintains the codegen stage ID. We can't create a fresh one every
+ // time and need to keep it in a variable.
+ @transient private val collapseCodegenStagesRule: Rule[SparkPlan] =
+ CollapseCodegenStages()
// A list of physical optimizer rules to be applied right after a new stage is created. The input
// plan to these rules has exchange as its root node.
- private def postStageCreationRules(outputsColumnar: Boolean) =
+ private def postStageCreationRules(outputsColumnar: Boolean) = Seq(
ApplyColumnarRulesAndInsertTransitions(
- context.session.sessionState.columnarRules, outputsColumnar) +: staticPostStageCreationRules
+ context.session.sessionState.columnarRules, outputsColumnar),
+ collapseCodegenStagesRule
+ )
private def optimizeQueryStage(plan: SparkPlan, isFinalStage: Boolean): SparkPlan = {
val optimized = queryStageOptimizerRules.foldLeft(plan) { case (latestPlan, rule) =>
@@ -522,7 +526,7 @@ case class AdaptiveSparkPlanExec(
case s: ShuffleExchangeLike =>
val newShuffle = applyPhysicalRules(
s.withNewChildren(Seq(optimizedPlan)),
- postStageCreationRules(outputsColumnar = false),
+ postStageCreationRules(outputsColumnar = s.supportsColumnar),
Some((planChangeLogger, "AQE Post Stage Creation")))
if (!newShuffle.isInstanceOf[ShuffleExchangeLike]) {
throw new IllegalStateException(
@@ -532,7 +536,7 @@ case class AdaptiveSparkPlanExec(
case b: BroadcastExchangeLike =>
val newBroadcast = applyPhysicalRules(
b.withNewChildren(Seq(optimizedPlan)),
- postStageCreationRules(outputsColumnar = false),
+ postStageCreationRules(outputsColumnar = b.supportsColumnar),
Some((planChangeLogger, "AQE Post Stage Creation")))
if (!newBroadcast.isInstanceOf[BroadcastExchangeLike]) {
throw new IllegalStateException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 1c0c916..8289819 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -307,10 +307,6 @@ abstract class BaseSessionStateBuilder(
extensions.buildQueryStagePrepRules(session)
}
- protected def postStageCreationRules: Seq[Rule[SparkPlan]] = {
- extensions.buildPostStageCreationRules(session)
- }
-
/**
* Create a query execution object.
*/
@@ -364,8 +360,7 @@ abstract class BaseSessionStateBuilder(
createQueryExecution,
createClone,
columnarRules,
- queryStagePrepRules,
- postStageCreationRules)
+ queryStagePrepRules)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 7685e54..cdf764a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -79,8 +79,7 @@ private[sql] class SessionState(
createQueryExecution: (LogicalPlan, CommandExecutionMode.Value) => QueryExecution,
createClone: (SparkSession, SessionState) => SessionState,
val columnarRules: Seq[ColumnarRule],
- val queryStagePrepRules: Seq[Rule[SparkPlan]],
- val postStageCreationRules: Seq[Rule[SparkPlan]]) {
+ val queryStagePrepRules: Seq[Rule[SparkPlan]]) {
// The following fields are lazy to avoid creating the Hive client when creating SessionState.
lazy val catalog: SessionCatalog = catalogBuilder()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org