Posted to commits@spark.apache.org by we...@apache.org on 2022/03/25 12:09:09 UTC
[spark] branch branch-3.3 updated: [SPARK-38644][SQL] DS V2 topN push-down supports project with alias
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 9277353 [SPARK-38644][SQL] DS V2 topN push-down supports project with alias
9277353 is described below
commit 9277353b23df4b54dfb65e948e1b3d001806929b
Author: Jiaan Geng <be...@163.com>
AuthorDate: Fri Mar 25 20:00:39 2022 +0800
[SPARK-38644][SQL] DS V2 topN push-down supports project with alias
### What changes were proposed in this pull request?
Currently, Spark DS V2 topN push-down doesn't support a project with aliases.
This PR makes it work correctly when the project list contains aliases.
**Example**:
The original plan is shown below:
```
Sort [mySalary#10 ASC NULLS FIRST], true
+- Project [NAME#1, SALARY#2 AS mySalary#10]
+- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4], RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession7fd4b9ec,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true),StructField(IS_MANAGER,BooleanType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions3c8e4a82)
```
The `pushedLimit` and `sortOrders` of `JDBCScanBuilder` are empty.
If we can push down the top N, the plan becomes:
```
Project [NAME#1, SALARY#2 AS mySalary#10]
+- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4], RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession7fd4b9ec,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true),StructField(IS_MANAGER,BooleanType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions3c8e4a82)
```
The `pushedLimit` of `JDBCScanBuilder` will be `1` and `sortOrders` of `JDBCScanBuilder` will be `SALARY ASC NULLS FIRST`.
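For reference, the query shape that now benefits is sketched below; it mirrors the new test added in this patch and assumes the `h2.test.employee` table that `JDBCV2Suite` registers:
```
val df = spark.read
  .table("h2.test.employee")
  .select($"NAME", $"SALARY".as("mySalary"))
  .sort("mySalary")
  .limit(1)
// With this change, the pushed info should include:
// PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1
```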
### Why are the changes needed?
Supporting aliases in the project list makes the top N push-down applicable to more queries.
### Does this PR introduce _any_ user-facing change?
'Yes'.
Users will see that DS V2 topN push-down now supports a project with aliases.
### How was this patch tested?
New tests.
Closes #35961 from beliefer/SPARK-38644.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../datasources/v2/V2ScanRelationPushDown.scala | 15 ++++++++------
.../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 24 ++++++++++++++++++++++
2 files changed, 33 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index c699e92..eaa30f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.mutable
-import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, And, Attribute, AttributeReference, Cast, Divide, DivideDTInterval, DivideYMInterval, EqualTo, Expression, If, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, And, Attribute, AttributeReference, Cast, Divide, DivideDTInterval, DivideYMInterval, EqualTo, Expression, If, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, Limit, LocalLimit, LogicalPlan, Project, Sample, Sort}
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.expressions.SortOrder
+import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Avg, Count, GeneralAggregateFunc, Sum}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan}
@@ -374,9 +374,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
sHolder.pushedLimit = Some(limit)
}
operation
- case s @ Sort(order, _, operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder))
- if filter.isEmpty =>
- val orders = DataSourceStrategy.translateSortOrders(order)
+ case s @ Sort(order, _, operation @ ScanOperation(project, filter, sHolder: ScanBuilderHolder))
+ if filter.isEmpty && CollapseProject.canCollapseExpressions(
+ order, project, alwaysInline = true) =>
+ val aliasMap = getAliasMap(project)
+ val newOrder = order.map(replaceAlias(_, aliasMap)).asInstanceOf[Seq[SortOrder]]
+ val orders = DataSourceStrategy.translateSortOrders(newOrder)
if (orders.length == order.length) {
val topNPushed = PushDownUtils.pushTopN(sHolder.builder, orders.toArray, limit)
if (topNPushed) {
@@ -427,7 +430,7 @@ case class ScanBuilderHolder(
builder: ScanBuilder) extends LeafNode {
var pushedLimit: Option[Int] = None
- var sortOrders: Seq[SortOrder] = Seq.empty[SortOrder]
+ var sortOrders: Seq[V2SortOrder] = Seq.empty[V2SortOrder]
var pushedSample: Option[TableSampleInfo] = None
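Side note on the import rename above: the rule now references both Catalyst's `SortOrder` (the logical plan's sort expressions) and the connector's `SortOrder` (what gets pushed to the source), so the connector class is bound to a new name. The Scala idiom, shown in isolation:
```
// Two different classes share the name SortOrder; renaming one on import
// lets both be used unambiguously in the same file.
import org.apache.spark.sql.catalyst.expressions.SortOrder
import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
```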
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index e7e9174..3ab87ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -267,6 +267,30 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(df8, Seq(Row(2, "alex", 12000.00, 1200.0, false)))
}
+ test("simple scan with top N: order by with alias") {
+ val df1 = spark.read
+ .table("h2.test.employee")
+ .select($"NAME", $"SALARY".as("mySalary"))
+ .sort("mySalary")
+ .limit(1)
+ checkSortRemoved(df1)
+ checkPushedInfo(df1,
+ "PushedFilters: [], PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1, ")
+ checkAnswer(df1, Seq(Row("cathy", 9000.00)))
+
+ val df2 = spark.read
+ .table("h2.test.employee")
+ .select($"DEPT", $"NAME", $"SALARY".as("mySalary"))
+ .filter($"DEPT" > 1)
+ .sort("mySalary")
+ .limit(1)
+ checkSortRemoved(df2)
+ checkPushedInfo(df2,
+ "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], " +
+ "PushedTopN: ORDER BY [SALARY ASC NULLS FIRST] LIMIT 1, ")
+ checkAnswer(df2, Seq(Row(2, "david", 10000.00)))
+ }
+
test("scan with filter push-down") {
val df = spark.table("h2.test.people").filter($"id" > 1)
checkFiltersRemoved(df)
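In essence, the new match arm resolves aliases in the sort order back to the underlying columns before translation, via `getAliasMap`/`replaceAlias` from `AliasHelper`. A simplified, self-contained sketch of that substitution, with plain case classes standing in for Catalyst expressions (not the real API):
```
object AliasReplaceSketch extends App {
  // Minimal stand-ins for Catalyst attributes and aliases.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Alias(child: Expr, name: String) extends Expr

  // Project list from the example: NAME, SALARY AS mySalary.
  val project: Seq[Expr] = Seq(Attr("NAME"), Alias(Attr("SALARY"), "mySalary"))

  // Build an alias map (output name -> aliased child), as getAliasMap does.
  val aliasMap: Map[String, Expr] =
    project.collect { case Alias(child, name) => name -> child }.toMap

  // Rewrite a sort key that refers to an alias into the underlying expression.
  def replaceAlias(sortKey: String): Expr =
    aliasMap.getOrElse(sortKey, Attr(sortKey))

  // "ORDER BY mySalary" becomes "ORDER BY SALARY", which the source understands.
  assert(replaceAlias("mySalary") == Attr("SALARY"))
}
```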