You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/08/12 13:36:21 UTC

[spark] branch master updated: [SPARK-35881][SQL][FOLLOWUP] Remove the AQE post stage creation extension

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 124d011  [SPARK-35881][SQL][FOLLOWUP] Remove the AQE post stage creation extension
124d011 is described below

commit 124d011ee73f9805ac840aa5a6eddc27cd09b2e1
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Aug 12 21:35:28 2021 +0800

    [SPARK-35881][SQL][FOLLOWUP] Remove the AQE post stage creation extension
    
    ### What changes were proposed in this pull request?
    
    This is a followup of #33140.
    
    It turns out that we may be able to complete the AQE and columnar execution integration without the AQE post stage creation extension. The rule `ApplyColumnarRulesAndInsertTransitions` can add a to-columnar transition if the shuffle/broadcast supports columnar execution.
    
    ### Why are the changes needed?
    
    Remove APIs that are not needed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the APIs are not released yet.
    
    ### How was this patch tested?
    
    Existing tests and manual tests.
    
    Closes #33701 from cloud-fan/aqe.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/SparkSessionExtensions.scala    | 20 --------------------
 .../execution/adaptive/AdaptiveSparkPlanExec.scala   | 16 ++++++++++------
 .../spark/sql/internal/BaseSessionStateBuilder.scala |  7 +------
 .../org/apache/spark/sql/internal/SessionState.scala |  3 +--
 4 files changed, 12 insertions(+), 34 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
index 18ebae5..a4ec481 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
@@ -47,7 +47,6 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
  * <li>(External) Catalog listeners.</li>
  * <li>Columnar Rules.</li>
  * <li>Adaptive Query Stage Preparation Rules.</li>
- * <li>Adaptive Query Post Stage Preparation Rules.</li>
  * </ul>
  *
  * The extensions can be used by calling `withExtensions` on the [[SparkSession.Builder]], for
@@ -111,12 +110,9 @@ class SparkSessionExtensions {
   type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
   type ColumnarRuleBuilder = SparkSession => ColumnarRule
   type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]
-  type PostStageCreationRuleBuilder = SparkSession => Rule[SparkPlan]
 
   private[this] val columnarRuleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
   private[this] val queryStagePrepRuleBuilders = mutable.Buffer.empty[QueryStagePrepRuleBuilder]
-  private[this] val postStageCreationRuleBuilders =
-    mutable.Buffer.empty[PostStageCreationRuleBuilder]
 
   /**
    * Build the override rules for columnar execution.
@@ -133,14 +129,6 @@ class SparkSessionExtensions {
   }
 
   /**
-   * Build the override rules for the final query stage preparation phase of adaptive query
-   * execution.
-   */
-  private[sql] def buildPostStageCreationRules(session: SparkSession): Seq[Rule[SparkPlan]] = {
-    postStageCreationRuleBuilders.map(_.apply(session)).toSeq
-  }
-
-  /**
    * Inject a rule that can override the columnar execution of an executor.
    */
   def injectColumnar(builder: ColumnarRuleBuilder): Unit = {
@@ -155,14 +143,6 @@ class SparkSessionExtensions {
     queryStagePrepRuleBuilders += builder
   }
 
-  /**
-   * Inject a rule that can override the final query stage preparation phase of adaptive query
-   * execution.
-   */
-  def injectPostStageCreationRule(builder: PostStageCreationRuleBuilder): Unit = {
-    postStageCreationRuleBuilders += builder
-  }
-
   private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 2c242d1..cd47fd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -125,14 +125,18 @@ case class AdaptiveSparkPlanExec(
     OptimizeShuffleWithLocalRead
   )
 
-  @transient private val staticPostStageCreationRules: Seq[Rule[SparkPlan]] =
-    CollapseCodegenStages() +: context.session.sessionState.postStageCreationRules
+  // This rule is stateful as it maintains the codegen stage ID. We can't create a fresh one every
+  // time and need to keep it in a variable.
+  @transient private val collapseCodegenStagesRule: Rule[SparkPlan] =
+    CollapseCodegenStages()
 
   // A list of physical optimizer rules to be applied right after a new stage is created. The input
   // plan to these rules has exchange as its root node.
-  private def postStageCreationRules(outputsColumnar: Boolean) =
+  private def postStageCreationRules(outputsColumnar: Boolean) = Seq(
     ApplyColumnarRulesAndInsertTransitions(
-      context.session.sessionState.columnarRules, outputsColumnar) +: staticPostStageCreationRules
+      context.session.sessionState.columnarRules, outputsColumnar),
+    collapseCodegenStagesRule
+  )
 
   private def optimizeQueryStage(plan: SparkPlan, isFinalStage: Boolean): SparkPlan = {
     val optimized = queryStageOptimizerRules.foldLeft(plan) { case (latestPlan, rule) =>
@@ -522,7 +526,7 @@ case class AdaptiveSparkPlanExec(
       case s: ShuffleExchangeLike =>
         val newShuffle = applyPhysicalRules(
           s.withNewChildren(Seq(optimizedPlan)),
-          postStageCreationRules(outputsColumnar = false),
+          postStageCreationRules(outputsColumnar = s.supportsColumnar),
           Some((planChangeLogger, "AQE Post Stage Creation")))
         if (!newShuffle.isInstanceOf[ShuffleExchangeLike]) {
           throw new IllegalStateException(
@@ -532,7 +536,7 @@ case class AdaptiveSparkPlanExec(
       case b: BroadcastExchangeLike =>
         val newBroadcast = applyPhysicalRules(
           b.withNewChildren(Seq(optimizedPlan)),
-          postStageCreationRules(outputsColumnar = false),
+          postStageCreationRules(outputsColumnar = b.supportsColumnar),
           Some((planChangeLogger, "AQE Post Stage Creation")))
         if (!newBroadcast.isInstanceOf[BroadcastExchangeLike]) {
           throw new IllegalStateException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 1c0c916..8289819 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -307,10 +307,6 @@ abstract class BaseSessionStateBuilder(
     extensions.buildQueryStagePrepRules(session)
   }
 
-  protected def postStageCreationRules: Seq[Rule[SparkPlan]] = {
-    extensions.buildPostStageCreationRules(session)
-  }
-
   /**
    * Create a query execution object.
    */
@@ -364,8 +360,7 @@ abstract class BaseSessionStateBuilder(
       createQueryExecution,
       createClone,
       columnarRules,
-      queryStagePrepRules,
-      postStageCreationRules)
+      queryStagePrepRules)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 7685e54..cdf764a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -79,8 +79,7 @@ private[sql] class SessionState(
     createQueryExecution: (LogicalPlan, CommandExecutionMode.Value) => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState,
     val columnarRules: Seq[ColumnarRule],
-    val queryStagePrepRules: Seq[Rule[SparkPlan]],
-    val postStageCreationRules: Seq[Rule[SparkPlan]]) {
+    val queryStagePrepRules: Seq[Rule[SparkPlan]]) {
 
   // The following fields are lazy to avoid creating the Hive client when creating SessionState.
   lazy val catalog: SessionCatalog = catalogBuilder()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org