You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/04/14 05:24:19 UTC
[spark] branch branch-3.0 updated: [SPARK-30953][SQL]
InsertAdaptiveSparkPlan should apply AQE on child plan of write commands
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ca1adf7 [SPARK-30953][SQL] InsertAdaptiveSparkPlan should apply AQE on child plan of write commands
ca1adf7 is described below
commit ca1adf76b1d1fc15fa8d03450080072062d6f672
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue Apr 14 05:18:58 2020 +0000
[SPARK-30953][SQL] InsertAdaptiveSparkPlan should apply AQE on child plan of write commands
This PR changes `InsertAdaptiveSparkPlan` to apply AQE on the child plan of V1/V2 write commands rather than the command itself.
Applying AQE to a write command together with its child plan would expose `LogicalQueryStage` to the `Analyzer`, whereas it should stay hidden under `AdaptiveSparkPlanExec` only, to avoid unexpected breakage.
User-facing change: No.
Testing: Added a test to `AdaptiveQueryExecSuite`; passes Jenkins.
Closes #27701 from Ngone51/skip_v2_commands.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 5d4f5d36a26f07327dcc0928d437dd23582ad756)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala | 5 ++++-
.../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 10 ++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 621c063..ea586f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec}
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.internal.SQLConf
@@ -45,6 +46,8 @@ case class InsertAdaptiveSparkPlan(
private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
case _ if !conf.adaptiveExecutionEnabled => plan
case _: ExecutedCommandExec => plan
+ case c: DataWritingCommandExec => c.copy(child = apply(c.child))
+ case c: V2CommandExec => c.withNewChildren(c.children.map(apply))
case _ if shouldApplyAQE(plan, isSubquery) =>
if (supportAdaptive(plan)) {
try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index b8ac4ddc..64dd9aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.execution.{ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight, SortMergeJoinExec}
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
@@ -795,4 +796,13 @@ class AdaptiveQueryExecSuite
}
}
}
+
+ test("SPARK-30953: InsertAdaptiveSparkPlan should apply AQE on child plan of write commands") {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ val plan = sql("CREATE TABLE t1 AS SELECT 1 col").queryExecution.executedPlan
+ assert(plan.isInstanceOf[DataWritingCommandExec])
+ assert(plan.asInstanceOf[DataWritingCommandExec].child.isInstanceOf[AdaptiveSparkPlanExec])
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org