You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/12 07:28:06 UTC

[GitHub] [spark] beliefer opened a new pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

beliefer opened a new pull request #35823:
URL: https://github.com/apache/spark/pull/35823


   ### What changes were proposed in this pull request?
   Currently, Spark DS V2 aggregate push-down doesn't supports project with alias.
   
   Refer https://github.com/apache/spark/blob/c91c2e9afec0d5d5bbbd2e155057fe409c5bb928/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala#L96
   
   This PR let it works good with alias.
   
   
   ### Why are the changes needed?
   Alias is more useful.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'Yes'.
   Users could see DS V2 aggregate push-down supports project with alias.
   
   
   ### How was this patch tested?
   New tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r828723083



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -93,22 +92,28 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
         case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+          if filters.isEmpty && project.forall(_.deterministic) =>
           sHolder.builder match {
             case r: SupportsPushDownAggregates =>
+              val aliasMap = getAliasMap(project)
+              val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
+              val newGroupingExpressions = groupingExpressions.map {
+                case e: NamedExpression => replaceAliasWithAttr(e, aliasMap)
+                case other => other
+              }

Review comment:
       can we make the code more explicit? We need to clearly show the steps
   1. collapse aggregate and project
   2. remove the alias from aggregate functions and group by expressions (this logic should be put here instead of `AliasHelper` as this is not a common logic)
   3. push down agg
   4. add back alias for group by expressions only.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826955158



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // update the scan builder with agg pushdown and return a new plan with agg pushed
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
-        case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+        case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&

Review comment:
       We need to clearly describe the final plan. This is more complicated now as the project may contain arbitrary expressions.
   
   For example
   ```
   Aggregate(sum(a + b) + max(a - c),
     Project(x + 1 as a, x * 2 as b , x + y as c,
       Table(x, y, z)
     )
   )
   ```
   what the final plan looks like if the aggregate can be pushed, or can be partial pushed, or can't be pushed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer commented on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1066001189


   > I actually have an alias over aggregate test in FileSource too. Could you please change that one as well?
   > https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala#L187
   
   Thank you for the remind.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r825591018



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -779,15 +779,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
   }
 
-  test("scan with aggregate push-down: aggregate over alias NOT push down") {
+  test("scan with aggregate push-down: aggregate over alias push down") {

Review comment:
       It doesn't matter.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] codecov-commenter edited a comment on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1065861634


   # [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#35823](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c91c2e9) into [master](https://codecov.io/gh/apache/spark/commit/c483e2977cbc6ae33d999c9c9d1dbacd9c53d85a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c483e29) will **increase** coverage by `0.00%`.
   > The diff coverage is `90.62%`.
   
   > :exclamation: Current head c91c2e9 differs from pull request most recent head 787e0dd. Consider uploading reports for the commit 787e0dd to get more accurate results
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/spark/pull/35823/graphs/tree.svg?width=650&height=150&src=pr&token=R9pHLWgWi8&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #35823   +/-   ##
   =======================================
     Coverage   91.19%   91.19%           
   =======================================
     Files         297      297           
     Lines       64696    64724   +28     
     Branches     9919     9921    +2     
   =======================================
   + Hits        58999    59025   +26     
   - Misses       4330     4332    +2     
     Partials     1367     1367           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `91.17% <90.62%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [python/pyspark/sql/tests/test\_udf.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3Rlc3RzL3Rlc3RfdWRmLnB5) | `95.56% <ø> (ø)` | |
   | [python/pyspark/pandas/indexes/category.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvY2F0ZWdvcnkucHk=) | `93.33% <66.66%> (-0.92%)` | :arrow_down: |
   | [python/pyspark/pandas/indexes/datetimes.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvZGF0ZXRpbWVzLnB5) | `95.23% <66.66%> (-0.52%)` | :arrow_down: |
   | [python/pyspark/pandas/indexes/timedelta.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvdGltZWRlbHRhLnB5) | `75.40% <66.66%> (-0.46%)` | :arrow_down: |
   | [python/pyspark/pandas/base.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2Jhc2UucHk=) | `94.14% <100.00%> (+0.03%)` | :arrow_up: |
   | [python/pyspark/pandas/frame.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2ZyYW1lLnB5) | `97.06% <100.00%> (+<0.01%)` | :arrow_up: |
   | [python/pyspark/pandas/tests/test\_dataframe.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Rlc3RzL3Rlc3RfZGF0YWZyYW1lLnB5) | `97.34% <100.00%> (+0.01%)` | :arrow_up: |
   | [python/pyspark/pandas/tests/test\_series.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Rlc3RzL3Rlc3Rfc2VyaWVzLnB5) | `96.24% <100.00%> (+<0.01%)` | :arrow_up: |
   | [...n/pyspark/mllib/tests/test\_streaming\_algorithms.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvbWxsaWIvdGVzdHMvdGVzdF9zdHJlYW1pbmdfYWxnb3JpdGhtcy5weQ==) | `76.34% <0.00%> (-0.36%)` | :arrow_down: |
   | [python/pyspark/streaming/tests/test\_context.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3RyZWFtaW5nL3Rlc3RzL3Rlc3RfY29udGV4dC5weQ==) | `98.42% <0.00%> (ø)` | |
   | ... and [1 more](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [216b972...787e0dd](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826944867



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
##########
@@ -77,6 +85,17 @@ trait AliasHelper {
     }).asInstanceOf[NamedExpression]
   }
 
+  /**
+   * Replace all alias, with the aliased attribute.
+   */
+  protected def replaceAliasWithAttr(

Review comment:
       why can't we use the existing `replaceAlias` method?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r827564615



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // update the scan builder with agg pushdown and return a new plan with agg pushed
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
-        case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+        case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&

Review comment:
       ```
   Aggregate [myDept#0], [((cast(sum(CheckOverflow((promote_precision(cast(mySalary#1 as decimal(23,2))) + promote_precision(cast(yourSalary#2 as decimal(23,2)))), DecimalType(23,2))) as double) + max((cast(mySalary#1 as double) - bonus#6))) + cast(myDept#0 as double)) AS ((sum((mySalary + yourSalary)) + max((mySalary - bonus))) + myDept)#9]
   +- Project [dept#3 AS myDept#0, CheckOverflow((promote_precision(cast(salary#5 as decimal(21,2))) + 1.00), DecimalType(21,2)) AS mySalary#1, CheckOverflow((promote_precision(salary#5) * 2.00), DecimalType(22,2)) AS yourSalary#2, bonus#6]
      +- ScanBuilderHolder [DEPT#3, NAME#4, SALARY#5, BONUS#6], RelationV2[DEPT#3, NAME#4, SALARY#5, BONUS#6] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession@463a1f47,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions@47224d5d)
   ```
   `cast`, `CheckOverflow`, `promote_precision` not supported in aggregate push-down.
   I updated description of PR and add a plan.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826946482



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
##########
@@ -77,6 +85,17 @@ trait AliasHelper {
     }).asInstanceOf[NamedExpression]
   }
 
+  /**
+   * Replace all alias, with the aliased attribute.
+   */
+  protected def replaceAliasWithAttr(
+      expr: NamedExpression,
+      aliasMap: AttributeMap[Alias]): NamedExpression = {
+    replaceAliasButKeepName(expr, aliasMap).transform {
+      case Alias(attr: Attribute, _) => attr

Review comment:
       why is this line neded?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
##########
@@ -77,6 +85,17 @@ trait AliasHelper {
     }).asInstanceOf[NamedExpression]
   }
 
+  /**
+   * Replace all alias, with the aliased attribute.
+   */
+  protected def replaceAliasWithAttr(
+      expr: NamedExpression,
+      aliasMap: AttributeMap[Alias]): NamedExpression = {
+    replaceAliasButKeepName(expr, aliasMap).transform {
+      case Alias(attr: Attribute, _) => attr

Review comment:
       why is this line needed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer commented on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1074989781


   https://github.com/apache/spark/pull/35932 replaces this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r827564615



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // update the scan builder with agg pushdown and return a new plan with agg pushed
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
-        case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+        case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&

Review comment:
       ```
   Aggregate [myDept#0], [((cast(sum(CheckOverflow((promote_precision(cast(mySalary#1 as decimal(23,2))) + promote_precision(cast(yourSalary#2 as decimal(23,2)))), DecimalType(23,2))) as double) + max((cast(mySalary#1 as double) - bonus#6))) + cast(myDept#0 as double)) AS ((sum((mySalary + yourSalary)) + max((mySalary - bonus))) + myDept)#9]
   +- Project [dept#3 AS myDept#0, CheckOverflow((promote_precision(cast(salary#5 as decimal(21,2))) + 1.00), DecimalType(21,2)) AS mySalary#1, CheckOverflow((promote_precision(salary#5) * 2.00), DecimalType(22,2)) AS yourSalary#2, bonus#6]
      +- ScanBuilderHolder [DEPT#3, NAME#4, SALARY#5, BONUS#6], RelationV2[DEPT#3, NAME#4, SALARY#5, BONUS#6] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession@463a1f47,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions@47224d5d)
   ```
   cast, CheckOverflow, promote_precision not supported in aggregate push-down.
   I updated description of PR and add a plan.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer closed pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer closed pull request #35823:
URL: https://github.com/apache/spark/pull/35823


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r828724079



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -93,22 +92,28 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
         case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+          if filters.isEmpty && project.forall(_.deterministic) =>
           sHolder.builder match {
             case r: SupportsPushDownAggregates =>
+              val aliasMap = getAliasMap(project)
+              val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
+              val newGroupingExpressions = groupingExpressions.map {
+                case e: NamedExpression => replaceAliasWithAttr(e, aliasMap)
+                case other => other
+              }

Review comment:
       Thank you for the good idea.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] huaxingao commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r825331821



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -234,7 +257,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
                   // Aggregate [c2#10], [min(min(c1)#21) AS min(c1)#17, max(max(c1)#22) AS max(c1)#18]
                   // +- RelationV2[c2#10, min(c1)#21, max(c1)#22] ...
                   // scalastyle:on
-                  plan.transformExpressions {
+                  val agg = plan.transformExpressions {

Review comment:
       nit: unnecessary change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r825953066



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +92,37 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // update the scan builder with agg pushdown and return a new plan with agg pushed
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
-        case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+        case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
+          project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>

Review comment:
       Thank you for the reminder.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r828720488



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -93,22 +92,28 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
         case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+          if filters.isEmpty && project.forall(_.deterministic) =>
           sHolder.builder match {
             case r: SupportsPushDownAggregates =>
+              val aliasMap = getAliasMap(project)
+              val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
+              val newGroupingExpressions = groupingExpressions.map {
+                case e: NamedExpression => replaceAliasWithAttr(e, aliasMap)
+                case other => other
+              }
               val aggExprToOutputOrdinal = mutable.HashMap.empty[Expression, Int]
-              val aggregates = collectAggregates(resultExpressions, aggExprToOutputOrdinal)
+              val aggregates = collectAggregates(newResultExpressions, aggExprToOutputOrdinal)
               val normalizedAggregates = DataSourceStrategy.normalizeExprs(
                 aggregates, sHolder.relation.output).asInstanceOf[Seq[AggregateExpression]]
               val normalizedGroupingExpressions = DataSourceStrategy.normalizeExprs(
-                groupingExpressions, sHolder.relation.output)
+                newGroupingExpressions, sHolder.relation.output)
               val translatedAggregates = DataSourceStrategy.translateAggregation(
                 normalizedAggregates, normalizedGroupingExpressions)
-              val (finalResultExpressions, finalAggregates, finalTranslatedAggregates) = {
+              val (selectedResultExpressions, selectedAggregates, selectedTranslatedAggregates) = {

Review comment:
       It's not confirmed yet.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826955158



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // update the scan builder with agg pushdown and return a new plan with agg pushed
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
-        case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+        case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&

Review comment:
       We need to clearly describe the final plan. This is more complicated now as the project may contain arbitrary expressions.
   
   For example
   ```
   Aggregate(sum(a + b) + max(a - c) + x, group by x,
     Project(x, x + 1 as a, x * 2 as b , x + y as c,
       Table(x, y, z)
     )
   )
   ```
   what the final plan looks like if the aggregate can be pushed, or can be partial pushed, or can't be pushed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826955158



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // update the scan builder with agg pushdown and return a new plan with agg pushed
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
-        case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+        case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&

Review comment:
       We need to clearly describe the final plan. This is more complicated now as the project may contain arbitrary expressions.
   
   For example
   ```
   Aggregate(sum(a + b) + max(a - c) + a, group by a,
     Project(x + 1 as a, x * 2 as b , x + y as c,
       Table(x, y, z)
     )
   )
   ```
   what the final plan looks like if the aggregate can be pushed, or can be partial pushed, or can't be pushed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r827552580



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AliasHelper.scala
##########
@@ -77,6 +85,17 @@ trait AliasHelper {
     }).asInstanceOf[NamedExpression]
   }
 
+  /**
+   * Replace all alias, with the aliased attribute.
+   */
+  protected def replaceAliasWithAttr(
+      expr: NamedExpression,
+      aliasMap: AttributeMap[Alias]): NamedExpression = {
+    replaceAliasButKeepName(expr, aliasMap).transform {
+      case Alias(attr: Attribute, _) => attr

Review comment:
       Attribute in groupingExpressions may be alias. Before push down SQL to JDBC, I want replace the alias Attribute with origin Attribute, not the Alias.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r825933825



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +92,37 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // update the scan builder with agg pushdown and return a new plan with agg pushed
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
-        case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+        case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
+          project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>

Review comment:
       Please follow the predicate pushdown optimizer rule and leverage `AliasHelper` to do it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] codecov-commenter edited a comment on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1065861634


   # [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#35823](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c91c2e9) into [master](https://codecov.io/gh/apache/spark/commit/c483e2977cbc6ae33d999c9c9d1dbacd9c53d85a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c483e29) will **decrease** coverage by `14.31%`.
   > The diff coverage is `70.58%`.
   
   > :exclamation: Current head c91c2e9 differs from pull request most recent head 787e0dd. Consider uploading reports for the commit 787e0dd to get more accurate results
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/spark/pull/35823/graphs/tree.svg?width=650&height=150&src=pr&token=R9pHLWgWi8&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master   #35823       +/-   ##
   ===========================================
   - Coverage   91.19%   76.88%   -14.32%     
   ===========================================
     Files         297      243       -54     
     Lines       64696    46054    -18642     
     Branches     9919     8139     -1780     
   ===========================================
   - Hits        58999    35408    -23591     
   - Misses       4330     9168     +4838     
   - Partials     1367     1478      +111     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `76.87% <70.58%> (-14.30%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [python/pyspark/pandas/frame.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2ZyYW1lLnB5) | `33.01% <33.33%> (-64.06%)` | :arrow_down: |
   | [python/pyspark/pandas/indexes/category.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvY2F0ZWdvcnkucHk=) | `93.33% <66.66%> (-0.92%)` | :arrow_down: |
   | [python/pyspark/pandas/indexes/datetimes.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvZGF0ZXRpbWVzLnB5) | `79.76% <66.66%> (-16.00%)` | :arrow_down: |
   | [python/pyspark/pandas/indexes/timedelta.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvdGltZWRlbHRhLnB5) | `72.13% <66.66%> (-3.74%)` | :arrow_down: |
   | [python/pyspark/pandas/base.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2Jhc2UucHk=) | `83.51% <100.00%> (-10.61%)` | :arrow_down: |
   | [python/pyspark/sql/observation.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL29ic2VydmF0aW9uLnB5) | `26.08% <0.00%> (-69.57%)` | :arrow_down: |
   | [python/pyspark/pandas/series.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Nlcmllcy5weQ==) | `39.85% <0.00%> (-56.08%)` | :arrow_down: |
   | [python/pyspark/sql/streaming.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3N0cmVhbWluZy5weQ==) | `27.03% <0.00%> (-54.95%)` | :arrow_down: |
   | [python/pyspark/sql/sql\_formatter.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3NxbF9mb3JtYXR0ZXIucHk=) | `32.50% <0.00%> (-52.50%)` | :arrow_down: |
   | ... and [118 more](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [216b972...787e0dd](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] codecov-commenter commented on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1065861634


   # [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#35823](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c91c2e9) into [master](https://codecov.io/gh/apache/spark/commit/c483e2977cbc6ae33d999c9c9d1dbacd9c53d85a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c483e29) will **decrease** coverage by `29.55%`.
   > The diff coverage is `70.58%`.
   
   > :exclamation: Current head c91c2e9 differs from pull request most recent head 787e0dd. Consider uploading reports for the commit 787e0dd to get more accurate results
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/spark/pull/35823/graphs/tree.svg?width=650&height=150&src=pr&token=R9pHLWgWi8&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master   #35823       +/-   ##
   ===========================================
   - Coverage   91.19%   61.63%   -29.56%     
   ===========================================
     Files         297      202       -95     
     Lines       64696    40330    -24366     
     Branches     9919     7528     -2391     
   ===========================================
   - Hits        58999    24859    -34140     
   - Misses       4330    14256     +9926     
   + Partials     1367     1215      -152     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `61.63% <70.58%> (-29.54%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [python/pyspark/pandas/frame.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2ZyYW1lLnB5) | `33.01% <33.33%> (-64.06%)` | :arrow_down: |
   | [python/pyspark/pandas/indexes/category.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvY2F0ZWdvcnkucHk=) | `93.33% <66.66%> (-0.92%)` | :arrow_down: |
   | [python/pyspark/pandas/indexes/datetimes.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvZGF0ZXRpbWVzLnB5) | `79.76% <66.66%> (-16.00%)` | :arrow_down: |
   | [python/pyspark/pandas/indexes/timedelta.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvdGltZWRlbHRhLnB5) | `72.13% <66.66%> (-3.74%)` | :arrow_down: |
   | [python/pyspark/pandas/base.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2Jhc2UucHk=) | `83.51% <100.00%> (-10.61%)` | :arrow_down: |
   | [python/pyspark/join.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvam9pbi5weQ==) | `12.12% <0.00%> (-81.82%)` | :arrow_down: |
   | [python/pyspark/sql/observation.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL29ic2VydmF0aW9uLnB5) | `26.08% <0.00%> (-69.57%)` | :arrow_down: |
   | [python/pyspark/ml/tuning.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvbWwvdHVuaW5nLnB5) | `25.03% <0.00%> (-67.46%)` | :arrow_down: |
   | [python/pyspark/streaming/dstream.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3RyZWFtaW5nL2RzdHJlYW0ucHk=) | `22.65% <0.00%> (-61.72%)` | :arrow_down: |
   | ... and [196 more](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [216b972...787e0dd](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] huaxingao commented on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
huaxingao commented on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1065933348


   I actually have an alias over aggregate test in FileSource too. Could you please change that one as well?
   https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceAggregatePushDownSuite.scala#L187


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] beliefer commented on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
beliefer commented on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1066640576


   ping @huaxingao cc @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826940304



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +92,37 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // update the scan builder with agg pushdown and return a new plan with agg pushed
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
-        case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+        case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
+          project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>

Review comment:
       if we follow it, then here should be `project.forall(_.deterministic)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r828000530



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -93,22 +92,28 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
         case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+          if filters.isEmpty && project.forall(_.deterministic) =>
           sHolder.builder match {
             case r: SupportsPushDownAggregates =>
+              val aliasMap = getAliasMap(project)
+              val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
+              val newGroupingExpressions = groupingExpressions.map {
+                case e: NamedExpression => replaceAliasWithAttr(e, aliasMap)
+                case other => other
+              }
               val aggExprToOutputOrdinal = mutable.HashMap.empty[Expression, Int]
-              val aggregates = collectAggregates(resultExpressions, aggExprToOutputOrdinal)
+              val aggregates = collectAggregates(newResultExpressions, aggExprToOutputOrdinal)
               val normalizedAggregates = DataSourceStrategy.normalizeExprs(
                 aggregates, sHolder.relation.output).asInstanceOf[Seq[AggregateExpression]]
               val normalizedGroupingExpressions = DataSourceStrategy.normalizeExprs(
-                groupingExpressions, sHolder.relation.output)
+                newGroupingExpressions, sHolder.relation.output)
               val translatedAggregates = DataSourceStrategy.translateAggregation(
                 normalizedAggregates, normalizedGroupingExpressions)
-              val (finalResultExpressions, finalAggregates, finalTranslatedAggregates) = {
+              val (selectedResultExpressions, selectedAggregates, selectedTranslatedAggregates) = {

Review comment:
       why do we rename these?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r826947256



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +91,27 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // update the scan builder with agg pushdown and return a new plan with agg pushed
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
-        case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+        case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
+          project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>
           sHolder.builder match {
             case r: SupportsPushDownAggregates =>
+              val aliasMap = getAliasMap(project)
+              val newResultExpressions = resultExpressions.map(replaceAliasWithAttr(_, aliasMap))
+              val newGroupingExpressions = groupingExpressions.asInstanceOf[Seq[NamedExpression]]

Review comment:
       `groupingExpressions` may not be `NamedExpression`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] codecov-commenter edited a comment on pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #35823:
URL: https://github.com/apache/spark/pull/35823#issuecomment-1065861634


   # [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#35823](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c91c2e9) into [master](https://codecov.io/gh/apache/spark/commit/c483e2977cbc6ae33d999c9c9d1dbacd9c53d85a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c483e29) will **decrease** coverage by `4.70%`.
   > The diff coverage is `90.62%`.
   
   > :exclamation: Current head c91c2e9 differs from pull request most recent head 787e0dd. Consider uploading reports for the commit 787e0dd to get more accurate results
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/spark/pull/35823/graphs/tree.svg?width=650&height=150&src=pr&token=R9pHLWgWi8&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #35823      +/-   ##
   ==========================================
   - Coverage   91.19%   86.48%   -4.71%     
   ==========================================
     Files         297      252      -45     
     Lines       64696    55695    -9001     
     Branches     9919     8929     -990     
   ==========================================
   - Hits        58999    48168   -10831     
   - Misses       4330     6256    +1926     
   + Partials     1367     1271      -96     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `86.46% <90.62%> (-4.71%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [python/pyspark/pandas/indexes/category.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvY2F0ZWdvcnkucHk=) | `93.33% <66.66%> (-0.92%)` | :arrow_down: |
   | [python/pyspark/pandas/indexes/datetimes.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvZGF0ZXRpbWVzLnB5) | `95.23% <66.66%> (-0.52%)` | :arrow_down: |
   | [python/pyspark/pandas/indexes/timedelta.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2luZGV4ZXMvdGltZWRlbHRhLnB5) | `75.40% <66.66%> (-0.46%)` | :arrow_down: |
   | [python/pyspark/pandas/base.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2Jhc2UucHk=) | `94.14% <100.00%> (+0.03%)` | :arrow_up: |
   | [python/pyspark/pandas/frame.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2ZyYW1lLnB5) | `97.06% <100.00%> (+<0.01%)` | :arrow_up: |
   | [python/pyspark/pandas/tests/test\_dataframe.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Rlc3RzL3Rlc3RfZGF0YWZyYW1lLnB5) | `97.34% <100.00%> (+0.01%)` | :arrow_up: |
   | [python/pyspark/pandas/tests/test\_series.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Rlc3RzL3Rlc3Rfc2VyaWVzLnB5) | `96.24% <100.00%> (+<0.01%)` | :arrow_up: |
   | [python/pyspark/sql/observation.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL29ic2VydmF0aW9uLnB5) | `26.08% <0.00%> (-69.57%)` | :arrow_down: |
   | [python/pyspark/sql/streaming.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3N0cmVhbWluZy5weQ==) | `27.03% <0.00%> (-54.95%)` | :arrow_down: |
   | [python/pyspark/sql/sql\_formatter.py](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3NxbF9mb3JtYXR0ZXIucHk=) | `32.50% <0.00%> (-52.50%)` | :arrow_down: |
   | ... and [88 more](https://codecov.io/gh/apache/spark/pull/35823/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [216b972...787e0dd](https://codecov.io/gh/apache/spark/pull/35823?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dcoliversun commented on a change in pull request #35823: [SPARK-38533][SQL] DS V2 aggregate push-down supports project with alias

Posted by GitBox <gi...@apache.org>.
dcoliversun commented on a change in pull request #35823:
URL: https://github.com/apache/spark/pull/35823#discussion_r825562687



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -779,15 +779,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
   }
 
-  test("scan with aggregate push-down: aggregate over alias NOT push down") {
+  test("scan with aggregate push-down: aggregate over alias push down") {

Review comment:
       Hi. Is it better to specify `SPARK-38533` ?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         |ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
     checkAnswer(df, Seq.empty[Row])
   }
+
+  test("scan with aggregate push-down: complete push-down aggregate with alias") {
+    val df = spark.table("h2.test.employee")
+      .select($"DEPT", $"SALARY".as("mySalary"))
+      .groupBy($"DEPT")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df)
+    df.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =

Review comment:
       Why not use `camel case` naming? https://docs.scala-lang.org/style/naming-conventions.html

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         |ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
     checkAnswer(df, Seq.empty[Row])
   }
+
+  test("scan with aggregate push-down: complete push-down aggregate with alias") {

Review comment:
       ditto

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         |ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
     checkAnswer(df, Seq.empty[Row])
   }
+
+  test("scan with aggregate push-down: complete push-down aggregate with alias") {
+    val df = spark.table("h2.test.employee")
+      .select($"DEPT", $"SALARY".as("mySalary"))
+      .groupBy($"DEPT")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df)
+    df.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+        checkKeywordsExistsInExplain(df, expected_plan_fragment)
+    }
+    checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+
+    val df2 = spark.table("h2.test.employee")
+      .select($"DEPT".as("myDept"), $"SALARY".as("mySalary"))
+      .groupBy($"myDept")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df2)
+    df2.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+        checkKeywordsExistsInExplain(df2, expected_plan_fragment)
+    }
+    checkAnswer(df2, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+  }
+
+  test("scan with aggregate push-down: partial push-down aggregate with alias") {

Review comment:
       ditto

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         |ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
     checkAnswer(df, Seq.empty[Row])
   }
+
+  test("scan with aggregate push-down: complete push-down aggregate with alias") {
+    val df = spark.table("h2.test.employee")
+      .select($"DEPT", $"SALARY".as("mySalary"))
+      .groupBy($"DEPT")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df)
+    df.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+        checkKeywordsExistsInExplain(df, expected_plan_fragment)
+    }
+    checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+
+    val df2 = spark.table("h2.test.employee")
+      .select($"DEPT".as("myDept"), $"SALARY".as("mySalary"))
+      .groupBy($"myDept")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df2)
+    df2.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+        checkKeywordsExistsInExplain(df2, expected_plan_fragment)
+    }
+    checkAnswer(df2, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+  }
+
+  test("scan with aggregate push-down: partial push-down aggregate with alias") {
+    val df = spark.read
+      .option("partitionColumn", "DEPT")
+      .option("lowerBound", "0")
+      .option("upperBound", "2")
+      .option("numPartitions", "2")
+      .table("h2.test.employee")
+      .select($"NAME", $"SALARY".as("mySalary"))
+      .groupBy($"NAME")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df, false)
+    df.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =

Review comment:
       ditto

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         |ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
     checkAnswer(df, Seq.empty[Row])
   }
+
+  test("scan with aggregate push-down: complete push-down aggregate with alias") {
+    val df = spark.table("h2.test.employee")
+      .select($"DEPT", $"SALARY".as("mySalary"))
+      .groupBy($"DEPT")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df)
+    df.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+        checkKeywordsExistsInExplain(df, expected_plan_fragment)
+    }
+    checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+
+    val df2 = spark.table("h2.test.employee")
+      .select($"DEPT".as("myDept"), $"SALARY".as("mySalary"))
+      .groupBy($"myDept")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df2)
+    df2.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =

Review comment:
       ditto

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -1032,4 +1036,76 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         |ON h2.test.view1.`|col1` = h2.test.view2.`|col1`""".stripMargin)
     checkAnswer(df, Seq.empty[Row])
   }
+
+  test("scan with aggregate push-down: complete push-down aggregate with alias") {
+    val df = spark.table("h2.test.employee")
+      .select($"DEPT", $"SALARY".as("mySalary"))
+      .groupBy($"DEPT")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df)
+    df.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+        checkKeywordsExistsInExplain(df, expected_plan_fragment)
+    }
+    checkAnswer(df, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+
+    val df2 = spark.table("h2.test.employee")
+      .select($"DEPT".as("myDept"), $"SALARY".as("mySalary"))
+      .groupBy($"myDept")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df2)
+    df2.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [DEPT]"
+        checkKeywordsExistsInExplain(df2, expected_plan_fragment)
+    }
+    checkAnswer(df2, Seq(Row(1, 19000.00), Row(2, 22000.00), Row(6, 12000.00)))
+  }
+
+  test("scan with aggregate push-down: partial push-down aggregate with alias") {
+    val df = spark.read
+      .option("partitionColumn", "DEPT")
+      .option("lowerBound", "0")
+      .option("upperBound", "2")
+      .option("numPartitions", "2")
+      .table("h2.test.employee")
+      .select($"NAME", $"SALARY".as("mySalary"))
+      .groupBy($"NAME")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df, false)
+    df.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =
+          "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByColumns: [NAME]"
+        checkKeywordsExistsInExplain(df, expected_plan_fragment)
+    }
+    checkAnswer(df, Seq(Row("alex", 12000.00), Row("amy", 10000.00),
+      Row("cathy", 9000.00), Row("david", 10000.00), Row("jen", 12000.00)))
+
+    val df2 = spark.read
+      .option("partitionColumn", "DEPT")
+      .option("lowerBound", "0")
+      .option("upperBound", "2")
+      .option("numPartitions", "2")
+      .table("h2.test.employee")
+      .select($"NAME".as("myName"), $"SALARY".as("mySalary"))
+      .groupBy($"myName")
+      .agg(sum($"mySalary").as("total"))
+      .filter($"total" > 1000)
+    checkAggregateRemoved(df2, false)
+    df2.queryExecution.optimizedPlan.collect {
+      case _: DataSourceV2ScanRelation =>
+        val expected_plan_fragment =

Review comment:
       ditto

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -92,23 +92,38 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     // update the scan builder with agg pushdown and return a new plan with agg pushed
     case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
       child match {
-        case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
-          if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
+        case ScanOperation(project, filters, sHolder: ScanBuilderHolder) if filters.isEmpty &&
+          project.forall(p => p.isInstanceOf[AttributeReference] || p.isInstanceOf[Alias]) =>
           sHolder.builder match {
             case r: SupportsPushDownAggregates =>
+              val aliasAttrToOriginAttr = mutable.HashMap.empty[Expression, AttributeReference]
+              val originAttrToAliasAttr = mutable.HashMap.empty[Expression, Attribute]
+              collectAliases(project, aliasAttrToOriginAttr, originAttrToAliasAttr)
+              val newResultExpressions = resultExpressions.map { expr =>
+                expr.transform {
+                  case r: AttributeReference if aliasAttrToOriginAttr.contains(r.canonicalized) =>
+                    aliasAttrToOriginAttr(r.canonicalized)
+                }
+              }.asInstanceOf[Seq[NamedExpression]]
+              val newGroupingExpressions = groupingExpressions.map { expr =>
+                expr.transform {
+                  case r: AttributeReference if aliasAttrToOriginAttr.contains(r.canonicalized) =>
+                    aliasAttrToOriginAttr(r.canonicalized)

Review comment:
       These two lambda expressions behave the same, we can consider reusing a function




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org