You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/03/14 13:03:52 UTC

[spark] branch master updated: [SPARK-42778][SQL] QueryStageExec should respect supportsRowBased

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 13307f1ecea [SPARK-42778][SQL] QueryStageExec should respect supportsRowBased
13307f1ecea is described below

commit 13307f1ecea1e81ff82a9eb348a7b43e4fd1a332
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Mar 14 21:03:35 2023 +0800

    [SPARK-42778][SQL] QueryStageExec should respect supportsRowBased
    
    ### What changes were proposed in this pull request?
    
    Make `QueryStageExec` respect `plan.supportsRowBased`.
    
    ### Why are the changes needed?
    
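    This is a long-standing issue: `QueryStageExec` forwarded `supportsColumnar` to the wrapped plan but did not forward `supportsRowBased`, so it fell back to the `SparkPlan` default. As a result, when the wrapped plan supports both columnar and row-based execution, the stage did not report row-based support, and an unnecessary `ColumnarToRow` transition was inserted above it.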
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
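    Added a unit test in `AdaptiveQueryExecSuite`. It checks that scanning a cached table with AQE force-applied produces no `ColumnarToRowExec` in the executed plan.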
    
    Closes #40407 from ulysses-you/SPARK-42778.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/adaptive/QueryStageExec.scala    |  1 +
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala  | 16 +++++++++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 72e7fc937f2..8a2abadd19e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -97,6 +97,7 @@ abstract class QueryStageExec extends LeafExecNode {
   override def executeToIterator(): Iterator[InternalRow] = plan.executeToIterator()
 
   protected override def doExecute(): RDD[InternalRow] = plan.execute()
+  override def supportsRowBased: Boolean = plan.supportsRowBased
   override def supportsColumnar: Boolean = plan.supportsColumnar
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = plan.executeColumnar()
   override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 8ed31e1968c..faf1f911b1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
+import org.apache.spark.sql.execution.{CollectLimitExec, ColumnarToRowExec, LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, SparkPlanInfo, UnionExec}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
@@ -2716,6 +2716,20 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-42778: QueryStageExec should respect supportsRowBased") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+      withTempView("t") {
+        Seq(1).toDF("c1").createOrReplaceTempView("t")
+        spark.catalog.cacheTable("t")
+        val df = spark.table("t")
+        df.collect()
+        assert(collect(df.queryExecution.executedPlan) {
+          case c: ColumnarToRowExec => c
+        }.isEmpty)
+      }
+    }
+  }
+
   test("SPARK-42101: Apply AQE if contains nested AdaptiveSparkPlanExec") {
     withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true") {
       val df = spark.range(3).repartition().cache()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org