Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/12 07:28:06 UTC
[GitHub] [spark] beliefer opened a new pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
beliefer opened a new pull request #35823:
URL: https://github.com/apache/spark/pull/35823
### What changes were proposed in this pull request?
Currently, Spark DS V2 aggregate push-down doesn't support a project that contains aliases: it only applies when every projected column is a plain `AttributeReference`.
See https://github.com/apache/spark/blob/c91c2e9afec0d5d5bbbd2e155057fe409c5bb928/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala#L96
This PR makes aggregate push-down work when the project contains aliases.
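A toy Scala model of the guard change, contrasting the old `project.forall(_.isInstanceOf[AttributeReference])` check with the later `project.forall(_.deterministic)` check that appears in the diffs of this thread. `Expr`, `Attr`, `Lit`, `Add`, `Rand` and `Alias` are hypothetical stand-ins, not Catalyst classes. The sketch only shows which project lists each predicate would accept.

```scala
// Toy model of a Project list; these case classes are hypothetical, not Catalyst types.
object ProjectGuardSketch {
  sealed trait Expr { def deterministic: Boolean }
  final case class Attr(name: String) extends Expr { val deterministic = true }
  final case class Lit(v: Int) extends Expr { val deterministic = true }
  final case class Rand() extends Expr { val deterministic = false } // like rand()
  final case class Add(l: Expr, r: Expr) extends Expr {
    def deterministic: Boolean = l.deterministic && r.deterministic
  }
  final case class Alias(child: Expr, name: String) extends Expr {
    def deterministic: Boolean = child.deterministic
  }

  // Old guard: every projected column must be a bare attribute.
  def canPushBefore(project: Seq[Expr]): Boolean = project.forall(_.isInstanceOf[Attr])

  // Later guard in the diff: any deterministic projection, aliases included, is accepted.
  def canPushAfter(project: Seq[Expr]): Boolean = project.forall(_.deterministic)

  // Project(x + 1 AS a, y)
  val aliasedProject: Seq[Expr] = Seq(Alias(Add(Attr("x"), Lit(1)), "a"), Attr("y"))
  // Project(rand() AS r)
  val randomProject: Seq[Expr] = Seq(Alias(Rand(), "r"))
}
```

In this model, `Project(x + 1 AS a, y)` is rejected by the old guard and accepted by the new one, while a projection containing `rand()` is still rejected.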
### Why are the changes needed?
Aliased columns are common in queries, and aggregates over them currently cannot be pushed down to the data source.
### Does this PR introduce _any_ user-facing change?
Yes.
DS V2 aggregate push-down now also applies when the project contains aliases.
### How was this patch tested?
New tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r828723083
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -93,22 +92,28 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ if filters.isEmpty && project.forall(_.deterministic) =>
sHolder.builder match {
case r: SupportsPushDownAggregates =>
+ val aliasMap = getAliasMap(project)
+ val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
+ val newGroupingExpressions = groupingExpressions.map {
+ case e: NamedExpression => replaceAliasWithAttr(e, aliasMap)
+ case other => other
+ }
Review comment:
Can we make the code more explicit? It should clearly show these steps:
1. collapse aggregate and project
2. remove the aliases from aggregate functions and group-by expressions (this logic belongs here rather than in `AliasHelper`, as it is not common logic)
3. push down the aggregate
4. add the aliases back, for group-by expressions only.
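The four steps above can be sketched on a toy expression tree. This is a hypothetical model, not the code in `V2ScanRelationPushDown`: the case classes, `collapse`, `stripAlias`, `sql` and `pushDown` are illustrative names, and "push down" is reduced to rendering SQL text for an imaginary JDBC-like source.

```scala
// Toy expression tree; these case classes are hypothetical stand-ins, not Catalyst's.
object AggPushDownSteps {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(v: Int) extends Expr
  final case class Add(l: Expr, r: Expr) extends Expr
  final case class Sum(child: Expr) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // Step 1: collapse Aggregate and Project by inlining each aliased column's definition.
  def collapse(e: Expr, project: Map[String, Expr]): Expr = e match {
    case Attr(n)     => project.getOrElse(n, e)
    case Lit(_)      => e
    case Add(l, r)   => Add(collapse(l, project), collapse(r, project))
    case Sum(c)      => Sum(collapse(c, project))
    case Alias(c, n) => Alias(collapse(c, project), n)
  }

  // Step 2: remove aliases so only plain expressions over source columns remain.
  def stripAlias(e: Expr): Expr = e match {
    case Alias(c, _) => stripAlias(c)
    case other       => other
  }

  // Renders an expression as SQL text for a hypothetical JDBC-like source.
  def sql(e: Expr): String = e match {
    case Attr(n)     => n
    case Lit(v)      => v.toString
    case Add(l, r)   => s"(${sql(l)} + ${sql(r)})"
    case Sum(c)      => s"SUM(${sql(c)})"
    case Alias(c, n) => s"${sql(c)} AS $n"
  }

  def pushDown(
      groupBy: Seq[Expr],
      aggs: Seq[Expr],
      project: Map[String, Expr],
      table: String): (String, Seq[Expr]) = {
    // Steps 1 and 2 for both group-by and aggregate expressions.
    val g = groupBy.map(e => stripAlias(collapse(e, project)))
    val a = aggs.map(e => stripAlias(collapse(e, project)))
    // Step 3: the query text the source would run.
    val query = s"SELECT ${(g ++ a).map(sql).mkString(", ")} FROM $table " +
      s"GROUP BY ${g.map(sql).mkString(", ")}"
    // Step 4: re-attach user-visible names, for the group-by columns only.
    val groupOutput = groupBy.zip(g).map {
      case (Attr(name), pushed) => Alias(pushed, name)
      case (_, pushed)          => pushed
    }
    (query, groupOutput)
  }
}
```

With a project of `dept AS myDept, salary + 1 AS mySalary`, grouping by `myDept` and aggregating `SUM(mySalary)` yields `SELECT dept, SUM((salary + 1)) FROM employee GROUP BY dept`, and only the group-by column gets its `myDept` name back.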
[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826955158
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// update the scan builder with agg pushdown and return a new plan with agg pushed
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
- case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
Review comment:
We need to clearly describe the final plan. This is more complicated now as the project may contain arbitrary expressions.
For example
```
Aggregate(sum(a + b) + max(a - c),
Project(x + 1 as a, x * 2 as b , x + y as c,
Table(x, y, z)
)
)
```
What does the final plan look like if the aggregate can be pushed, can be partially pushed, or can't be pushed?
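Collapsing this example by hand: substituting `a`, `b` and `c` with their project definitions leaves an aggregate that references only the table columns `x` and `y`. The Scala below is a toy model (hypothetical `Expr`, `Bin` and `Agg` classes, not Catalyst) that performs that substitution and prints the result.

```scala
// Toy model; hypothetical classes, not Catalyst.
object CollapseExample {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(v: Int) extends Expr
  final case class Bin(op: String, l: Expr, r: Expr) extends Expr
  final case class Agg(fn: String, child: Expr) extends Expr

  // Replace each aliased attribute with the expression the Project defined for it.
  def substitute(e: Expr, aliases: Map[String, Expr]): Expr = e match {
    case Attr(n)       => aliases.getOrElse(n, e)
    case Lit(_)        => e
    case Bin(op, l, r) => Bin(op, substitute(l, aliases), substitute(r, aliases))
    case Agg(fn, c)    => Agg(fn, substitute(c, aliases))
  }

  def show(e: Expr): String = e match {
    case Attr(n)       => n
    case Lit(v)        => v.toString
    case Bin(op, l, r) => s"(${show(l)} $op ${show(r)})"
    case Agg(fn, c)    => s"$fn(${show(c)})"
  }

  val x = Attr("x"); val y = Attr("y")
  // Project(x + 1 AS a, x * 2 AS b, x + y AS c)
  val aliases: Map[String, Expr] = Map(
    "a" -> Bin("+", x, Lit(1)),
    "b" -> Bin("*", x, Lit(2)),
    "c" -> Bin("+", x, y))
  // sum(a + b) + max(a - c)
  val agg: Expr = Bin("+",
    Agg("sum", Bin("+", Attr("a"), Attr("b"))),
    Agg("max", Bin("-", Attr("a"), Attr("c"))))
  val collapsed: Expr = substitute(agg, aliases)
}
```

`show(collapsed)` gives `(sum(((x + 1) + (x * 2))) + max(((x + 1) - (x + y))))`. Which of these pieces a given source can evaluate is what would decide between the pushed, partially pushed and not-pushed plans.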
[GitHub] [spark] beliefer commented on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
beliefer commented on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1066001189
> I actually have an alias over aggregate test in FileSource too. Could you please change that one as well?
> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala#L187
Thank you for the reminder.
[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r825591018
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -779,15 +779,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
}
- test("scan with aggregate push-down: aggregate over alias NOT push down") {
+ test("scan with aggregate push-down: aggregate over alias push down") {
Review comment:
It doesn't matter.
[GitHub] [spark] codecov-commenter edited a comment on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
codecov-commenter edited a comment on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1065861634
# [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#35823](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c91c2e9) into [master](https://codecov.io/gh/apache/spark/commit/c483e2977cbc6ae33d999c9c9d1dbacd9c53d85a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c483e29) will **increase** coverage by `0.00%`.
> The diff coverage is `90.62%`.
> :exclamation: Current head c91c2e9 differs from pull request most recent head 787e0dd. Consider uploading reports for the commit 787e0dd to get more accurate results
```diff
@@ Coverage Diff @@
## master #35823 +/- ##
=======================================
Coverage 91.19% 91.19%
=======================================
Files 297 297
Lines 64696 64724 +28
Branches 9919 9921 +2
=======================================
+ Hits 58999 59025 +26
- Misses 4330 4332 +2
Partials 1367 1367
```
| Flag | Coverage Δ | |
|---|---|---|
| unittests | `91.17% <90.62%> (+<0.01%)` | :arrow_up: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [python/pyspark/sql/tests/test\_udf.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3Rlc3RzL3Rlc3RfdWRmLnB5) | `95.56% <ø> (ø)` | |
| [python/pyspark/pandas/indexes/category.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvY2F0ZWdvcnkucHk=) | `93.33% <66.66%> (-0.92%)` | :arrow_down: |
| [python/pyspark/pandas/indexes/datetimes.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvZGF0ZXRpbWVzLnB5) | `95.23% <66.66%> (-0.52%)` | :arrow_down: |
| [python/pyspark/pandas/indexes/timedelta.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvdGltZWRlbHRhLnB5) | `75.40% <66.66%> (-0.46%)` | :arrow_down: |
| [python/pyspark/pandas/base.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2Jhc2UucHk=) | `94.14% <100.00%> (+0.03%)` | :arrow_up: |
| [python/pyspark/pandas/frame.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2ZyYW1lLnB5) | `97.06% <100.00%> (+<0.01%)` | :arrow_up: |
| [python/pyspark/pandas/tests/test\_dataframe.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Rlc3RzL3Rlc3RfZGF0YWZyYW1lLnB5) | `97.34% <100.00%> (+0.01%)` | :arrow_up: |
| [python/pyspark/pandas/tests/test\_series.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Rlc3RzL3Rlc3Rfc2VyaWVzLnB5) | `96.24% <100.00%> (+<0.01%)` | :arrow_up: |
| [...n/pyspark/mllib/tests/test\_streaming\_algorithms.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvbWxsaWIvdGVzdHMvdGVzdF9zdHJlYW1pbmdfYWxnb3JpdGhtcy5weQ==) | `76.34% <0.00%> (-0.36%)` | :arrow_down: |
| [python/pyspark/streaming/tests/test\_context.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3RyZWFtaW5nL3Rlc3RzL3Rlc3RfY29udGV4dC5weQ==) | `98.42% <0.00%> (ø)` | |
| ... and [1 more](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [216b972...787e0dd](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826944867
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
##########
@@ -77,6 +85,17 @@ trait AliasHelper {
}).asInstanceOf[NamedExpression]
}
+ /**
+ * Replace all alias, with the aliased attribute.
+ */
+ protected def replaceAliasWithAttr(
Review comment:
why can't we use the existing `replaceAlias` method?
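For context, a toy model of the two existing helpers in Spark's `AliasHelper`, written from memory of the Spark source, so treat the details as approximate: `replaceAlias` substitutes aliased attributes and then trims every alias, while `replaceAliasButKeepName` keeps a top-level attribute's original name by wrapping the substitute in an `Alias`. The classes are hypothetical, and this is not the PR author's answer to the question.

```scala
// Toy model; hypothetical classes, not Catalyst. Aliases are keyed by the name they define.
object AliasHelperSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(v: Int) extends Expr
  final case class Add(l: Expr, r: Expr) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // Replace every attribute that the project aliased with the alias's child expression.
  private def substitute(e: Expr, m: Map[String, Alias]): Expr = e match {
    case a @ Attr(n) => m.get(n).map(_.child).getOrElse(a)
    case Add(l, r)   => Add(substitute(l, m), substitute(r, m))
    case Alias(c, n) => Alias(substitute(c, m), n)
    case lit: Lit    => lit
  }

  private def trimAliases(e: Expr): Expr = e match {
    case Alias(c, _) => trimAliases(c)
    case Add(l, r)   => Add(trimAliases(l), trimAliases(r))
    case other       => other
  }

  // Like replaceAlias: substitute, then drop all aliases (the result may have no name).
  def replaceAlias(e: Expr, m: Map[String, Alias]): Expr = trimAliases(substitute(e, m))

  // Like replaceAliasButKeepName: a top-level attribute keeps its original name.
  def replaceAliasButKeepName(e: Expr, m: Map[String, Alias]): Expr = e match {
    case a @ Attr(n) => m.get(n).map(al => Alias(al.child, n)).getOrElse(a)
    case other       => substitute(other, m)
  }

  // Project(dept AS myDept, salary + 1 AS mySalary)
  val aliasMap: Map[String, Alias] = Map(
    "myDept"   -> Alias(Attr("dept"), "myDept"),
    "mySalary" -> Alias(Add(Attr("salary"), Lit(1)), "mySalary"))
}
```

One visible difference: `replaceAlias(Attr("mySalary"), aliasMap)` returns the unnamed `Add(Attr("salary"), Lit(1))`, whereas the result expressions of an `Aggregate` must remain named expressions.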
[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r827564615
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// update the scan builder with agg pushdown and return a new plan with agg pushed
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
- case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
Review comment:
```
Aggregate [myDept#0], [((cast(sum(CheckOverflow((promote_precision(cast(mySalary#1 as decimal(23,2))) + promote_precision(cast(yourSalary#2 as decimal(23,2)))), DecimalType(23,2))) as double) + max((cast(mySalary#1 as double) - bonus#6))) + cast(myDept#0 as double)) AS ((sum((mySalary + yourSalary)) + max((mySalary - bonus))) + myDept)#9]
+- Project [dept#3 AS myDept#0, CheckOverflow((promote_precision(cast(salary#5 as decimal(21,2))) + 1.00), DecimalType(21,2)) AS mySalary#1, CheckOverflow((promote_precision(salary#5) * 2.00), DecimalType(22,2)) AS yourSalary#2, bonus#6]
+- ScanBuilderHolder [DEPT#3, NAME#4, SALARY#5, BONUS#6], RelationV2[DEPT#3, NAME#4, SALARY#5, BONUS#6] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession@463a1f47,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions@47224d5d)
```
`cast`, `CheckOverflow` and `promote_precision` are not supported in aggregate push-down.
I updated the PR description and added a plan.
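A minimal sketch of why such a plan is not pushed: in this model, translating an expression for the source is all-or-nothing, so a single unsupported node (a hypothetical `Cast`, standing in for `cast`, `CheckOverflow` and `promote_precision`) makes the whole aggregate untranslatable. `translate` and the case classes are illustrative, not Spark's `DataSourceStrategy.translateAggregation`.

```scala
// Toy model; hypothetical classes and translate function, not Spark's API.
object TranslateSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(v: String) extends Expr
  final case class Add(l: Expr, r: Expr) extends Expr
  final case class Cast(child: Expr, to: String) extends Expr
  final case class Sum(child: Expr) extends Expr

  // Returns the source-side SQL for an expression, or None if any node is unsupported.
  def translate(e: Expr): Option[String] = e match {
    case Attr(n)    => Some(n)
    case Lit(v)     => Some(v)
    case Add(l, r)  => for (a <- translate(l); b <- translate(r)) yield s"$a + $b"
    case Sum(c)     => translate(c).map(s => s"SUM($s)")
    case Cast(_, _) => None // stands in for cast / CheckOverflow / promote_precision
  }
}
```

In this model, `Sum(Add(Cast(Attr("salary"), "decimal(23,2)"), Lit("1.00")))` translates to `None`, so the `Aggregate` would stay in Spark, while the cast-free `Sum(Add(Attr("salary"), Lit("1.00")))` becomes `SUM(salary + 1.00)`.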
[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826946482
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
##########
@@ -77,6 +85,17 @@ trait AliasHelper {
}).asInstanceOf[NamedExpression]
}
+ /**
+ * Replace all alias, with the aliased attribute.
+ */
+ protected def replaceAliasWithAttr(
+ expr: NamedExpression,
+ aliasMap: AttributeMap[Alias]): NamedExpression = {
+ replaceAliasButKeepName(expr, aliasMap).transform {
+ case Alias(attr: Attribute, _) => attr
Review comment:
why is this line needed?
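One plausible reason for that line, sketched on a toy model (hypothetical classes; `translateGroupBy` is an invented stand-in for a source that accepts only bare columns in GROUP BY): after the keep-name replacement, a plain rename such as `dept AS myDept` becomes `Alias(Attr("dept"), "myDept")`, which such a source cannot translate. Stripping aliases that only rename an attribute fixes that, while aliases over computed expressions are kept.

```scala
// Toy model; hypothetical classes, not Catalyst.
object StripRenameSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Add(l: Expr, r: Expr) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // Mirrors the diff's `case Alias(attr: Attribute, _) => attr`: drop aliases that only rename.
  def stripRename(e: Expr): Expr = e match {
    case Alias(a: Attr, _) => a
    case Alias(c, n)       => Alias(stripRename(c), n)
    case Add(l, r)         => Add(stripRename(l), stripRename(r))
    case other             => other
  }

  // Invented stand-in for a source that only accepts bare columns in GROUP BY.
  def translateGroupBy(e: Expr): Option[String] = e match {
    case Attr(n) => Some(n)
    case _       => None
  }
}
```

Here `translateGroupBy(Alias(Attr("dept"), "myDept"))` is `None`, but after `stripRename` it is `Some("dept")`; an alias such as `salary + bonus AS total` is left unchanged.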
[GitHub] [spark] beliefer commented on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
beliefer commented on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1074989781
https://github.com/apache/spark/pull/35932 replaces this PR.
[GitHub] [spark] beliefer closed pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
beliefer closed pull request #35823:
URL: https://github.com/apache/spark/pull/35823
[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r828724079
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -93,22 +92,28 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ if filters.isEmpty && project.forall(_.deterministic) =>
sHolder.builder match {
case r: SupportsPushDownAggregates =>
+ val aliasMap = getAliasMap(project)
+ val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
+ val newGroupingExpressions = groupingExpressions.map {
+ case e: NamedExpression => replaceAliasWithAttr(e, aliasMap)
+ case other => other
+ }
Review comment:
Thank you for the good idea.
[GitHub] [spark] huaxingao commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
huaxingao commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r825331821
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -234,7 +257,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// Aggregate [c2#10], [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18]
// +- RelationV2[c2#10, min(c1)#21, max(c1)#22] ...
// scalastyle:on
- plan.transformExpressions {
+ val agg = plan.transformExpressions {
Review comment:
nit: unnecessary change?
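The plan in the code comment above re-aggregates the source's output (`min(min(c1))`, `max(max(c1))`). As a sketch of why that is correct when a source returns partial results, for example one row per partition and group, taking the min of mins, the max of maxes and the sum of sums gives the same answer as aggregating the raw rows. `Partial` and `finalAgg` are hypothetical names, not Spark's API.

```scala
// Hypothetical model of partial aggregate results; not Spark's API.
object ReAggregateSketch {
  // One row a source might return per partition and group: (c2, min(c1), max(c1), sum(c1)).
  final case class Partial(c2: Int, minC1: Int, maxC1: Int, sumC1: Long)

  // Spark-side final step: min of mins, max of maxes, sum of sums, per group key c2.
  def finalAgg(rows: Seq[Partial]): Map[Int, (Int, Int, Long)] =
    rows.groupBy(_.c2).map { case (k, ps) =>
      (k, (ps.map(_.minC1).min, ps.map(_.maxC1).max, ps.map(_.sumC1).sum))
    }
}
```

For partials `(1, 3, 9, 20)`, `(1, 1, 5, 6)` and `(2, 4, 4, 4)`, `finalAgg` returns `Map(1 -> (1, 9, 26), 2 -> (4, 4, 4))`.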
[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r825953066
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +92,37 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// update the scan builder with agg pushdown and return a new plan with agg pushed
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
- case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
+ project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>
Review comment:
Thank you for the reminder.
[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r828720488
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -93,22 +92,28 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ if filters.isEmpty && project.forall(_.deterministic) =>
sHolder.builder match {
case r: SupportsPushDownAggregates =>
+ val aliasMap = getAliasMap(project)
+ val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
+ val newGroupingExpressions = groupingExpressions.map {
+ case e: NamedExpression => replaceAliasWithAttr(e, aliasMap)
+ case other => other
+ }
val aggExprToOutputOrdinal = mutable.HashMap.empty[Expression, Int]
- val aggregates = collectAggregates(resultExpressions, aggExprToOutputOrdinal)
+ val aggregates = collectAggregates(newResultExpressions, aggExprToOutputOrdinal)
val normalizedAggregates = DataSourceStrategy.normalizeExprs(
aggregates, sHolder.relation.output).asInstanceOf[Seq[AggregateExpression]]
val normalizedGroupingExpressions = DataSourceStrategy.normalizeExprs(
- groupingExpressions, sHolder.relation.output)
+ newGroupingExpressions, sHolder.relation.output)
val translatedAggregates = DataSourceStrategy.translateAggregation(
normalizedAggregates, normalizedGroupingExpressions)
- val (finalResultExpressions, finalAggregates, finalTranslatedAggregates) = {
+ val (selectedResultExpressions, selectedAggregates, selectedTranslatedAggregates) = {
Review comment:
It's not confirmed yet.
[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826955158
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// update the scan builder with agg pushdown and return a new plan with agg pushed
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
- case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
Review comment:
We need to describe the final plan clearly. This is more complicated now, as the project may contain arbitrary expressions.
For example:
```
Aggregate(sum(a + b) + max(a - c) + x, group by x,
Project(x, x + 1 as a, x * 2 as b, x + y as c,
Table(x, y, z)
)
)
```
What does the final plan look like if the aggregate can be fully pushed, partially pushed, or not pushed at all?
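To make the fully-pushed case concrete, here is a toy Scala sketch of the first step: substituting the Project's aliases into the aggregate so that only table columns remain. The `Expr` classes and `substitute` are hypothetical illustrations, not Spark's Catalyst API, and the sketch does not model the partially-pushed or not-pushed plans.

```scala
// Hypothetical toy expression tree; these are NOT Spark's Catalyst classes.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Lit(value: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Sub(left: Expr, right: Expr) extends Expr
final case class Mul(left: Expr, right: Expr) extends Expr
final case class Sum(child: Expr) extends Expr
final case class Max(child: Expr) extends Expr

// Replace every reference to an alias with the expression the alias stands for.
def substitute(e: Expr, aliases: Map[String, Expr]): Expr = e match {
  case Attr(n)   => aliases.getOrElse(n, e)
  case Lit(_)    => e
  case Add(l, r) => Add(substitute(l, aliases), substitute(r, aliases))
  case Sub(l, r) => Sub(substitute(l, aliases), substitute(r, aliases))
  case Mul(l, r) => Mul(substitute(l, aliases), substitute(r, aliases))
  case Sum(c)    => Sum(substitute(c, aliases))
  case Max(c)    => Max(substitute(c, aliases))
}

// Project(x, x + 1 as a, x * 2 as b, x + y as c) over Table(x, y, z):
// the plain column x needs no entry; only real aliases go into the map.
val projectAliases: Map[String, Expr] = Map(
  "a" -> Add(Attr("x"), Lit(1)),
  "b" -> Mul(Attr("x"), Lit(2)),
  "c" -> Add(Attr("x"), Attr("y"))
)

// Aggregate result expression: sum(a + b) + max(a - c) + x, grouped by x.
val aggResult: Expr =
  Add(Add(Sum(Add(Attr("a"), Attr("b"))), Max(Sub(Attr("a"), Attr("c")))), Attr("x"))

// After substitution only table columns remain in both expressions.
val rewritten: Expr = substitute(aggResult, projectAliases)
val groupingKey: Expr = substitute(Attr("x"), projectAliases)
```

With the aliases inlined, `sum(a + b)` becomes `sum((x + 1) + (x * 2))`, `max(a - c)` becomes `max((x + 1) - (x + y))`, and the grouping key `x` is unchanged.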
[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826955158
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// update the scan builder with agg pushdown and return a new plan with agg pushed
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
- case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
Review comment:
We need to describe the final plan clearly. This is more complicated now, as the project may contain arbitrary expressions.
For example:
```
Aggregate(sum(a + b) + max(a - c) + a, group by a,
Project(x + 1 as a, x * 2 as b, x + y as c,
Table(x, y, z)
)
)
```
What does the final plan look like if the aggregate can be fully pushed, partially pushed, or not pushed at all?
[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r827552580
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
##########
@@ -77,6 +85,17 @@ trait AliasHelper {
}).asInstanceOf[NamedExpression]
}
+ /**
+ * Replace all alias, with the aliased attribute.
+ */
+ protected def replaceAliasWithAttr(
+ expr: NamedExpression,
+ aliasMap: AttributeMap[Alias]): NamedExpression = {
+ replaceAliasButKeepName(expr, aliasMap).transform {
+ case Alias(attr: Attribute, _) => attr
Review comment:
An attribute in `groupingExpressions` may refer to an alias. Before pushing the query down to JDBC, I want to replace such an attribute with the original attribute, not with the `Alias`.
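A minimal sketch of the intended behaviour, using hypothetical toy classes rather than Spark's `Attribute`/`Alias`, and keying aliases by name instead of by attribute identity as Catalyst's `AttributeMap` does: an alias that merely renames a column is replaced by the column itself, while an alias over a computed expression keeps its `Alias` wrapper so the output name survives.

```scala
// Hypothetical toy classes loosely mirroring the Catalyst expressions in the
// diff; NOT Spark's API.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

// Inline alias definitions anywhere inside an expression.
def inlineAliases(e: Expr, aliases: Map[String, Expr]): Expr = e match {
  case Attr(n)     => aliases.getOrElse(n, e)
  case Add(l, r)   => Add(inlineAliases(l, aliases), inlineAliases(r, aliases))
  case Alias(c, n) => Alias(inlineAliases(c, aliases), n)
}

// Rough analogue of replaceAliasButKeepName: a top-level reference to an alias
// is replaced by its definition wrapped in Alias, so the output keeps its name.
def replaceKeepName(e: Expr, aliases: Map[String, Expr]): Expr = e match {
  case Attr(n) if aliases.contains(n) => Alias(aliases(n), n)
  case other                          => inlineAliases(other, aliases)
}

// Rough analogue of `transform { case Alias(attr: Attribute, _) => attr }`:
// an Alias that only renames a column is unwrapped to the column itself.
def unwrapColumnAliases(e: Expr): Expr = e match {
  case Alias(a: Attr, _) => a
  case Alias(c, n)       => Alias(unwrapColumnAliases(c), n)
  case Add(l, r)         => Add(unwrapColumnAliases(l), unwrapColumnAliases(r))
  case a: Attr           => a
}

def replaceAliasWithAttr(e: Expr, aliases: Map[String, Expr]): Expr =
  unwrapColumnAliases(replaceKeepName(e, aliases))

// Project(DEPT AS dept, SALARY + BONUS AS total) over a JDBC table.
val aliases: Map[String, Expr] = Map(
  "dept"  -> Attr("DEPT"),
  "total" -> Add(Attr("SALARY"), Attr("BONUS"))
)
```

For example, grouping by `dept` yields the plain column `DEPT`, the form the comment above wants to hand to the JDBC source, whereas `total` stays `Alias(SALARY + BONUS, "total")`.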
[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r825933825
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +92,37 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// update the scan builder with agg pushdown and return a new plan with agg pushed
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
- case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
+ project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>
Review comment:
Please follow the predicate pushdown optimizer rule and leverage `AliasHelper` to do it.
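For context, a rough sketch of the pattern being referred to: when a Filter sits on top of a Project, the optimizer builds an alias map from the project list and rewrites the condition in terms of the Project's input before moving the Filter below it. The toy `Plan`/`Expr` classes and functions are hypothetical stand-ins named after `AliasHelper`'s `getAliasMap` and `replaceAlias`; as far as we know, Spark's rule additionally requires the projected expressions to be deterministic, which this sketch does not check.

```scala
// Hypothetical toy plan and expression classes; NOT Spark's Catalyst API.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Lit(value: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Gt(left: Expr, right: Expr) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

sealed trait Plan
final case class Table(columns: Seq[String]) extends Plan
final case class Project(projectList: Seq[Expr], child: Plan) extends Plan
final case class Filter(condition: Expr, child: Plan) extends Plan

// Analogue of getAliasMap: alias name -> the expression it defines.
def getAliasMap(projectList: Seq[Expr]): Map[String, Expr] =
  projectList.collect { case Alias(child, name) => name -> child }.toMap

// Analogue of replaceAlias: inline alias definitions into an expression.
def replaceAlias(e: Expr, aliasMap: Map[String, Expr]): Expr = e match {
  case Attr(n)     => aliasMap.getOrElse(n, e)
  case Lit(_)      => e
  case Add(l, r)   => Add(replaceAlias(l, aliasMap), replaceAlias(r, aliasMap))
  case Gt(l, r)    => Gt(replaceAlias(l, aliasMap), replaceAlias(r, aliasMap))
  case Alias(c, n) => Alias(replaceAlias(c, aliasMap), n)
}

// Filter over Project becomes Project over Filter, with the condition
// rewritten against the Project's input (determinism check omitted).
def pushFilterThroughProject(plan: Plan): Plan = plan match {
  case Filter(cond, Project(list, child)) =>
    Project(list, Filter(replaceAlias(cond, getAliasMap(list)), child))
  case other => other
}

// SELECT x, x + 1 AS a FROM t WHERE a > 10
val projectList: Seq[Expr] = Seq(Attr("x"), Alias(Add(Attr("x"), Lit(1)), "a"))
val before: Plan = Filter(Gt(Attr("a"), Lit(10)), Project(projectList, Table(Seq("x", "y"))))
val after: Plan = pushFilterThroughProject(before)
```

Here `a > 10` becomes `(x + 1) > 10` underneath the unchanged Project, which is the same alias-inlining idea the aggregate push-down needs.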
[GitHub] [spark] codecov-commenter edited a comment on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1065861634
# [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#35823](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c91c2e9) into [master](https://codecov.io/gh/apache/spark/commit/c483e2977cbc6ae33d999c9c9d1dbacd9c53d85a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c483e29) will **decrease** coverage by `14.31%`.
> The diff coverage is `70.58%`.
> :exclamation: Current head c91c2e9 differs from pull request most recent head 787e0dd. Consider uploading reports for the commit 787e0dd to get more accurate results
```diff
@@ Coverage Diff @@
## master #35823 +/- ##
===========================================
- Coverage 91.19% 76.88% -14.32%
===========================================
Files 297 243 -54
Lines 64696 46054 -18642
Branches 9919 8139 -1780
===========================================
- Hits 58999 35408 -23591
- Misses 4330 9168 +4838
- Partials 1367 1478 +111
```
| Flag | Coverage Δ | |
|---|---|---|
| unittests | `76.87% <70.58%> (-14.30%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [python/pyspark/pandas/frame.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2ZyYW1lLnB5) | `33.01% <33.33%> (-64.06%)` | :arrow_down: |
| [python/pyspark/pandas/indexes/category.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvY2F0ZWdvcnkucHk=) | `93.33% <66.66%> (-0.92%)` | :arrow_down: |
| [python/pyspark/pandas/indexes/datetimes.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvZGF0ZXRpbWVzLnB5) | `79.76% <66.66%> (-16.00%)` | :arrow_down: |
| [python/pyspark/pandas/indexes/timedelta.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvdGltZWRlbHRhLnB5) | `72.13% <66.66%> (-3.74%)` | :arrow_down: |
| [python/pyspark/pandas/base.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2Jhc2UucHk=) | `83.51% <100.00%> (-10.61%)` | :arrow_down: |
| [python/pyspark/sql/observation.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL29ic2VydmF0aW9uLnB5) | `26.08% <0.00%> (-69.57%)` | :arrow_down: |
| [python/pyspark/pandas/series.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Nlcmllcy5weQ==) | `39.85% <0.00%> (-56.08%)` | :arrow_down: |
| [python/pyspark/sql/streaming.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3N0cmVhbWluZy5weQ==) | `27.03% <0.00%> (-54.95%)` | :arrow_down: |
| [python/pyspark/sql/sql\_formatter.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3NxbF9mb3JtYXR0ZXIucHk=) | `32.50% <0.00%> (-52.50%)` | :arrow_down: |
| ... and [118 more](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [216b972...787e0dd](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
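The Coverage row in these reports is hits divided by tracked lines, where hits + misses + partials add up to the Lines row. A small Scala sketch checking this against the numbers in this thread's reports; the formula is Codecov's definition to the best of our knowledge, and rounding down to two decimals is an inference from these four figures rather than something the reports state.

```scala
// Line coverage as assumed here: hits / (hits + misses + partials), in percent.
def coveragePercent(hits: Int, misses: Int, partials: Int): Double =
  100.0 * hits / (hits + misses + partials)

// Two decimals, rounded down (inferred from the reported figures).
def roundDown2(pct: Double): Double = math.floor(pct * 100) / 100

final case class Report(label: String, hits: Int, misses: Int, partials: Int, reported: Double)

// Figures copied from the "Coverage Diff" tables in this thread.
val reports = Seq(
  Report("master", 58999, 4330, 1367, 91.19),
  Report("PR, report 1", 35408, 9168, 1478, 76.88),
  Report("PR, report 2", 24859, 14256, 1215, 61.63),
  Report("PR, report 3", 48168, 6256, 1271, 86.48)
)

// True when every recomputed figure matches the one Codecov printed.
val allMatch: Boolean =
  reports.forall(r => roundDown2(coveragePercent(r.hits, r.misses, r.partials)) == r.reported)
```

Ordinary half-up rounding would give 61.64% and 86.49% for the last two reports, which is why the sketch rounds down.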
[GitHub] [spark] codecov-commenter commented on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1065861634
# [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#35823](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c91c2e9) into [master](https://codecov.io/gh/apache/spark/commit/c483e2977cbc6ae33d999c9c9d1dbacd9c53d85a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c483e29) will **decrease** coverage by `29.55%`.
> The diff coverage is `70.58%`.
> :exclamation: Current head c91c2e9 differs from pull request most recent head 787e0dd. Consider uploading reports for the commit 787e0dd to get more accurate results
```diff
@@ Coverage Diff @@
## master #35823 +/- ##
===========================================
- Coverage 91.19% 61.63% -29.56%
===========================================
Files 297 202 -95
Lines 64696 40330 -24366
Branches 9919 7528 -2391
===========================================
- Hits 58999 24859 -34140
- Misses 4330 14256 +9926
+ Partials 1367 1215 -152
```
| Flag | Coverage Δ | |
|---|---|---|
| unittests | `61.63% <70.58%> (-29.54%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [python/pyspark/pandas/frame.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2ZyYW1lLnB5) | `33.01% <33.33%> (-64.06%)` | :arrow_down: |
| [python/pyspark/pandas/indexes/category.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvY2F0ZWdvcnkucHk=) | `93.33% <66.66%> (-0.92%)` | :arrow_down: |
| [python/pyspark/pandas/indexes/datetimes.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvZGF0ZXRpbWVzLnB5) | `79.76% <66.66%> (-16.00%)` | :arrow_down: |
| [python/pyspark/pandas/indexes/timedelta.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvdGltZWRlbHRhLnB5) | `72.13% <66.66%> (-3.74%)` | :arrow_down: |
| [python/pyspark/pandas/base.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2Jhc2UucHk=) | `83.51% <100.00%> (-10.61%)` | :arrow_down: |
| [python/pyspark/join.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvam9pbi5weQ==) | `12.12% <0.00%> (-81.82%)` | :arrow_down: |
| [python/pyspark/sql/observation.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL29ic2VydmF0aW9uLnB5) | `26.08% <0.00%> (-69.57%)` | :arrow_down: |
| [python/pyspark/ml/tuning.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvbWwvdHVuaW5nLnB5) | `25.03% <0.00%> (-67.46%)` | :arrow_down: |
| [python/pyspark/streaming/dstream.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3RyZWFtaW5nL2RzdHJlYW0ucHk=) | `22.65% <0.00%> (-61.72%)` | :arrow_down: |
| ... and [196 more](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [216b972...787e0dd](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
[GitHub] [spark] huaxingao commented on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
huaxingao commented on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1065933348
I also have an alias-over-aggregate test for FileSource. Could you please update that one as well?
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala#L187
[GitHub] [spark] beliefer commented on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
beliefer commented on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1066640576
ping @huaxingao cc @cloud-fan
[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826940304
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +92,37 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// update the scan builder with agg pushdown and return a new plan with agg pushed
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
- case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
+ project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>
Review comment:
If we follow that rule, the condition here should be `project.forall(_.deterministic)`.
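A toy sketch of that guard, with hypothetical classes that each report a `deterministic` flag in the spirit of Catalyst's (not Spark's API). Compared with the old `isInstanceOf[AttributeReference]` check, it admits computed aliases such as `x + y AS c` but still rejects something like `rand(42) AS r`, presumably because inlining a nondeterministic expression into the pushed-down aggregate could change how often or where it is evaluated.

```scala
// Hypothetical toy expressions; NOT Spark's API. Each node reports whether
// it is deterministic.
sealed trait Expr { def deterministic: Boolean }
final case class Attr(name: String) extends Expr { def deterministic: Boolean = true }
final case class Add(left: Expr, right: Expr) extends Expr {
  def deterministic: Boolean = left.deterministic && right.deterministic
}
final case class Rand(seed: Long) extends Expr { def deterministic: Boolean = false }
final case class Alias(child: Expr, name: String) extends Expr {
  def deterministic: Boolean = child.deterministic
}

// Mirrors the suggested guard: push the aggregate down only when there are no
// remaining filters and every projected expression is deterministic.
def canPushAggregate(project: Seq[Expr], filters: Seq[Expr]): Boolean =
  filters.isEmpty && project.forall(_.deterministic)
```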
[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r828000530
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -93,22 +92,28 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ if filters.isEmpty && project.forall(_.deterministic) =>
sHolder.builder match {
case r: SupportsPushDownAggregates =>
+ val aliasMap = getAliasMap(project)
+ val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
+ val newGroupingExpressions = groupingExpressions.map {
+ case e: NamedExpression => replaceAliasWithAttr(e, aliasMap)
+ case other => other
+ }
val aggExprToOutputOrdinal = mutable.HashMap.empty[Expression, Int]
- val aggregates = collectAggregates(resultExpressions, aggExprToOutputOrdinal)
+ val aggregates = collectAggregates(newResultExpressions, aggExprToOutputOrdinal)
val normalizedAggregates = DataSourceStrategy.normalizeExprs(
aggregates, sHolder.relation.output).asInstanceOf[Seq[AggregateExpression]]
val normalizedGroupingExpressions = DataSourceStrategy.normalizeExprs(
- groupingExpressions, sHolder.relation.output)
+ newGroupingExpressions, sHolder.relation.output)
val translatedAggregates = DataSourceStrategy.translateAggregation(
normalizedAggregates, normalizedGroupingExpressions)
- val (finalResultExpressions, finalAggregates, finalTranslatedAggregates) = {
+ val (selectedResultExpressions, selectedAggregates, selectedTranslatedAggregates) = {
Review comment:
Why do we rename these?
[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826947256
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// update the scan builder with agg pushdown and return a new plan with agg pushed
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
- case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
+ project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>
sHolder.builder match {
case r: SupportsPushDownAggregates =>
+ val aliasMap = getAliasMap(project)
+ val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
+ val newGroupingExpressions = groupingExpressions.asInstanceOf[Seq[NamedExpression]]
Review comment:
`groupingExpressions` may contain expressions that are not `NamedExpression`s, so this cast is unsafe.
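Why the cast is a problem, sketched with hypothetical toy classes shaped like Catalyst's hierarchy (not Spark's API): because of type erasure, `asInstanceOf[Seq[NamedExpression]]` does not check the elements, so the error only appears later as a `ClassCastException`. Matching on each element, as the later revision of the patch does with `case e: NamedExpression => ...` and `case other => other`, avoids this. `rename` is a made-up stand-in for `replaceAliasWithAttr`.

```scala
import scala.util.Try

// Hypothetical toy hierarchy (NOT Spark's API): every NamedExpression is an
// Expression, but not every Expression is named.
trait Expression
trait NamedExpression extends Expression { def name: String }
final case class AttributeRef(name: String) extends NamedExpression
final case class Plus(left: Expression, right: Expression) extends Expression

// GROUP BY dept, x + y: the second grouping expression is not named.
val grouping: Seq[Expression] =
  Seq(AttributeRef("dept"), Plus(AttributeRef("x"), AttributeRef("y")))

// Unsafe: element types are erased, so this cast succeeds at runtime...
val castGrouping: Seq[NamedExpression] = grouping.asInstanceOf[Seq[NamedExpression]]
// ...and the ClassCastException surfaces only when an element is used as named.
val names: Try[Seq[String]] = Try(castGrouping.map(_.name))

// Safe: match on each element and pass non-named expressions through unchanged.
def rename(e: NamedExpression): NamedExpression = AttributeRef(e.name.toUpperCase)
val rewritten: Seq[Expression] = grouping.map {
  case n: NamedExpression => rename(n)
  case other              => other
}
```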
[GitHub] [spark] codecov-commenter edited a comment on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1065861634
# [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#35823](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c91c2e9) into [master](https://codecov.io/gh/apache/spark/commit/c483e2977cbc6ae33d999c9c9d1dbacd9c53d85a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c483e29) will **decrease** coverage by `4.70%`.
> The diff coverage is `90.62%`.
> :exclamation: Current head c91c2e9 differs from pull request most recent head 787e0dd. Consider uploading reports for the commit 787e0dd to get more accurate results
```diff
@@ Coverage Diff @@
## master #35823 +/- ##
==========================================
- Coverage 91.19% 86.48% -4.71%
==========================================
Files 297 252 -45
Lines 64696 55695 -9001
Branches 9919 8929 -990
==========================================
- Hits 58999 48168 -10831
- Misses 4330 6256 +1926
+ Partials 1367 1271 -96
```
| Flag | Coverage Δ | |
|---|---|---|
| unittests | `86.46% <90.62%> (-4.71%)` | :arrow_down: |
Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
| [Impacted Files](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [python/pyspark/pandas/indexes/category.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvY2F0ZWdvcnkucHk=) | `93.33% <66.66%> (-0.92%)` | :arrow_down: |
| [python/pyspark/pandas/indexes/datetimes.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvZGF0ZXRpbWVzLnB5) | `95.23% <66.66%> (-0.52%)` | :arrow_down: |
| [python/pyspark/pandas/indexes/timedelta.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvdGltZWRlbHRhLnB5) | `75.40% <66.66%> (-0.46%)` | :arrow_down: |
| [python/pyspark/pandas/base.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2Jhc2UucHk=) | `94.14% <100.00%> (+0.03%)` | :arrow_up: |
| [python/pyspark/pandas/frame.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2ZyYW1lLnB5) | `97.06% <100.00%> (+<0.01%)` | :arrow_up: |
| [python/pyspark/pandas/tests/test\_dataframe.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Rlc3RzL3Rlc3RfZGF0YWZyYW1lLnB5) | `97.34% <100.00%> (+0.01%)` | :arrow_up: |
| [python/pyspark/pandas/tests/test\_series.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Rlc3RzL3Rlc3Rfc2VyaWVzLnB5) | `96.24% <100.00%> (+<0.01%)` | :arrow_up: |
| [python/pyspark/sql/observation.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL29ic2VydmF0aW9uLnB5) | `26.08% <0.00%> (-69.57%)` | :arrow_down: |
| [python/pyspark/sql/streaming.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3N0cmVhbWluZy5weQ==) | `27.03% <0.00%> (-54.95%)` | :arrow_down: |
| [python/pyspark/sql/sql\_formatter.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3NxbF9mb3JtYXR0ZXIucHk=) | `32.50% <0.00%> (-52.50%)` | :arrow_down: |
| ... and [88 more](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [216b972...787e0dd](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
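The coverage percentages in this report can be re-derived from the raw counts in the diff table. The sketch below assumes Codecov's ratio is hits / (hits + misses + partials), which is consistent with the table's own totals (58999 + 4330 + 1367 = 64696 lines on master; 48168 + 6256 + 1271 = 55695 on the head). The object and method names are hypothetical, and this is an outside check rather than part of the PR. Note that the impacted files listed above are Python files unrelated to this SQL change, so the drop likely reflects the stale head commit the report itself warns about.

```scala
import scala.math.BigDecimal.RoundingMode

// Hypothetical re-check of the Codecov numbers quoted above.
// Assumes coverage = hits / (hits + misses + partials), expressed as a percentage.
object CoverageCheck {
  def coveragePct(hits: Long, misses: Long, partials: Long): BigDecimal =
    BigDecimal(hits) * BigDecimal(100) / BigDecimal(hits + misses + partials)

  // Two ways of showing a value with two decimals: truncation toward zero vs. half-up rounding.
  def truncate2(x: BigDecimal): BigDecimal = x.setScale(2, RoundingMode.DOWN)
  def round2(x: BigDecimal): BigDecimal = x.setScale(2, RoundingMode.HALF_UP)

  // Counts copied from the report: master (c483e29) vs. the PR head it measured (c91c2e9).
  val master: BigDecimal = coveragePct(hits = 58999, misses = 4330, partials = 1367)
  val head: BigDecimal = coveragePct(hits = 48168, misses = 6256, partials = 1271)

  def main(args: Array[String]): Unit = {
    println(s"master (truncated): ${truncate2(master)}%")
    println(s"head (truncated): ${truncate2(head)}%, head (rounded): ${round2(head)}%")
    println(s"exact delta, truncated: ${truncate2(head - master)}%")
    println(s"delta of the displayed values: ${truncate2(head) - truncate2(master)}%")
  }
}
```

With these counts master is about 91.194% and the head about 86.485%. Truncated, they match the 91.19 and 86.48 in the table (half-up rounding would give 86.49 for the head). The exact delta truncates to -4.70, the header's figure, while the difference of the two displayed values is -4.71, the table's figure. That the two numbers come from these two computations is an inference, not documented Codecov behavior.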
[GitHub] [spark] dcoliversun commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias
Posted by GitBox <gi...@apache.org>.
dcoliversun commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r825562687
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -779,15 +779,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
}
- test("scan with aggregate push-down: aggregate over alias NOT push down") {
+ test("scan with aggregate push-down: aggregate over alias push down") {
Review comment:
Hi. Would it be better to prefix the test name with `SPARK-38533`?
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
checkAnswer(df, Seq.empty[Row])
}
+
+ test("scan with aggregate push-down: complete push-down aggregate with alias") {
+ val df = spark.table("h2.test.employee")
+ .select($"DEPT", $"SALARY".as("mySalary"))
+ .groupBy($"DEPT")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df)
+ df.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
Review comment:
Why not use camelCase naming (e.g. `expectedPlanFragment`), as the Scala style guide recommends? https://docs.scala-lang.org/style/naming-conventions.html
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
checkAnswer(df, Seq.empty[Row])
}
+
+ test("scan with aggregate push-down: complete push-down aggregate with alias") {
Review comment:
ditto
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
checkAnswer(df, Seq.empty[Row])
}
+
+ test("scan with aggregate push-down: complete push-down aggregate with alias") {
+ val df = spark.table("h2.test.employee")
+ .select($"DEPT", $"SALARY".as("mySalary"))
+ .groupBy($"DEPT")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df)
+ df.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+ checkKeywordsExistsInExplain(df, expected_plan_fragment)
+ }
+ checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+
+ val df2 = spark.table("h2.test.employee")
+ .select($"DEPT".as("myDept"), $"SALARY".as("mySalary"))
+ .groupBy($"myDept")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df2)
+ df2.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+ checkKeywordsExistsInExplain(df2, expected_plan_fragment)
+ }
+ checkAnswer(df2, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+ }
+
+ test("scan with aggregate push-down: partial push-down aggregate with alias") {
Review comment:
ditto
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
checkAnswer(df, Seq.empty[Row])
}
+
+ test("scan with aggregate push-down: complete push-down aggregate with alias") {
+ val df = spark.table("h2.test.employee")
+ .select($"DEPT", $"SALARY".as("mySalary"))
+ .groupBy($"DEPT")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df)
+ df.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+ checkKeywordsExistsInExplain(df, expected_plan_fragment)
+ }
+ checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+
+ val df2 = spark.table("h2.test.employee")
+ .select($"DEPT".as("myDept"), $"SALARY".as("mySalary"))
+ .groupBy($"myDept")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df2)
+ df2.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+ checkKeywordsExistsInExplain(df2, expected_plan_fragment)
+ }
+ checkAnswer(df2, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+ }
+
+ test("scan with aggregate push-down: partial push-down aggregate with alias") {
+ val df = spark.read
+ .option("partitionColumn", "DEPT")
+ .option("lowerBound", "0")
+ .option("upperBound", "2")
+ .option("numPartitions", "2")
+ .table("h2.test.employee")
+ .select($"NAME", $"SALARY".as("mySalary"))
+ .groupBy($"NAME")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df, false)
+ df.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
Review comment:
ditto
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
checkAnswer(df, Seq.empty[Row])
}
+
+ test("scan with aggregate push-down: complete push-down aggregate with alias") {
+ val df = spark.table("h2.test.employee")
+ .select($"DEPT", $"SALARY".as("mySalary"))
+ .groupBy($"DEPT")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df)
+ df.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+ checkKeywordsExistsInExplain(df, expected_plan_fragment)
+ }
+ checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+
+ val df2 = spark.table("h2.test.employee")
+ .select($"DEPT".as("myDept"), $"SALARY".as("mySalary"))
+ .groupBy($"myDept")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df2)
+ df2.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
Review comment:
ditto
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
checkAnswer(df, Seq.empty[Row])
}
+
+ test("scan with aggregate push-down: complete push-down aggregate with alias") {
+ val df = spark.table("h2.test.employee")
+ .select($"DEPT", $"SALARY".as("mySalary"))
+ .groupBy($"DEPT")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df)
+ df.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+ checkKeywordsExistsInExplain(df, expected_plan_fragment)
+ }
+ checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+
+ val df2 = spark.table("h2.test.employee")
+ .select($"DEPT".as("myDept"), $"SALARY".as("mySalary"))
+ .groupBy($"myDept")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df2)
+ df2.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+ checkKeywordsExistsInExplain(df2, expected_plan_fragment)
+ }
+ checkAnswer(df2, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+ }
+
+ test("scan with aggregate push-down: partial push-down aggregate with alias") {
+ val df = spark.read
+ .option("partitionColumn", "DEPT")
+ .option("lowerBound", "0")
+ .option("upperBound", "2")
+ .option("numPartitions", "2")
+ .table("h2.test.employee")
+ .select($"NAME", $"SALARY".as("mySalary"))
+ .groupBy($"NAME")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df, false)
+ df.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
+ "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]"
+ checkKeywordsExistsInExplain(df, expected_plan_fragment)
+ }
+ checkAnswer(df, Seq(Row("alex", 12000.00), Row("amy", 10000.00),
+ Row("cathy", 9000.00), Row("david", 10000.00), Row("jen", 12000.00)))
+
+ val df2 = spark.read
+ .option("partitionColumn", "DEPT")
+ .option("lowerBound", "0")
+ .option("upperBound", "2")
+ .option("numPartitions", "2")
+ .table("h2.test.employee")
+ .select($"NAME".as("myName"), $"SALARY".as("mySalary"))
+ .groupBy($"myName")
+ .agg(sum($"mySalary").as("total"))
+ .filter($"total" > 1000)
+ checkAggregateRemoved(df2, false)
+ df2.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
Review comment:
ditto
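In the partial push-down test above, the table is read in two partitions on `DEPT` while the grouping is on `NAME`, and the test expects the aggregate to stay in Spark's plan (`checkAggregateRemoved(df, false)`). A plausible reading is that the same name can appear in more than one partition, so the source can only return per-partition sums that Spark still has to merge, and the `total > 1000` filter can only be applied to merged totals. The sketch below models that merge with made-up per-partition numbers and a hypothetical threshold of 11000 chosen to make the point visible; it is not Spark's implementation, and the names are illustrative.

```scala
// Hypothetical model of partial aggregation; not Spark's implementation.
object PartialSumMerge {
  // Each map is one partition's pushed-down result: group key -> partial SUM.
  def merge(partials: Seq[Map[String, Double]]): Map[String, Double] =
    partials.flatten
      .groupBy(_._1)
      .map { case (key, pairs) => key -> pairs.map(_._2).sum }

  // A HAVING-style filter (total > threshold) is only correct on merged totals.
  def filterTotals(totals: Map[String, Double], threshold: Double): Map[String, Double] =
    totals.filter { case (_, total) => total > threshold }

  def main(args: Array[String]): Unit = {
    // Made-up per-partition sums: "alex" appears in both partitions.
    val p0 = Map("alex" -> 4000.0, "amy" -> 10000.0)
    val p1 = Map("alex" -> 8000.0)
    val merged = merge(Seq(p0, p1))
    println(merged)
    // Neither partial sum for "alex" exceeds 11000, but the merged total does.
    println(filterTotals(merged, 11000.0))
  }
}
```

Filtering each partition's output before the merge would drop "alex" entirely, which is why the final aggregate and the filter have to run after the partial results are combined.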
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +92,38 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
// update the scan builder with agg pushdown and return a new plan with agg pushed
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
child match {
- case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
- if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+ case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
+ project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>
sHolder.builder match {
case r: SupportsPushDownAggregates =>
+ val aliasAttrToOriginAttr = mutable.HashMap.empty[Expression, AttributeReference]
+ val originAttrToAliasAttr = mutable.HashMap.empty[Expression, Attribute]
+ collectAliases(project, aliasAttrToOriginAttr, originAttrToAliasAttr)
+ val newResultExpressions = resultExpressions.map { expr =>
+ expr.transform {
+ case r: AttributeReference if aliasAttrToOriginAttr.contains(r.canonicalized) =>
+ aliasAttrToOriginAttr(r.canonicalized)
+ }
+ }.asInstanceOf[Seq[NamedExpression]]
+ val newGroupingExpressions = groupingExpressions.map { expr =>
+ expr.transform {
+ case r: AttributeReference if aliasAttrToOriginAttr.contains(r.canonicalized) =>
+ aliasAttrToOriginAttr(r.canonicalized)
Review comment:
These two lambda expressions behave the same; consider extracting them into a single shared function.
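The reviewer's suggestion can be sketched as one helper that rewrites alias references back to the original attributes and is called for both the grouping and the result expressions. The sketch below uses a hypothetical, simplified expression tree (`Expr`, `Attr`, `Alias`, `Sum`) rather than Catalyst's classes, and it matches aliases by name, whereas the PR matches on `canonicalized` Catalyst expressions. The example mirrors the first test: `select($"DEPT", $"SALARY".as("mySalary"))` followed by `agg(sum($"mySalary").as("total"))`.

```scala
// Hypothetical, simplified stand-ins for Catalyst expressions (not Spark's API).
sealed trait Expr {
  // Rewrite children first, then offer the rebuilt node to `rule`.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val rebuilt: Expr = this match {
      case Sum(child)         => Sum(child.transformUp(rule))
      case Alias(child, name) => Alias(child.transformUp(rule), name)
      case leaf               => leaf
    }
    rule.applyOrElse(rebuilt, (e: Expr) => e)
  }
}
final case class Attr(name: String) extends Expr
final case class Alias(child: Expr, name: String) extends Expr
final case class Sum(child: Expr) extends Expr

object AliasRewrite {
  // alias name -> attribute it renames, e.g. "mySalary" -> Attr("SALARY").
  def collectAliases(project: Seq[Expr]): Map[String, Attr] =
    project.collect { case Alias(a: Attr, name) => name -> a }.toMap

  // The single shared helper: replace every reference to an alias with the
  // original attribute. Used for both grouping and result expressions.
  def replaceAliases(exprs: Seq[Expr], aliases: Map[String, Attr]): Seq[Expr] =
    exprs.map(_.transformUp { case Attr(n) if aliases.contains(n) => aliases(n) })

  def main(args: Array[String]): Unit = {
    val project = Seq(Attr("DEPT"), Alias(Attr("SALARY"), "mySalary"))
    val grouping = Seq(Attr("DEPT"))
    val result = Seq(Attr("DEPT"), Alias(Sum(Attr("mySalary")), "total"))
    val aliases = collectAliases(project)
    println(replaceAliases(grouping, aliases))
    println(replaceAliases(result, aliases))
  }
}
```

After the rewrite the result expression refers to `SUM(SALARY)`, which lines up with the `PushedAggregates: [SUM(SALARY)]` fragment the tests check; only attribute leaves are replaced, so the outer `total` alias is preserved.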