Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/13 00:00:47 UTC

[GitHub] [spark] anchovYu opened a new pull request, #39040: [WIP][SPARK-27561][SQL] Support implicit lateral column alias resolution on Aggregate

anchovYu opened a new pull request, #39040:
URL: https://github.com/apache/spark/pull/39040

   
   ### What changes were proposed in this pull request?
   This PR is based on https://github.com/apache/spark/pull/38776. To view the difference, see https://github.com/anchovyu/spark/compare/SPARK-27561-refactor...SPARK-27561-agg.
   
   TODO: descriptions
   TODO: update comments in code
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] anchovYu commented on pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
anchovYu commented on PR #39040:
URL: https://github.com/apache/spark/pull/39040#issuecomment-1350398518

   Thanks @dongjoon-hyun. One question: the two PRs are essentially peers, each implementing one of the feature's two code paths (Project and Aggregate). Should this one still be marked [FOLLOWUP] in that case?




[GitHub] [spark] gengliangwang commented on pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on PR #39040:
URL: https://github.com/apache/spark/pull/39040#issuecomment-1354141698

   @anchovYu could you resolve the conflicts?




[GitHub] [spark] dongjoon-hyun commented on pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #39040:
URL: https://github.com/apache/spark/pull/39040#issuecomment-1350400527

   The point is that you cannot **reuse** a JIRA ID without using `FOLLOWUP`.




[GitHub] [spark] anchovYu commented on pull request #39040: [SPARK-41631][SQL] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
anchovYu commented on PR #39040:
URL: https://github.com/apache/spark/pull/39040#issuecomment-1360652092

   Is it a GitHub limitation that I can re-request a review from only one person? Every time I request a second reviewer, it automatically removes the request from the first reviewer. Anyway, cc @gengliangwang and @cloud-fan to review this PR.




[GitHub] [spark] gengliangwang commented on a diff in pull request #39040: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1047993373


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -129,6 +163,45 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+          val expressionMap = collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+          val projectExprs = aggregateExpressions.map { exp =>
+            exp.transformDown {
+              case aggExpr: AggregateExpression =>
+                // Doesn't support referencing a lateral alias in aggregate function
+                if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                  aggExpr.collectFirst {
+                    case lcaRef: LateralColumnAliasReference =>
+                      throw QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+                        lcaRef.nameParts, aggExpr)
+                  }
+                }
+                val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, assignAlias(aggExpr))
+                newAggExprs += ne
+                ne.toAttribute
+              case e if groupingExpressions.exists(_.semanticEquals(e)) =>
+                // TODO one concern here, is condition here be able to match all grouping

Review Comment:
   There is reordering here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala#L483





[GitHub] [spark] cloud-fan closed pull request #39040: [SPARK-41631][SQL] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #39040: [SPARK-41631][SQL] Support implicit lateral column alias resolution on Aggregate
URL: https://github.com/apache/spark/pull/39040




[GitHub] [spark] dongjoon-hyun commented on pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on PR #39040:
URL: https://github.com/apache/spark/pull/39040#issuecomment-1350400041

   In that case, I'd recommend using a new JIRA ID for your new code path, @anchovYu. :)
   > Thanks @dongjoon-hyun. One question: the two PRs are essentially peers, each implementing one of the feature's two code paths (Project and Aggregate). Should this one still be marked [FOLLOWUP] in that case?
   
   




[GitHub] [spark] anchovYu commented on pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
anchovYu commented on PR #39040:
URL: https://github.com/apache/spark/pull/39040#issuecomment-1355358289

   @gengliangwang Sure, I will do it today. FYI, I was sick yesterday.




[GitHub] [spark] gengliangwang commented on a diff in pull request #39040: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1047990077


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -129,6 +163,45 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+          val expressionMap = collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+          val projectExprs = aggregateExpressions.map { exp =>
+            exp.transformDown {
+              case aggExpr: AggregateExpression =>
+                // Doesn't support referencing a lateral alias in aggregate function
+                if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                  aggExpr.collectFirst {
+                    case lcaRef: LateralColumnAliasReference =>
+                      throw QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+                        lcaRef.nameParts, aggExpr)
+                  }
+                }
+                val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, assignAlias(aggExpr))
+                newAggExprs += ne
+                ne.toAttribute
+              case e if groupingExpressions.exists(_.semanticEquals(e)) =>
+                // TODO one concern here, is condition here be able to match all grouping
+                //  expressions? For example, Agg [age + 10] [1 + age + 10], when transforming down,
+                //  is it possible that (1 + age) + 10, so that it won't be able to match (age + 10)
+                //  add a test.
+                val ne = expressionMap.getOrElseUpdate(e.canonicalized, assignAlias(e))
+                newAggExprs += ne
+                ne.toAttribute
+            }.asInstanceOf[NamedExpression]
+          }
+          if (newAggExprs.isEmpty) {
+            agg
+          } else {
+            Project(
+              projectList = projectExprs,
+              child = agg.copy(aggregateExpressions = newAggExprs.toSeq)
+            )
+          }
+        // TODO withOrigin?

Review Comment:
   No need. All results from `resolveOperatorsUpWithPruning` are already wrapped in `withOrigin`.





[GitHub] [spark] anchovYu commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1051207085


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -129,6 +163,45 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+          val expressionMap = collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+          val projectExprs = aggregateExpressions.map { exp =>
+            exp.transformDown {
+              case aggExpr: AggregateExpression =>
+                // Doesn't support referencing a lateral alias in aggregate function
+                if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                  aggExpr.collectFirst {
+                    case lcaRef: LateralColumnAliasReference =>
+                      throw QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+                        lcaRef.nameParts, aggExpr)
+                  }
+                }
+                val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, assignAlias(aggExpr))
+                newAggExprs += ne

Review Comment:
   Yes, we do. The general strategy of this design is to extract **all** aggregate functions and grouping expressions. This keeps the logic simple and avoids many extra code-path considerations (e.g., tracking whether an alias is used as a lateral alias by later expressions).
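
   A minimal sketch of that extraction strategy, using a toy expression tree rather than Spark's Catalyst classes. The types `Col`, `Sum`, `Add`, `Ref`, the `liftUp` helper, and the `_gen_N` alias names are all hypothetical. The sketch leaves out lateral alias references themselves and only illustrates how every aggregate function and grouping expression could be pulled into the Aggregate while the remaining arithmetic moves to a Project on top.

   ```scala
   object LiftUpSketch {
     // Illustrative stand-ins, not Spark's Catalyst classes.
     sealed trait Expr
     case class Col(name: String) extends Expr
     case class Sum(child: Expr) extends Expr             // stand-in for an aggregate function
     case class Add(left: Expr, right: Expr) extends Expr
     case class Ref(name: String) extends Expr            // reference to an extracted alias

     // Returns (aliased expressions kept in the Aggregate, expressions for the Project on top).
     def liftUp(grouping: Seq[Expr], aggExprs: Seq[Expr]): (Seq[(String, Expr)], Seq[Expr]) = {
       // LinkedHashMap keeps first-seen order and deduplicates repeated expressions.
       val extracted = scala.collection.mutable.LinkedHashMap.empty[Expr, String]
       def alias(e: Expr): Expr =
         Ref(extracted.getOrElseUpdate(e, s"_gen_${extracted.size}"))

       def rewrite(e: Expr): Expr = e match {
         case s: Sum => alias(s)                           // every aggregate function is extracted
         case g if grouping.contains(g) => alias(g)        // every grouping expression is extracted
         case Add(l, r) => Add(rewrite(l), rewrite(r))     // other arithmetic stays in the Project
         case other => other
       }

       val projectList = aggExprs.map(rewrite)
       (extracted.toSeq.map { case (e, name) => (name, e) }, projectList)
     }
   }
   ```

   For aggregate expressions `a, sum(b) + sum(c)` grouped by `a`, the sketch keeps `a`, `sum(b)` and `sum(c)` as three separately aliased entries in the Aggregate, and the Project becomes `_gen_0, _gen_1 + _gen_2`.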





[GitHub] [spark] anchovYu commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1051251048


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -129,6 +163,45 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+          val expressionMap = collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+          val projectExprs = aggregateExpressions.map { exp =>
+            exp.transformDown {
+              case aggExpr: AggregateExpression =>
+                // Doesn't support referencing a lateral alias in aggregate function
+                if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                  aggExpr.collectFirst {
+                    case lcaRef: LateralColumnAliasReference =>
+                      throw QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+                        lcaRef.nameParts, aggExpr)
+                  }
+                }
+                val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, assignAlias(aggExpr))
+                newAggExprs += ne
+                ne.toAttribute
+              case e if groupingExpressions.exists(_.semanticEquals(e)) =>
+                // TODO one concern here, is condition here be able to match all grouping

Review Comment:
   To my surprise, this existing query fails analysis:
   ```
   select 1 + dept + 10 from $testTable group by dept + 10
   -- error: [MISSING_AGGREGATION] The non-aggregating expression "dept" is based on columns which are not participating in the GROUP BY clause
   ```
   It seems that `checkAnalysis` does not canonicalize expressions before comparing them. The expression is structured as (1 + dept) + 10, so it cannot match the grouping expression (dept + 10).
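
   One way to see the structural mismatch is with a toy expression tree. The sketch below is not Spark code: `Lit`, `Col`, `Add`, `canonical`, and `containsGrouping` are hypothetical names, and `canonical` is a much simpler stand-in than Catalyst's canonicalization (it only sorts the two operands of each `Add`). Under those assumptions, a node-by-node comparison finds `dept + 10` inside `1 + (dept + 10)` but not inside `(1 + dept) + 10`.

   ```scala
   object GroupingMatchSketch {
     // Illustrative stand-ins, not Spark's Catalyst classes.
     sealed trait Expr
     case class Lit(value: Int) extends Expr
     case class Col(name: String) extends Expr
     case class Add(left: Expr, right: Expr) extends Expr

     // Order-insensitive form of each Add node: operands are sorted by their string form.
     def canonical(e: Expr): Expr = e match {
       case Add(l, r) =>
         val ops = Seq(canonical(l), canonical(r)).sortBy(_.toString)
         Add(ops(0), ops(1))
       case other => other
     }

     def semanticEquals(a: Expr, b: Expr): Boolean = canonical(a) == canonical(b)

     // All subtrees in pre-order, i.e. the nodes a top-down traversal would visit.
     def subtrees(e: Expr): Seq[Expr] = e match {
       case Add(l, r) => e +: (subtrees(l) ++ subtrees(r))
       case leaf => Seq(leaf)
     }

     def containsGrouping(e: Expr, grouping: Expr): Boolean =
       subtrees(e).exists(semanticEquals(_, grouping))

     val grouping: Expr = Add(Col("dept"), Lit(10))
     // `1 + dept + 10` parses left-associatively as (1 + dept) + 10.
     val leftAssoc: Expr = Add(Add(Lit(1), Col("dept")), Lit(10))
     // `1 + (dept + 10)` keeps the grouping expression as a subtree.
     val parenthesized: Expr = Add(Lit(1), Add(Col("dept"), Lit(10)))
     // `10 + dept` differs from the grouping expression only in operand order.
     val swapped: Expr = Add(Lit(10), Col("dept"))
   }
   ```

   Here `semanticEquals(swapped, grouping)` and `containsGrouping(parenthesized, grouping)` hold, while `containsGrouping(leftAssoc, grouping)` does not, because no single node of `(1 + dept) + 10` has exactly the operands `dept` and `10`.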





[GitHub] [spark] anchovYu commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1052738997


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -244,7 +303,67 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+          val expressionMap = collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+          val projectExprs = aggregateExpressions.map { exp =>
+            exp.transformDown {
+              case aggExpr: AggregateExpression =>
+                // Doesn't support referencing a lateral alias in aggregate function
+                if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                  aggExpr.collectFirst {
+                    case lcaRef: LateralColumnAliasReference =>
+                      throw QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+                        lcaRef.nameParts, aggExpr)
+                  }
+                }
+                val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, assignAlias(aggExpr))
+                newAggExprs += ne
+                ne.toAttribute
+              case e if groupingExpressions.exists(_.semanticEquals(e)) =>
+                val ne = expressionMap.getOrElseUpdate(e.canonicalized, assignAlias(e))
+                newAggExprs += ne
+                ne.toAttribute
+            }.asInstanceOf[NamedExpression]
+          }
+          if (newAggExprs.isEmpty) {
+            agg
+          } else {
+            // perform an early check on current Aggregate before any lift-up / push-down to throw
+            // the same exception such as non-aggregate expressions not in group by, which becomes
+            // missing input after transformation
+            earlyCheckAggregate(agg)

Review Comment:
   Yes, it is actually a simplified version of the logic in `checkAnalysis`. If this solution works and all tests pass, I can move this method to the Aggregate or AnalysisHelper object.





[GitHub] [spark] gengliangwang commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1050336921


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -129,6 +163,45 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+          val expressionMap = collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+          val projectExprs = aggregateExpressions.map { exp =>
+            exp.transformDown {
+              case aggExpr: AggregateExpression =>
+                // Doesn't support referencing a lateral alias in aggregate function
+                if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                  aggExpr.collectFirst {
+                    case lcaRef: LateralColumnAliasReference =>
+                      throw QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+                        lcaRef.nameParts, aggExpr)
+                  }
+                }
+                val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, assignAlias(aggExpr))
+                newAggExprs += ne

Review Comment:
   So for the following query
   ```
   create table table_1(a int, b int, c int);
   select a as x, x+1, sum(b)+sum(c) from table_1 group by a;
   ```
   Do we need to extract `sum(b)` and `sum(c)` and put them separately in the new aggregation expressions?
   





[GitHub] [spark] anchovYu commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1053617144


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -244,6 +303,61 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          // Check if current Aggregate is eligible to lift up with Project: the aggregate
+          // expression only contains: 1) aggregate functions, 2) grouping expressions, 3) lateral
+          // column alias reference or 4) literals.
+          // This check is to prevent unnecessary transformation on invalid plan, to guarantee it
+          // throws the same exception. For example, cases like non-aggregate expressions not
+          // in group by, once transformed, will throw a different exception: missing input.
+          def eligibleToLiftUp(exp: Expression): Boolean = {
+            exp match {
+              case e: AggregateExpression if AggregateExpression.isAggregate(e) => true

Review Comment:
   oops, thanks.
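
   The eligibility rule stated in the code comment of the hunk above (aggregate functions, grouping expressions, lateral column alias references, or literals) can be sketched on a toy expression tree. This is only an illustration under assumptions: the hunk is truncated, so the body below is not the PR's implementation, and `Lit`, `Col`, `LcaRef`, `Sum`, `Add` are hypothetical stand-ins for Catalyst classes.

   ```scala
   object EligibilitySketch {
     // Illustrative stand-ins, not Spark's Catalyst classes.
     sealed trait Expr { def children: Seq[Expr] = Nil }
     case class Lit(value: Int) extends Expr
     case class Col(name: String) extends Expr
     case class LcaRef(name: String) extends Expr         // lateral column alias reference
     case class Sum(child: Expr) extends Expr {           // stand-in for an aggregate function
       override def children: Seq[Expr] = Seq(child)
     }
     case class Add(left: Expr, right: Expr) extends Expr {
       override def children: Seq[Expr] = Seq(left, right)
     }

     def eligibleToLiftUp(grouping: Seq[Expr])(e: Expr): Boolean = e match {
       case _: Sum => true                                // 1) aggregate function
       case g if grouping.contains(g) => true             // 2) grouping expression
       case _: LcaRef => true                             // 3) lateral column alias reference
       case _: Lit => true                                // 4) literal
       case _: Col => false                               // bare column that is not grouped
       case other => other.children.forall(c => eligibleToLiftUp(grouping)(c))
     }
   }
   ```

   With `dept` as the only grouping expression, `dept + sum(salary)` is eligible in this sketch, while `name + x` (with `x` a lateral alias reference) is not, since `name` is neither grouped nor aggregated. Rejecting such plans before any rewrite is what lets the original "not in GROUP BY" error surface instead of a missing-input error.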





[GitHub] [spark] anchovYu commented on pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
anchovYu commented on PR #39040:
URL: https://github.com/apache/spark/pull/39040#issuecomment-1355792680

   I will address the remaining TODOs in this PR shortly.




[GitHub] [spark] AmplabJenkins commented on pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #39040:
URL: https://github.com/apache/spark/pull/39040#issuecomment-1351323107

   Can one of the admins verify this patch?




[GitHub] [spark] gengliangwang commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1052735150


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -244,7 +303,67 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+          val expressionMap = collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+          val projectExprs = aggregateExpressions.map { exp =>
+            exp.transformDown {
+              case aggExpr: AggregateExpression =>
+                // Doesn't support referencing a lateral alias in aggregate function
+                if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                  aggExpr.collectFirst {
+                    case lcaRef: LateralColumnAliasReference =>
+                      throw QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+                        lcaRef.nameParts, aggExpr)
+                  }
+                }
+                val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, assignAlias(aggExpr))
+                newAggExprs += ne
+                ne.toAttribute
+              case e if groupingExpressions.exists(_.semanticEquals(e)) =>
+                val ne = expressionMap.getOrElseUpdate(e.canonicalized, assignAlias(e))
+                newAggExprs += ne
+                ne.toAttribute
+            }.asInstanceOf[NamedExpression]
+          }
+          if (newAggExprs.isEmpty) {
+            agg
+          } else {
+            // perform an early check on current Aggregate before any lift-up / push-down to throw
+            // the same exception such as non-aggregate expressions not in group by, which becomes
+            // missing input after transformation
+            earlyCheckAggregate(agg)

Review Comment:
   This seems to come from `checkAnalysis`. Could you refactor it to reduce the duplicated code?





[GitHub] [spark] cloud-fan commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1052986240


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -244,6 +303,61 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          // Check if current Aggregate is eligible to lift up with Project: the aggregate
+          // expression only contains: 1) aggregate functions, 2) grouping expressions, 3) lateral
+          // column alias reference or 4) literals.
+          // This check is to prevent unnecessary transformation on invalid plan, to guarantee it
+          // throws the same exception. For example, cases like non-aggregate expressions not
+          // in group by, once transformed, will throw a different exception: missing input.
+          def eligibleToLiftUp(exp: Expression): Boolean = {
+            exp match {
+              case e: AggregateExpression if AggregateExpression.isAggregate(e) => true

Review Comment:
   ```suggestion
                 case e: Expression if AggregateExpression.isAggregate(e) => true
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -244,6 +303,61 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          // Check if current Aggregate is eligible to lift up with Project: the aggregate
+          // expression only contains: 1) aggregate functions, 2) grouping expressions, 3) lateral
+          // column alias reference or 4) literals.
+          // This check is to prevent unnecessary transformation on invalid plan, to guarantee it
+          // throws the same exception. For example, cases like non-aggregate expressions not
+          // in group by, once transformed, will throw a different exception: missing input.
+          def eligibleToLiftUp(exp: Expression): Boolean = {
+            exp match {
+              case e: AggregateExpression if AggregateExpression.isAggregate(e) => true

Review Comment:
   ```suggestion
                 case e if AggregateExpression.isAggregate(e) => true
   ```
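
The two suggestions above both drop the type test on `AggregateExpression`, so that the guard alone decides whether the case matches. A minimal, Spark-free sketch of why that can matter is shown below. The classes `Expr`, `AggExpr`, `PandasAggUdf`, `Literal` and the helper `AggHelper.isAggregate` are hypothetical stand-ins, and the sketch assumes the real helper accepts more than one kind of node; it is not Catalyst's implementation.

```scala
// Hypothetical stand-ins for Catalyst classes; all names are illustrative only.
sealed trait Expr
final case class AggExpr(name: String) extends Expr      // plays the role of AggregateExpression
final case class PandasAggUdf(name: String) extends Expr // another node the helper treats as an aggregate
final case class Literal(value: Int) extends Expr

object AggHelper {
  // Assumption: the helper recognises more than one node type.
  def isAggregate(e: Expr): Boolean = e match {
    case _: AggExpr | _: PandasAggUdf => true
    case _ => false
  }
}

object GuardDemo {
  // Type test plus guard: only AggExpr can ever reach the guard.
  def narrow(e: Expr): Boolean = e match {
    case a: AggExpr if AggHelper.isAggregate(a) => true
    case _ => false
  }

  // Guard only: every node the helper accepts matches.
  def wide(e: Expr): Boolean = e match {
    case x if AggHelper.isAggregate(x) => true
    case _ => false
  }
}
```

With the type test in place, a node such as `PandasAggUdf` never reaches the guard, so `narrow` rejects it while `wide` accepts it.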





[GitHub] [spark] anchovYu commented on a diff in pull request #39040: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1047597647


##########
sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala:
##########
@@ -268,46 +507,87 @@ class LateralColumnAliasSuite extends QueryTest with SharedSparkSession {
   }
   // TODO: more tests on LCA in subquery
 
-  test("Lateral alias of a complex type - Project") {
-    checkAnswer(
-      sql("SELECT named_struct('a', 1) AS foo, foo.a + 1 AS bar, bar + 1"),
-      Row(Row(1), 2, 3))
-
-    checkAnswer(
-      sql("SELECT named_struct('a', named_struct('b', 1)) AS foo, foo.a.b + 1 AS bar"),
-      Row(Row(Row(1)), 2)
-    )
+  test("Lateral alias conflicts with OuterReference - Aggregate") {
+    // test if lca rule strips the OuterReference and resolves to lateral alias
+    val query =

Review Comment:
   @cloud-fan As we discussed yesterday, I tried simply substituting the id with the expression. It no longer throws an exception during analysis, but it now throws one during execution.
   
   * When LCA is off, the id is resolved to an outer reference, and an exception is thrown during analysis:
     [UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_CORRELATED_COLUMNS_IN_GROUP_BY] Unsupported subquery expression: A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns
   * When LCA is on, or when LCA is off but the id is simply replaced with the expression, an exception is thrown during execution:
     [SCALAR_SUBQUERY_TOO_MANY_ROWS] More than one row returned by a subquery used as an expression.





[GitHub] [spark] anchovYu commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1048017023


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -129,6 +163,45 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+          val expressionMap = collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+          val projectExprs = aggregateExpressions.map { exp =>
+            exp.transformDown {
+              case aggExpr: AggregateExpression =>
+                // Doesn't support referencing a lateral alias in aggregate function
+                if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                  aggExpr.collectFirst {
+                    case lcaRef: LateralColumnAliasReference =>
+                      throw QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+                        lcaRef.nameParts, aggExpr)
+                  }
+                }
+                val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, assignAlias(aggExpr))
+                newAggExprs += ne

Review Comment:
   No, actually all the grouping expressions and aggregate expressions in the aggregate list should be extracted; otherwise the upper Project cannot resolve them, correct?
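
The point of this reply can be illustrated with a toy model of the lift-up: every aggregate function and grouping expression in the aggregate list is pulled down into the Aggregate under a generated name, and the Project placed on top refers only to those names. The sketch below is an assumption-laden simplification, not the rule from the PR; the tree classes (`Col`, `Agg`, `LcaRef`, `Add`, `Alias`), the `LiftUp.split` helper, and the `_x0`, `_x1`, ... names are all hypothetical.

```scala
// Toy expression tree; hypothetical names, not Catalyst classes.
sealed trait Expr
final case class Col(name: String) extends Expr            // plain column reference
final case class Agg(fn: String, arg: Expr) extends Expr   // aggregate function call
final case class LcaRef(alias: String) extends Expr        // reference to a lateral alias
final case class Add(left: Expr, right: Expr) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

object LiftUp {
  // Returns (expressions kept in the Aggregate, expressions for the Project on top).
  def split(grouping: Seq[Expr], aggList: Seq[Expr]): (Seq[Alias], Seq[Expr]) = {
    // Insertion-ordered map so each extracted expression gets exactly one name.
    val extracted = scala.collection.mutable.LinkedHashMap.empty[Expr, Alias]
    def nameFor(e: Expr): Alias =
      extracted.getOrElseUpdate(e, Alias(e, s"_x${extracted.size}"))

    def rewrite(e: Expr): Expr = e match {
      case a: Agg                    => Col(nameFor(a).name) // extract every aggregate function
      case g if grouping.contains(g) => Col(nameFor(g).name) // extract every grouping expression
      case Add(l, r)                 => Add(rewrite(l), rewrite(r))
      case Alias(c, n)               => Alias(rewrite(c), n)
      case other                     => other                // e.g. LcaRef stays for a later step
    }

    val projectList = aggList.map(rewrite)
    (extracted.values.toSeq, projectList)
  }
}

object LiftUpExample {
  // Roughly: SELECT dept AS a, avg(salary) AS b, b + avg(bonus) ... GROUP BY dept
  val (aggregateOutput, projectList) = LiftUp.split(
    grouping = Seq(Col("dept")),
    aggList = Seq(
      Alias(Col("dept"), "a"),
      Alias(Agg("avg", Col("salary")), "b"),
      Add(LcaRef("b"), Agg("avg", Col("bonus")))))
}
```

In this model, `dept` and `avg(salary)` are extracted even though no lateral alias appears inside them. If they stayed only in the upper Project, that Project would refer to values the Aggregate below it no longer outputs.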





[GitHub] [spark] gengliangwang commented on a diff in pull request #39040: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1047989425


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -129,6 +163,45 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
               child = Project(innerProjectList.toSeq, child)
             )
           }
+
+        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _) if agg.resolved
+            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+
+          val newAggExprs = collection.mutable.Set.empty[NamedExpression]
+          val expressionMap = collection.mutable.LinkedHashMap.empty[Expression, NamedExpression]
+          val projectExprs = aggregateExpressions.map { exp =>
+            exp.transformDown {
+              case aggExpr: AggregateExpression =>
+                // Doesn't support referencing a lateral alias in aggregate function
+                if (aggExpr.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                  aggExpr.collectFirst {
+                    case lcaRef: LateralColumnAliasReference =>
+                      throw QueryCompilationErrors.lateralColumnAliasInAggFuncUnsupportedError(
+                        lcaRef.nameParts, aggExpr)
+                  }
+                }
+                val ne = expressionMap.getOrElseUpdate(aggExpr.canonicalized, assignAlias(aggExpr))
+                newAggExprs += ne

Review Comment:
   So we are extracting every `AggregateExpression` and grouping expression from `aggregateExpressions`. Shall we extract only the ones required by the lateral column aliases?





[GitHub] [spark] cloud-fan commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1052973933


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -182,6 +183,157 @@ object AnalysisContext {
   }
 }
 
+object Analyzer extends Logging {

Review Comment:
   This is an even bigger change than reverting https://github.com/apache/spark/pull/39054 ...





[GitHub] [spark] cloud-fan commented on pull request #39040: [SPARK-41631][SQL] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #39040:
URL: https://github.com/apache/spark/pull/39040#issuecomment-1360784531

   thanks, merging to master!




[GitHub] [spark] gengliangwang commented on a diff in pull request #39040: [SPARK-27561][SQL][FOLLOWUP] Support implicit lateral column alias resolution on Aggregate

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #39040:
URL: https://github.com/apache/spark/pull/39040#discussion_r1052735930


##########
sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala:
##########
@@ -689,4 +713,38 @@ class LateralColumnAliasSuite extends LateralColumnAliasSuiteBase {
             s"after extracted from Alias. Current aggregateExpressions: $aggregateExpressions")
     }
   }
+
+  test("Non-aggregating expressions not in group by still throws the same error") {
+    // query without lateral alias
+    assert(

Review Comment:
   Use `checkError`?


