You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/25 12:09:09 UTC

[spark] branch branch-3.3 updated: [SPARK-38644][SQL] DS V2 topN push-down supports project with alias

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 9277353  [SPARK-38644][SQL] DS V2 topN push-down supports project with alias
9277353 is described below

commit 9277353b23df4b54dfb65e948e1b3d001806929b
Author: Jiaan Geng <be...@163.com>
AuthorDate: Fri Mar 25 20:00:39 2022 +0800

    [SPARK-38644][SQL] DS V2 topN push-down supports project with alias
    
    ### What changes were proposed in this pull request?
    Currently, Spark DS V2 topN push-down doesn't supports project with alias.
    
    This PR let it works good with alias.
    
    **Example**:
    the origin plan show below:
    ```
    Sort [mySalary#10 ASC NULLS FIRST], true
    +- Project [NAME#1, SALARY#2 AS mySalary#10]
       +- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4], RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession7fd4b9ec,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true),StructField(IS_MANAGER,BooleanType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions3c8e4a82)
    ```
    The `pushedLimit` and `sortOrders` of `JDBCScanBuilder` are empty.
    
    If we can push down the top n, then the plan will be:
    ```
    Project [NAME#1, SALARY#2 AS mySalary#10]
    +- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4], RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession7fd4b9ec,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true),StructField(IS_MANAGER,BooleanType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions3c8e4a82)
    ```
    The `pushedLimit` of `JDBCScanBuilder` will be `1` and `sortOrders` of `JDBCScanBuilder` will be `SALARY ASC NULLS FIRST`.
    
    ### Why are the changes needed?
    Alias is more useful.
    
    ### Does this PR introduce _any_ user-facing change?
    'Yes'.
    Users could see DS V2 topN push-down supports project with alias.
    
    ### How was this patch tested?
    New tests.
    
    Closes #35961 from beliefer/SPARK-38644.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/v2/V2ScanRelationPushDown.scala    | 15 ++++++++------
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 24 ++++++++++++++++++++++
 2 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index c699e92..eaa30f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, And, Attribute, AttributeReference, Cast, Divide, DivideDTInterval, DivideYMInterval, EqualTo, Expression, If, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, And, Attribute, AttributeReference, Cast, Divide, DivideDTInterval, DivideYMInterval, EqualTo, Expression, If, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, Limit, LocalLimit, LogicalPlan, Project, Sample, Sort}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.expressions.SortOrder
+import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
 import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Avg, Count, GeneralAggregateFunc, Sum}
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan}
@@ -374,9 +374,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
         sHolder.pushedLimit = Some(limit)
       }
       operation
-    case s @ Sort(order, _, operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder))
-        if filter.isEmpty =>
-      val orders = DataSourceStrategy.translateSortOrders(order)
+    case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder))
+        if filter.isEmpty && CollapseProject.canCollapseExpressions(
+          order, project, alwaysInline = true) =>
+      val aliasMap = getAliasMap(project)
+      val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]]
+      val orders = DataSourceStrategy.translateSortOrders(newOrder)
       if (orders.length == order.length) {
         val topNPushed = PushDownUtils.pushTopN(sHolder.builder, orders.toArray, limit)
         if (topNPushed) {
@@ -427,7 +430,7 @@ case class ScanBuilderHolder(
     builder: ScanBuilder) extends LeafNode {
   var pushedLimit: Option[Int] = None
 
-  var sortOrders: Seq[SortOrder] = Seq.empty[SortOrder]
+  var sortOrders: Seq[V2SortOrder] = Seq.empty[V2SortOrder]
 
   var pushedSample: Option[TableSampleInfo] = None
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index e7e9174..3ab87ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -267,6 +267,30 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(df8, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
   }
 
+  test("simple scan with top N: order by with alias") {
+    val df1 = spark.read
+      .table("h2.test.employee")
+      .select($"NAME", $"SALARY".as("mySalary"))
+      .sort("mySalary")
+      .limit(1)
+    checkSortRemoved(df1)
+    checkPushedInfo(df1,
+      "PushedFilters: [], PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1, ")
+    checkAnswer(df1, Seq(Row("cathy", 9000.00)))
+
+    val df2 = spark.read
+      .table("h2.test.employee")
+      .select($"DEPT", $"NAME", $"SALARY".as("mySalary"))
+      .filter($"DEPT" > 1)
+      .sort("mySalary")
+      .limit(1)
+    checkSortRemoved(df2)
+    checkPushedInfo(df2,
+      "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " +
+        "PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1, ")
+    checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
+  }
+
   test("scan with filter push-down") {
     val df = spark.table("h2.test.people").filter($"id" > 1)
     checkFiltersRemoved(df)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org