You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/03/14 13:03:52 UTC
[spark] branch master updated: [SPARK-42778][SQL] QueryStageExec should respect supportsRowBased
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 13307f1ecea [SPARK-42778][SQL] QueryStageExec should respect supportsRowBased
13307f1ecea is described below
commit 13307f1ecea1e81ff82a9eb348a7b43e4fd1a332
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Mar 14 21:03:35 2023 +0800
[SPARK-42778][SQL] QueryStageExec should respect supportsRowBased
### What changes were proposed in this pull request?
Make `QueryStageExec` respect plan.supportsRowBased
### Why are the changes needed?
It is a long-standing issue that if the plan supports both columnar and row-based execution, then it would add an unnecessary `ColumnarToRow`.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
add test
Closes #40407 from ulysses-you/SPARK-42778.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/execution/adaptive/QueryStageExec.scala | 1 +
.../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 16 +++++++++++++++-
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 72e7fc937f2..8a2abadd19e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -97,6 +97,7 @@ abstract class QueryStageExec extends LeafExecNode {
override def executeToIterator(): Iterator[InternalRow] = plan.executeToIterator()
protected override def doExecute(): RDD[InternalRow] = plan.execute()
+ override def supportsRowBased: Boolean = plan.supportsRowBased
override def supportsColumnar: Boolean = plan.supportsColumnar
protected override def doExecuteColumnar(): RDD[ColumnarBatch] = plan.executeColumnar()
override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 8ed31e1968c..faf1f911b1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -2716,6 +2716,20 @@ class AdaptiveQueryExecSuite
}
}
+ test("SPARK-42778: QueryStageExec should respect supportsRowBased") {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ withTempView("t") {
+ Seq(1).toDF("c1").createOrReplaceTempView("t")
+ spark.catalog.cacheTable("t")
+ val df = spark.table("t")
+ df.collect()
+ assert(collect(df.queryExecution.executedPlan) {
+ case c: ColumnarToRowExec => c
+ }.isEmpty)
+ }
+ }
+ }
+
test("SPARK-42101: Apply AQE if contains nested AdaptiveSparkPlanExec") {
withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
val df = spark.range(3).repartition().cache()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org