You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/04/14 05:24:19 UTC

[spark] branch branch-3.0 updated: [SPARK-30953][SQL] InsertAdaptiveSparkPlan should apply AQE on child plan of write commands

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ca1adf7  [SPARK-30953][SQL] InsertAdaptiveSparkPlan should apply AQE on child plan of write commands
ca1adf7 is described below

commit ca1adf76b1d1fc15fa8d03450080072062d6f672
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue Apr 14 05:18:58 2020 +0000

    [SPARK-30953][SQL] InsertAdaptiveSparkPlan should apply AQE on child plan of write commands
    
    This PR changes `InsertAdaptiveSparkPlan` to apply AQE on the child plan of V1/V2 write commands rather than the command itself.
    
    Apply AQE on write commands with child plan will expose `LogicalQueryStage` to `Analyzer` while it should hider under `AdaptiveSparkPlanExec` only to avoid unexpected broken.
    
    No.
    
    Pass Jenkins.
    
    Closes #27701 from Ngone51/skip_v2_commands.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 5d4f5d36a26f07327dcc0928d437dd23582ad756)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala |  5 ++++-
 .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala  | 10 ++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 621c063..ea586f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.internal.SQLConf
 
@@ -45,6 +46,8 @@ case class InsertAdaptiveSparkPlan(
   private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
     case _ if !conf.adaptiveExecutionEnabled => plan
     case _: ExecutedCommandExec => plan
+    case c: DataWritingCommandExec => c.copy(child = apply(c.child))
+    case c: V2CommandExec => c.withNewChildren(c.children.map(apply))
     case _ if shouldApplyAQE(plan, isSubquery) =>
       if (supportAdaptive(plan)) {
         try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index b8ac4ddc..64dd9aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight, SortMergeJoinExec}
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
@@ -795,4 +796,13 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of write commands") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      val plan = sql("CREATE TABLE t1 AS SELECT 1 col").queryExecution.executedPlan
+      assert(plan.isInstanceOf[DataWritingCommandExec])
+      assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org