Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/23 21:05:21 UTC

[GitHub] [spark] anchovYu opened a new pull request, #38776: [WIP] Refactor Analyzer by moving several public methods to the Analyzer object

anchovYu opened a new pull request, #38776:
URL: https://github.com/apache/spark/pull/38776

   
   ### What changes were proposed in this pull request?
   Create a new Analyzer companion object and move several public methods from the Analyzer class into it.
   
   ### Why are the changes needed?
   Currently, rules that need to call these methods have to be defined as objects nested inside the Analyzer class. This refactoring allows such rules to be added as independent objects instead.
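   A minimal sketch of the pattern, using hypothetical stand-ins (`MiniAnalyzer`, `resolveName`, and `StandaloneRule` are illustrative names, not Spark APIs): a helper that needs no instance state moves from the class into its companion object, any configuration it uses (here a case-sensitivity flag, playing the role of a resolver) becomes a parameter, and a rule defined as a top-level object can then call it directly.

```scala
// Hypothetical stand-in for the Analyzer class; not Spark's real API.
class MiniAnalyzer(val caseSensitive: Boolean) {
  // Instance-level entry point; it delegates to the companion object.
  def analyze(names: Seq[String], output: Seq[String]): Seq[Option[String]] =
    names.map(n => MiniAnalyzer.resolveName(n, output, caseSensitive))
}

// Companion object: helpers that need no instance state live here, and any
// configuration they depend on is passed in as a parameter.
object MiniAnalyzer {
  def resolveName(name: String, output: Seq[String], caseSensitive: Boolean): Option[String] =
    output.find(o => if (caseSensitive) o == name else o.equalsIgnoreCase(name))
}

// A rule written as an independent top-level object can call the helper
// directly, without being nested inside (or holding) a MiniAnalyzer instance.
object StandaloneRule {
  def apply(names: Seq[String], output: Seq[String]): Seq[Option[String]] =
    names.map(MiniAnalyzer.resolveName(_, output, caseSensitive = false))
}
```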
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Existing tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1042953641


##########
sql/core/src/test/scala/org/apache/spark/sql/LateralColumnAliasSuite.scala:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class LateralColumnAliasSuite extends QueryTest with SharedSparkSession {
+  protected val testTable: String = "employee"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    sql(
+      s"""
+         |CREATE TABLE $testTable (
+         |  dept INTEGER,
+         |  name String,
+         |  salary INTEGER,
+         |  bonus INTEGER,
+         |  properties STRUCT<joinYear INTEGER, mostRecentEmployer STRING>)
+         |USING orc
+         |""".stripMargin)
+    sql(
+      s"""
+         |INSERT INTO $testTable VALUES
+         |  (1, 'amy', 10000, 1000, named_struct('joinYear', 2019, 'mostRecentEmployer', 'A')),
+         |  (2, 'alex', 12000, 1200, named_struct('joinYear', 2017, 'mostRecentEmployer', 'A')),
+         |  (1, 'cathy', 9000, 1200, named_struct('joinYear', 2020, 'mostRecentEmployer', 'B')),
+         |  (2, 'david', 10000, 1300, named_struct('joinYear', 2019, 'mostRecentEmployer', 'C')),
+         |  (6, 'jen', 12000, 1200, named_struct('joinYear', 2018, 'mostRecentEmployer', 'D'))
+         |""".stripMargin)
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      sql(s"DROP TABLE IF EXISTS $testTable")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  val lcaEnabled: Boolean = true
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
+    (implicit pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED.key -> lcaEnabled.toString) {
+        testFun
+      }
+    }
+  }
+
+  private def withLCAOff(f: => Unit): Unit = {
+    withSQLConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED.key -> "false") {
+      f
+    }
+  }
+  private def withLCAOn(f: => Unit): Unit = {
+    withSQLConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED.key -> "true") {
+      f
+    }
+  }
+
+  test("Lateral alias basics - Project") {
+    checkAnswer(sql(s"select dept as d, d + 1 as e from $testTable where name = 'amy'"),

Review Comment:
   Shall we add a new test suite `LateralColumnAliasDisabledSuite` that overrides the `checkAnswer` method (or adds a new validation method) to check for exceptions?
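   A rough sketch of that structure in plain Scala, assuming hypothetical names (`LcaCase`, `LcaSuiteBase`, `LcaDisabledSuiteBase`, and the fake runner are illustrative, not ScalaTest or Spark APIs): the query cases are shared, and the disabled variant only overrides the validation hook to expect a failure.

```scala
// One query case: SQL text plus the answer expected when LCA is enabled.
final case class LcaCase(sql: String, expected: Seq[Int])

// Hypothetical shared base: the cases are written once, and only the
// validation hook differs between the enabled and disabled variants.
abstract class LcaSuiteBase {
  def lcaEnabled: Boolean
  // Stand-in for running the query: Right(rows) on success, Left(error) on failure.
  def run(sql: String): Either[String, Seq[Int]]
  // Default validation, similar in spirit to checkAnswer.
  def check(c: LcaCase): Boolean = run(c.sql) == Right(c.expected)
  def runAll(cases: Seq[LcaCase]): Seq[Boolean] = cases.map(check)
}

// Disabled variant: the same cases are expected to fail during analysis instead.
abstract class LcaDisabledSuiteBase extends LcaSuiteBase {
  override def lcaEnabled: Boolean = false
  override def check(c: LcaCase): Boolean = run(c.sql).isLeft
}

object LcaSuiteDemo {
  // Fake runner: pretends lateral alias queries only succeed when the flag is on.
  def fakeRun(enabled: Boolean)(sql: String): Either[String, Seq[Int]] =
    if (enabled) Right(Seq(1, 2)) else Left(s"cannot resolve lateral alias in: $sql")

  val enabledSuite: LcaSuiteBase = new LcaSuiteBase {
    def lcaEnabled: Boolean = true
    def run(sql: String): Either[String, Seq[Int]] = fakeRun(lcaEnabled)(sql)
  }
  val disabledSuite: LcaSuiteBase = new LcaDisabledSuiteBase {
    def run(sql: String): Either[String, Seq[Int]] = fakeRun(lcaEnabled)(sql)
  }
}
```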





[GitHub] [spark] gengliangwang commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1041599676


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, pushing down the referenced lateral alias into a newly created Project, resolve
+ *   the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and push down the referenced lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because lateral column alias has higher resolution priority than outer reference, it will try
+ * to resolve an [[OuterReference]] using lateral column alias in phase 1, similar as an
+ * [[UnresolvedAttribute]]. If success, it strips [[OuterReference]] and also wraps it with
+ * [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given the lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if succeeds. None if fails to resolve.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {

Review Comment:
   This method is quite long. Shall we reduce its size?
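   One possible way to shrink it, sketched on a toy model: split the body into a phase-1 `wrap` and a phase-2 `unwrap` function that mirror the two phases in the doc comment. Everything here (`Expr`, `Unresolved`, `LcaRef`, `As`, `Plan`) is a hypothetical simplification, not Catalyst's classes; for brevity it resolves child columns in the same pass and does not handle alias chaining.

```scala
object LcaToy {
  // Toy expression and plan trees: hypothetical simplifications, not Catalyst's classes.
  sealed trait Expr
  final case class Col(name: String) extends Expr        // resolved column
  final case class Unresolved(name: String) extends Expr // like UnresolvedAttribute
  final case class LcaRef(name: String) extends Expr     // like LateralColumnAliasReference
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class As(child: Expr, name: String) extends Expr // like Alias

  sealed trait Plan
  final case class Relation(output: Seq[String]) extends Plan
  final case class Project(list: Seq[Expr], child: Plan) extends Plan

  def output(plan: Plan): Seq[String] = plan match {
    case Relation(cols) => cols
    case Project(list, _) => list.collect {
      case Col(n) => n
      case As(_, n) => n
    }
  }

  // Phase 1: wrap references to previously defined aliases in LcaRef.
  // A child column wins over a lateral alias (resolution priority).
  def wrap(p: Project): Project = {
    val childCols = output(p.child).toSet
    var aliasCounts = Map.empty[String, Int]
    def go(e: Expr): Expr = e match {
      case Unresolved(n) if childCols(n) => Col(n)
      case Unresolved(n) if aliasCounts.getOrElse(n, 0) > 1 =>
        throw new IllegalArgumentException(s"Ambiguous lateral column alias: $n")
      case Unresolved(n) if aliasCounts.contains(n) => LcaRef(n)
      case Add(l, r) => Add(go(l), go(r))
      case As(c, n) => As(go(c), n)
      case other => other
    }
    val newList = p.list.map { e =>
      val wrapped = go(e)
      // Register the alias only after processing it, so it is visible to later items.
      wrapped match {
        case As(_, n) => aliasCounts = aliasCounts.updated(n, aliasCounts.getOrElse(n, 0) + 1)
        case _ => ()
      }
      wrapped
    }
    Project(newList, p.child)
  }

  // Phase 2: unwrap LcaRef and push the referenced aliases into a new inner Project.
  // Assumes no chaining (an alias whose definition itself contains an LcaRef).
  def unwrap(p: Project): Project = {
    val referenced = scala.collection.mutable.Set.empty[String]
    def go(e: Expr): Expr = e match {
      case LcaRef(n) =>
        referenced += n
        Col(n)
      case Add(l, r) => Add(go(l), go(r))
      case As(c, n) => As(go(c), n)
      case other => other
    }
    val rewritten = p.list.map(go)
    val pushedDown = rewritten.collect { case a @ As(_, n) if referenced(n) => a }
    val outerList = rewritten.map {
      case As(_, n) if referenced(n) => Col(n)
      case other => other
    }
    Project(outerList, Project(output(p.child).map(Col(_)) ++ pushedDown, p.child))
  }
}
```

   On the doc comment's example, `wrap` turns `Project [age AS a, 'a + 1]` into `[age AS a, lcaref(a) + 1]`, and `unwrap` then yields `Project [a, a + 1]` over `Project [age, age AS a]`.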





[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1041325612


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, pushing down the referenced lateral alias into a newly created Project, resolve
+ *   the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and push down the referenced lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because lateral column alias has higher resolution priority than outer reference, it will try
+ * to resolve an [[OuterReference]] using lateral column alias in phase 1, similar as an
+ * [[UnresolvedAttribute]]. If success, it strips [[OuterReference]] and also wraps it with
+ * [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given the lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if succeeds. None if fails to resolve.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) {
+            case o: OuterReference
+              if aliasMap.contains(o.nameParts.map(_.head).getOrElse(o.name)) =>
+              val nameParts = o.nameParts.getOrElse(Seq(o.name))
+              val aliases = aliasMap.get(nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(nameParts, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, nameParts))
+                    .getOrElse(o)
+                case _ => o
+              }
+            case u: UnresolvedAttribute if aliasMap.contains(u.nameParts.head) &&
+              Analyzer.resolveExpressionByPlanChildren(u, p, resolver)
+                .isInstanceOf[UnresolvedAttribute] =>
+              val aliases = aliasMap.get(u.nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(u.name, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(u.nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, u.nameParts))
+                    .getOrElse(u)
+                case _ => u
+              }
+          }.asInstanceOf[NamedExpression]
+        }
+        val newProjectList = projectList.zipWithIndex.map {
+          case (a: Alias, idx) =>
+            val lcaWrapped = wrapLCAReference(a).asInstanceOf[Alias]
+            // Insert the LCA-resolved alias instead of the unresolved one into map. If it is
+            // resolved, it can be referenced as LCA by later expressions (chaining).
+            // Unresolved Alias is also added to the map to perform ambiguous name check, but only
+            // resolved alias can be LCA
+            aliasMap = insertIntoAliasMap(lcaWrapped, idx, aliasMap)
+            lcaWrapped
+          case (e, _) =>
+            wrapLCAReference(e)
+        }
+        p.copy(projectList = newProjectList)
+    }
+
+    // phase 2: unwrap
+    rewrittenPlan.resolveOperatorsUpWithPruning(
+      _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), UnknownRuleId) {
+      case p @ Project(projectList, child) if p.resolved
+        && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        // build the map again in case the project list changes and index goes off
+        // TODO one risk: is there any rule that strips off /add the Alias? that the LCA is resolved
+        //  in the beginning, but when it comes to push down, it really can't find the matching one?
+        //  Restore back to UnresolvedAttribute.
+        //  Also, when resolving from bottom up should I worry about cases like:
+        //  Project [b AS c, c + 1 AS d]
+        //  +- Project [1 AS a, a AS b]
+        //  b AS c is resolved, even b refers to an alias contains the lateral alias?
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        val referencedAliases = collection.mutable.Set.empty[AliasEntry]
+        def unwrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+            case lcaRef: LateralColumnAliasReference if aliasMap.contains(lcaRef.nameParts.head) =>
+              val aliasEntry = aliasMap.get(lcaRef.nameParts.head).get.head
+              if (!aliasEntry.alias.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                // If there is no chaining, push down the alias and resolve the attribute by
+                // constructing a dummy plan
+                referencedAliases += aliasEntry
+                // Implementation notes (to-delete):
+                // this is a design decision whether to restore the UnresolvedAttribute, or
+                // directly resolve by constructing a plan and using resolveExpressionByPlanChildren
+                resolveByLateralAlias(lcaRef.nameParts, aliasEntry.alias).getOrElse(lcaRef)

Review Comment:
   I think resolving it one more time is the safe choice, in case the alias or the field has changed or been reconstructed (with a different exprId).
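   A toy illustration of that reasoning, assuming a hypothetical `Attr` that carries a name and an exprId, and a rebuilt alias that gets a fresh id: a lookup by the cached exprId misses, while resolving again by name parts finds the current attribute. These are illustrative classes, not Catalyst's.

```scala
object ReResolveToy {
  final case class Attr(name: String, exprId: Long)

  // Hypothetical: each (re)construction of an alias is assigned a fresh exprId.
  private var nextId = 0L
  def freshAttr(name: String): Attr = {
    nextId += 1
    Attr(name, nextId)
  }

  // Resolving via a cached exprId breaks once the alias has been rebuilt.
  def byExprId(cached: Attr, output: Seq[Attr]): Option[Attr] =
    output.find(_.exprId == cached.exprId)

  // Resolving again by name picks up whatever attribute the alias produces now.
  def byName(nameParts: Seq[String], output: Seq[Attr]): Option[Attr] =
    output.find(_.name.equalsIgnoreCase(nameParts.head))
}
```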





[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1044238165


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -638,6 +638,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
               case UnresolvedWindowExpression(_, windowSpec) =>
                 throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
             })
+            // This should not happen, resolved Project or Aggregate should restore or resolve
+            // all lateral column alias references. Add check for extra safe.

Review Comment:
   shall we have a rule like `RemoveTempResolvedColumn` to restore `LateralColumnAliasReference`?
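   A minimal sketch of what such a restore step could do, on hypothetical toy classes rather than Catalyst's: any leftover reference is mapped back to an unresolved attribute built from its original name parts, so later error reporting sees the name the user wrote.

```scala
object RestoreToy {
  sealed trait Expr
  final case class UnresolvedAttr(nameParts: Seq[String]) extends Expr
  // `resolved` stands for the wrapped, already-resolved expression.
  final case class LcaRef(resolved: String, nameParts: Seq[String]) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Hypothetical restore step: turn every remaining LcaRef back into the
  // unresolved attribute it came from, keyed by its original name parts.
  def restore(e: Expr): Expr = e match {
    case LcaRef(_, nameParts) => UnresolvedAttr(nameParts)
    case Add(l, r) => Add(restore(l), restore(r))
    case other => other
  }
}
```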





[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1046592935


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -424,8 +424,51 @@ case class OuterReference(e: NamedExpression)
   override def qualifier: Seq[String] = e.qualifier
   override def exprId: ExprId = e.exprId
   override def toAttribute: Attribute = e.toAttribute
-  override def newInstance(): NamedExpression = OuterReference(e.newInstance())
+  override def newInstance(): NamedExpression =
+    OuterReference(e.newInstance()).setNameParts(nameParts)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
+
+  // optional field, the original name parts of UnresolvedAttribute before it is resolved to
+  // OuterReference. Used in rule ResolveLateralColumnAlias to convert OuterReference back to
+  // LateralColumnAliasReference.
+  var nameParts: Option[Seq[String]] = None

Review Comment:
   QQ: I didn't add it to the constructor of `OuterReference` because of binary compatibility concerns. Is that concern valid? ~~Actually, what is the risk of changing the constructor but also writing another `unapply` function?~~ That seems impossible without introducing a new object with a different name, and it would still require changing a large portion of the pattern-matching code.
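   A small sketch of the trade-off, using a hypothetical `OuterRef` instead of the real `OuterReference`: a `var` outside the constructor leaves the generated `apply`, `unapply`, and `copy` signatures unchanged, but it is ignored by equality and reset by `copy`, which is why the diff has `newInstance()` carry `nameParts` over explicitly.

```scala
object VarFieldToy {
  // The extra field lives in the class body, not in the constructor, so the
  // compiler-generated apply/unapply/copy keep their one-argument shape.
  final case class OuterRef(name: String) {
    var nameParts: Option[Seq[String]] = None

    // Setter that returns `this`, mirroring the setNameParts(...) call in the diff.
    def setNameParts(parts: Option[Seq[String]]): OuterRef = {
      nameParts = parts
      this
    }
  }
}
```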





[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1044744657


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -638,6 +638,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
               case UnresolvedWindowExpression(_, windowSpec) =>
                 throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
             })
+            // This should not happen, resolved Project or Aggregate should restore or resolve
+            // all lateral column alias references. Add check for extra safe.

Review Comment:
   I intentionally didn't add it, because I don't want attributes that can actually be resolved as LCAs to show up in the error message as UnresolvedAttribute. Also note that, unlike RemoveTempResolvedColumn, an LCARef can't be directly resolved to the NamedExpression inside it, because the resulting plan would be wrong: there is no alias push-down.
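   A toy check illustrating the second point, assuming (hypothetically) that a projection is valid only when every attribute it references comes from its child's output: unwrapping `Project [age AS a, a + 1]` in place leaves `a` with no producer, while pushing `age AS a` into a new inner Project supplies it.

```scala
object MissingInputToy {
  // Hypothetical validity check: references the child does not produce are "missing".
  def missingInputs(referenced: Set[String], childOutput: Seq[String]): Set[String] =
    referenced -- childOutput.toSet

  // Project [age AS a, a + 1] over a child that produces only [age].
  val childOutput: Seq[String] = Seq("age")

  // Naive unwrap: `a + 1` now references `a`, which the child does not produce.
  val naive: Set[String] = missingInputs(Set("age", "a"), childOutput)

  // With push-down, the outer Project [a, a + 1] reads from an inner
  // Project [age, age AS a], whose output includes `a`.
  val pushedDown: Set[String] = missingInputs(Set("a"), childOutput :+ "a")
}
```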





[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1044068904


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1761,6 +1763,114 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
   }
 
+  /**
+   * The first phase to resolve lateral column alias. See comments in
+   * [[ResolveLateralColumnAliasReference]] for more detailed explanation.
+   */
+  object WrapLateralColumnAliasReference extends Rule[LogicalPlan] {
+    import ResolveLateralColumnAliasReference.AliasEntry
+
+    private def insertIntoAliasMap(
+        a: Alias,
+        idx: Int,
+        aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+      val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+      aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+    }
+
+    /**
+     * Use the given lateral alias to resolve the unresolved attribute with the name parts.
+     *
+     * Construct a dummy plan with the given lateral alias as project list, use the output of the
+     * plan to resolve.
+     * @return The resolved [[LateralColumnAliasReference]] if succeeds. None if fails to resolve.
+     */
+    private def resolveByLateralAlias(
+        nameParts: Seq[String], lateralAlias: Alias): Option[LateralColumnAliasReference] = {
+      // TODO question: everytime it resolves the extract field it generates a new exprId.
+      //  Does it matter?
+      val resolvedAttr = resolveExpressionByPlanOutput(
+        expr = UnresolvedAttribute(nameParts),
+        plan = Project(Seq(lateralAlias), OneRowRelation()),

Review Comment:
   it's simpler to use `LocalRelation(Seq(lateralAlias.toAttribute))`
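   A toy comparison, assuming (as its name suggests) that `resolveExpressionByPlanOutput` only consults the plan's output: both dummy plans expose the same single attribute, so they resolve identically, and the `LocalRelation` form skips building an extra Project. The classes below are hypothetical stand-ins that merely mirror Spark's names.

```scala
object DummyPlanToy {
  final case class Attr(name: String, exprId: Long)
  final case class Alias(childSql: String, name: String, exprId: Long) {
    def toAttribute: Attr = Attr(name, exprId)
  }

  sealed trait Plan { def output: Seq[Attr] }
  case object OneRowRelation extends Plan { val output: Seq[Attr] = Nil }
  final case class Project(list: Seq[Alias], child: Plan) extends Plan {
    def output: Seq[Attr] = list.map(_.toAttribute)
  }
  final case class LocalRelation(output: Seq[Attr]) extends Plan

  // Name resolution that only looks at plan.output, so either dummy plan works.
  def resolveByPlanOutput(name: String, plan: Plan): Option[Attr] =
    plan.output.find(_.name.equalsIgnoreCase(name))
}
```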





[GitHub] [spark] gengliangwang commented on pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on PR #38776:
URL: https://github.com/apache/spark/pull/38776#issuecomment-1342094831

   LGTM except for minor comments. Great work, @anchovYu




[GitHub] [spark] gengliangwang commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1042951934


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, push the referenced lateral aliases down into a newly created Project and
+ *   resolve the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and pushes down the referenced
+ *    lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because lateral column alias has higher resolution priority than outer reference, it will try
+ * to resolve an [[OuterReference]] using lateral column alias in phase 1, similar to an
+ * [[UnresolvedAttribute]]. If successful, it strips [[OuterReference]] and also wraps it with
+ * [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if it succeeds, or None if it fails to resolve.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) {
+            case o: OuterReference
+              if aliasMap.contains(o.nameParts.map(_.head).getOrElse(o.name)) =>

Review Comment:
   I am not familiar with `OuterReference`. But why only check the head here? Shouldn't we check the ending name part?
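
To make the question concrete, here is a hypothetical, Spark-free model of the phase-1 lookup. `LcaLookupSketch`, `Entry` and the `Outcome` cases are illustrative names, not Catalyst classes, and lower-cased keys stand in for `CaseInsensitiveMap` under an assumed case-insensitive resolver. It follows one possible reading of the diff: because `resolveByLateralAlias` resolves the full `nameParts` against a one-column `Project` holding the alias, the head part would be the alias name and any remaining parts nested fields (so `s.x` reads field `x` of alias `s`), which would explain keying on the head. It also applies the documented priority of a local column over a lateral alias.

```scala
// Hypothetical, Spark-free model of the phase-1 lookup; names are illustrative only.
object LcaLookupSketch {
  final case class Entry(name: String, resolved: Boolean, index: Int)

  // Lower-cased keys stand in for CaseInsensitiveMap (assumes a case-insensitive resolver).
  type AliasMap = Map[String, Seq[Entry]]

  // Mirrors insertIntoAliasMap: every alias is recorded, resolved or not, so that
  // duplicate names can later be reported as ambiguous.
  def insert(m: AliasMap, e: Entry): AliasMap = {
    val key = e.name.toLowerCase
    m + (key -> (m.getOrElse(key, Seq.empty) :+ e))
  }

  sealed trait Outcome
  case object LocalColumn extends Outcome      // a child column wins over any alias
  case object NotAnAlias extends Outcome       // left for outer-reference resolution or an error
  final case class Ambiguous(count: Int) extends Outcome
  case object LeaveUnresolved extends Outcome  // alias exists but is not resolved yet
  final case class LateralRef(alias: String, nestedFields: Seq[String]) extends Outcome

  // Looks up only the head name part; trailing parts are treated as nested fields.
  // Assumes childColumns is already lower-cased.
  def lookup(childColumns: Set[String], m: AliasMap, nameParts: Seq[String]): Outcome = {
    val head = nameParts.head.toLowerCase
    if (childColumns.contains(head)) LocalColumn
    else m.get(head) match {
      case None                       => NotAnAlias
      case Some(es) if es.size > 1    => Ambiguous(es.size)
      case Some(Seq(e)) if e.resolved => LateralRef(e.name, nameParts.tail)
      case Some(_)                    => LeaveUnresolved
    }
  }
}
```

Under this reading, keying on the last part would instead treat `x.a` as a reference to alias `a`, even though `x` is the part that names the column.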




[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1041275311


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, push the referenced lateral aliases down into a newly created Project and
+ *   resolve the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and pushes down the referenced
+ *    lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because lateral column alias has higher resolution priority than outer reference, it will try
+ * to resolve an [[OuterReference]] using lateral column alias in phase 1, similar to an
+ * [[UnresolvedAttribute]]. If successful, it strips [[OuterReference]] and also wraps it with
+ * [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if it succeeds, or None if it fails to resolve.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) {
+            case o: OuterReference
+              if aliasMap.contains(o.nameParts.map(_.head).getOrElse(o.name)) =>
+              val nameParts = o.nameParts.getOrElse(Seq(o.name))
+              val aliases = aliasMap.get(nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(nameParts, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, nameParts))
+                    .getOrElse(o)
+                case _ => o
+              }
+            case u: UnresolvedAttribute if aliasMap.contains(u.nameParts.head) &&
+              Analyzer.resolveExpressionByPlanChildren(u, p, resolver)
+                .isInstanceOf[UnresolvedAttribute] =>
+              val aliases = aliasMap.get(u.nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(u.name, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(u.nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, u.nameParts))
+                    .getOrElse(u)
+                case _ => u
+              }
+          }.asInstanceOf[NamedExpression]
+        }
+        val newProjectList = projectList.zipWithIndex.map {
+          case (a: Alias, idx) =>
+            val lcaWrapped = wrapLCAReference(a).asInstanceOf[Alias]
+            // Insert the LCA-resolved alias instead of the unresolved one into map. If it is
+            // resolved, it can be referenced as LCA by later expressions (chaining).
+            // Unresolved Alias is also added to the map to perform ambiguous name check, but only
+            // resolved alias can be LCA
+            aliasMap = insertIntoAliasMap(lcaWrapped, idx, aliasMap)
+            lcaWrapped
+          case (e, _) =>
+            wrapLCAReference(e)
+        }
+        p.copy(projectList = newProjectList)
+    }
+
+    // phase 2: unwrap
+    rewrittenPlan.resolveOperatorsUpWithPruning(
+      _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), UnknownRuleId) {
+      case p @ Project(projectList, child) if p.resolved
+        && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        // build the map again in case the project list changes and index goes off
+        // TODO one risk: is there any rule that strips off / adds the Alias, so that the LCA is
+        //  resolved in the beginning but the matching alias can't be found at push-down time?

Review Comment:
   Right, I was worried that the alias could be turned into an attribute in the Project list (at a non-top level; I assume LCA can happen at a non-top level?). Glad there is no such case, but even if there were, restoring it back to `UnresolvedAttribute`, as this PR does, should be safe: it would just throw an exception.
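
The fallback described here can be sketched with a small, hypothetical expression model (`LcaUnwrapSketch` and its classes are stand-ins, not Catalyst's `Alias`, `UnresolvedAttribute` or `LateralColumnAliasReference`). Following the diff's comment about building the map again, it looks aliases up by name in the current project list, pushes the referenced ones into an inner project list, and restores any reference it cannot match to an unresolved attribute so that a later check reports it. Chained aliases (an alias whose own child still holds a reference) are deliberately out of scope.

```scala
// Hypothetical, Spark-free model of phase 2 (unwrap); not Catalyst's classes.
object LcaUnwrapSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr         // resolved attribute
  final case class Unresolved(name: String) extends Expr  // stands in for UnresolvedAttribute
  final case class LcaRef(name: String) extends Expr      // stands in for LateralColumnAliasReference
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  /** Returns (outer project list, inner project list). */
  def unwrap(projectList: Seq[Expr], childOutput: Seq[String]): (Seq[Expr], Seq[Expr]) = {
    // Rebuild the alias map by name from the current list instead of trusting indexes
    // recorded in phase 1 (ambiguous names are assumed to have been rejected already).
    val aliases: Map[String, Alias] =
      projectList.collect { case a: Alias => a.name.toLowerCase -> a }.toMap
    val pushed = scala.collection.mutable.LinkedHashSet.empty[Alias]

    def rewrite(e: Expr): Expr = e match {
      case LcaRef(n) =>
        aliases.get(n.toLowerCase) match {
          case Some(a) => pushed += a; Col(a.name) // push the alias down, read its output
          case None    => Unresolved(n)            // restore; a later check reports the error
        }
      case Add(l, r)   => Add(rewrite(l), rewrite(r))
      case Alias(c, n) => Alias(rewrite(c), n)
      case other       => other
    }

    val rewritten = projectList.map(rewrite)
    // Pushed-down aliases are now computed below, so the outer list only reads them.
    val outer: Seq[Expr] = rewritten.map {
      case a: Alias if pushed.contains(a) => Col(a.name)
      case e                              => e
    }
    val inner: Seq[Expr] = childOutput.map(n => Col(n): Expr) ++ pushed.toSeq
    (outer, inner)
  }
}
```

For the scaladoc's Project example, `age AS a, 'a + 1` (named `b` here) becomes the outer list `a, a + 1 AS b` over the inner list `age, age AS a`; a reference to a missing alias simply comes back unresolved.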




[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1046020925


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -638,6 +638,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
               case UnresolvedWindowExpression(_, windowSpec) =>
                 throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
             })
+            // This should not happen: a resolved Project or Aggregate should restore or resolve
+            // all lateral column alias references. Add this check for extra safety.

Review Comment:
   If this should not happen, we should throw an internal error via `SparkException.internalError` instead of `UNRESOLVED_COLUMN`, so that it can include more debug information.
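
The point here is that a leftover lateral-alias marker inside an operator that is already resolved signals an analyzer bug rather than a user mistake, so it warrants an internal error carrying debug details. A minimal Spark-free sketch of such a guard follows; `LcaSanityCheckSketch`, `InternalAnalyzerError` and the mini expression classes are hypothetical stand-ins, not the Spark API the comment refers to.

```scala
// Hypothetical sketch: report a leftover marker as an internal (bug) error, not a user error.
object LcaSanityCheckSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class LcaRef(name: String) extends Expr   // stands in for LateralColumnAliasReference
  final case class Add(left: Expr, right: Expr) extends Expr

  // Stand-in for an internal-error exception; the real Spark API is not modeled here.
  final class InternalAnalyzerError(msg: String) extends IllegalStateException(msg)

  private def containsLcaRef(e: Expr): Boolean = e match {
    case LcaRef(_) => true
    case Add(l, r) => containsLcaRef(l) || containsLcaRef(r)
    case Col(_)    => false
  }

  /** Meant to run only on a project list whose operator is already marked resolved. */
  def checkResolvedProject(projectList: Seq[Expr]): Unit =
    projectList.find(containsLcaRef).foreach { e =>
      throw new InternalAnalyzerError(
        s"Resolved Project should not contain a lateral column alias reference: $e")
    }
}
```

Including the offending expression in the message is what makes the internal error more useful for debugging than a generic unresolved-column error.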





[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1041324238


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, push the referenced lateral aliases down into a newly created Project and
+ *   resolve the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and pushes down the referenced
+ *    lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because lateral column alias has higher resolution priority than outer reference, it will try
+ * to resolve an [[OuterReference]] using lateral column alias in phase 1, similar to an
+ * [[UnresolvedAttribute]]. If successful, it strips [[OuterReference]] and also wraps it with
+ * [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if it succeeds, or None if it fails to resolve.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) {
+            case o: OuterReference
+              if aliasMap.contains(o.nameParts.map(_.head).getOrElse(o.name)) =>
+              val nameParts = o.nameParts.getOrElse(Seq(o.name))
+              val aliases = aliasMap.get(nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(nameParts, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, nameParts))
+                    .getOrElse(o)
+                case _ => o
+              }
+            case u: UnresolvedAttribute if aliasMap.contains(u.nameParts.head) &&
+              Analyzer.resolveExpressionByPlanChildren(u, p, resolver)
+                .isInstanceOf[UnresolvedAttribute] =>
+              val aliases = aliasMap.get(u.nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(u.name, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(u.nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, u.nameParts))
+                    .getOrElse(u)
+                case _ => u
+              }
+          }.asInstanceOf[NamedExpression]
+        }
+        val newProjectList = projectList.zipWithIndex.map {
+          case (a: Alias, idx) =>
+            val lcaWrapped = wrapLCAReference(a).asInstanceOf[Alias]
+            // Insert the LCA-resolved alias instead of the unresolved one into map. If it is
+            // resolved, it can be referenced as LCA by later expressions (chaining).
+            // Unresolved Alias is also added to the map to perform ambiguous name check, but only
+            // resolved alias can be LCA
+            aliasMap = insertIntoAliasMap(lcaWrapped, idx, aliasMap)
+            lcaWrapped
+          case (e, _) =>
+            wrapLCAReference(e)
+        }
+        p.copy(projectList = newProjectList)
+    }
+
+    // phase 2: unwrap
+    rewrittenPlan.resolveOperatorsUpWithPruning(
+      _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), UnknownRuleId) {
+      case p @ Project(projectList, child) if p.resolved
+        && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        // build the map again in case the project list changes and index goes off
+        // TODO one risk: is there any rule that strips off /add the Alias? that the LCA is resolved
+        //  in the beginning, but when it comes to push down, it really can't find the matching one?

Review Comment:
   Is there any rule that adds another layer of alias?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38776:
URL: https://github.com/apache/spark/pull/38776#issuecomment-1348930565

   thanks, merging to master!


[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1044242093


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, LateralColumnAliasReference, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule is the second phase to resolve lateral column alias.
+ *
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise, it handles two types of operators: Project and Aggregate.
+ * - in Project, pushing down the referenced lateral alias into a newly created Project, resolve
+ *   the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and push down the referenced lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because lateral column alias has higher resolution priority than outer reference, it will try
+ * to resolve an [[OuterReference]] using lateral column alias in phase 1, similar as an
+ * [[UnresolvedAttribute]]. If success, it strips [[OuterReference]] and also wraps it with
+ * [[LateralColumnAliasReference]].
+ */
+object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
+  case class AliasEntry(alias: Alias, index: Int)
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.getConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED)) {
+      plan
+    } else {
+      // phase 2: unwrap
+      plan.resolveOperatorsUpWithPruning(
+        _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), ruleId) {
+        case p @ Project(projectList, child) if p.resolved
+          && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+          var aliasMap = Map[Attribute, AliasEntry]()

Review Comment:
   We have a dedicated `AttributeMap` for this.
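Here is a minimal sketch of why an `ExprId`-keyed map helps. It uses a hypothetical simplified model: `ExprIdKeyedMapSketch`, `Attr` and `AttrMap` are illustrative names, not Spark classes. Judging from the `AttributeMap` diff further down in this archive, Spark's `AttributeMap` wraps a `Map[ExprId, (Attribute, A)]`. The sketch assumes that layout, so two references to the same column match even when their name casing or qualifier differ. A plain `Map[Attribute, A]` would not guarantee that.

```scala
// Hypothetical, simplified model of an ExprId-keyed attribute map (not Spark's code).
object ExprIdKeyedMapSketch {
  final case class ExprId(id: Long)

  // Two Attr values may differ in name casing or qualifier yet denote the same column.
  final case class Attr(name: String, qualifier: Seq[String], exprId: ExprId)

  final class AttrMap[A](val baseMap: Map[ExprId, (Attr, A)]) {
    // Lookups go through the ExprId only, ignoring name and qualifier.
    def get(k: Attr): Option[A] = baseMap.get(k.exprId).map(_._2)
    def contains(k: Attr): Boolean = get(k).isDefined
    // Adding an entry keeps the ExprId-keyed representation.
    def +(kv: (Attr, A)): AttrMap[A] = new AttrMap(baseMap + (kv._1.exprId -> kv))
  }

  object AttrMap {
    def empty[A]: AttrMap[A] = new AttrMap[A](Map.empty)
  }
}
```

The same concern presumably motivates the later change that makes `+` return `AttributeMap[B1]`. Returning a plain `Map[Attribute, B1]` would fall back to equality on the whole attribute rather than on its `ExprId`.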



[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1043649041


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, pushing down the referenced lateral alias into a newly created Project, resolve
+ *   the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and push down the referenced lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because lateral column alias has higher resolution priority than outer reference, it will try
+ * to resolve an [[OuterReference]] using lateral column alias in phase 1, similar as an
+ * [[UnresolvedAttribute]]. If success, it strips [[OuterReference]] and also wraps it with
+ * [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given the lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if succeeds. None if fails to resolve.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) {
+            case o: OuterReference
+              if aliasMap.contains(o.nameParts.map(_.head).getOrElse(o.name)) =>

Review Comment:
   The general process of resolving an `OuterReference` is: given an `UnresolvedAttribute` with name parts, try to resolve it using the outer plan. If that succeeds, the attribute is resolved to an `OuterReference`. In this PR, I extend the `OuterReference` case class with a new `nameParts` field that saves the original `nameParts` of the `UnresolvedAttribute`, because those name parts are still needed to resolve lateral aliases.
   
   As for why the match is on the head: a lateral alias reference can only appear as the first element of `nameParts`. For example, in `SELECT named_struct('a', 1) AS foo, foo.a`, `foo.a` has name parts `("foo", "a")`, and only the head `foo` can reference the lateral alias. It can't be `a`, because a lateral alias reference never has a qualifier.
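Here is a minimal sketch of the head-only lookup described above. It is written in plain Scala with hypothetical names (`LateralAliasHeadLookup`, `AliasEntry`, `Lookup`) and is not the rule's actual code. It leaves out two checks: that the alias is resolved, and that local table columns take precedence.

```scala
// Hypothetical sketch of the head-only lateral alias lookup; not the actual rule's code.
object LateralAliasHeadLookup {
  // An alias defined earlier in the SELECT list: its name and position.
  final case class AliasEntry(name: String, index: Int)

  sealed trait Lookup
  case object NoMatch extends Lookup
  // The alias the head refers to, plus the remaining parts (e.g. a struct field).
  final case class Match(alias: AliasEntry, remainingParts: Seq[String]) extends Lookup
  final case class Ambiguous(name: String, count: Int) extends Lookup

  def lookup(nameParts: Seq[String], earlierAliases: Seq[AliasEntry]): Lookup = {
    // Only the first name part can refer to a lateral alias, since such a reference
    // never carries a qualifier.
    val head = nameParts.head
    // Case-insensitive comparison, in the spirit of the CaseInsensitiveMap in the diff.
    val candidates = earlierAliases.filter(_.name.equalsIgnoreCase(head))
    candidates.size match {
      case 0 => NoMatch
      case 1 => Match(candidates.head, nameParts.tail)
      case n => Ambiguous(head, n)
    }
  }
}
```

For `foo.a` with an earlier alias `foo`, the lookup returns the `foo` entry with `a` left over to resolve as a field. A bare `a` finds nothing.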



[GitHub] [spark] anchovYu commented on pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on PR #38776:
URL: https://github.com/apache/spark/pull/38776#issuecomment-1330983195

   @cloud-fan @gengliangwang 


[GitHub] [spark] anchovYu commented on pull request #38776: [SPARK-41630][SQL] Support implicit lateral column alias resolution on Project

Posted by GitBox <gi...@apache.org>.
anchovYu commented on PR #38776:
URL: https://github.com/apache/spark/pull/38776#issuecomment-1359958872

   Hi @gengliangwang, I opened a subtask for Project under the original parent JIRA: https://issues.apache.org/jira/browse/SPARK-41630, and updated the title of this PR accordingly. Could you help me update the status of that JIRA (assignee, resolved, etc.)? Thanks!


[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1045697849


##########
sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala:
##########
@@ -49,7 +49,8 @@ class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)])
 
   override def contains(k: Attribute): Boolean = get(k).isDefined
 
-  override def + [B1 >: A](kv: (Attribute, B1)): Map[Attribute, B1] = baseMap.values.toMap + kv
+  override def + [B1 >: A](kv: (Attribute, B1)): AttributeMap[B1] =

Review Comment:
   we should update the one in `sql/catalyst/src/main/scala-2.13`



##########
sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala:
##########
@@ -49,7 +49,8 @@ class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)])
 
   override def contains(k: Attribute): Boolean = get(k).isDefined
 
-  override def + [B1 >: A](kv: (Attribute, B1)): Map[Attribute, B1] = baseMap.values.toMap + kv
+  override def + [B1 >: A](kv: (Attribute, B1)): AttributeMap[B1] =

Review Comment:
   we should update the one in `sql/catalyst/src/main/scala-2.13`



[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1046592935


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -424,8 +424,51 @@ case class OuterReference(e: NamedExpression)
   override def qualifier: Seq[String] = e.qualifier
   override def exprId: ExprId = e.exprId
   override def toAttribute: Attribute = e.toAttribute
-  override def newInstance(): NamedExpression = OuterReference(e.newInstance())
+  override def newInstance(): NamedExpression =
+    OuterReference(e.newInstance()).setNameParts(nameParts)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
+
+  // optional field, the original name parts of UnresolvedAttribute before it is resolved to
+  // OuterReference. Used in rule ResolveLateralColumnAlias to convert OuterReference back to
+  // LateralColumnAliasReference.
+  var nameParts: Option[Seq[String]] = None

Review Comment:
   Quick question: I didn't add it to the constructor of `OuterReference` because of binary compatibility. Is that concern valid? And what would be the risk of changing the constructor while also writing a custom `unapply` function?



[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1046022337


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -424,8 +424,51 @@ case class OuterReference(e: NamedExpression)
   override def qualifier: Seq[String] = e.qualifier
   override def exprId: ExprId = e.exprId
   override def toAttribute: Attribute = e.toAttribute
-  override def newInstance(): NamedExpression = OuterReference(e.newInstance())
+  override def newInstance(): NamedExpression =
+    OuterReference(e.newInstance()).setNameParts(nameParts)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
+
+  // optional field, the original name parts of UnresolvedAttribute before it is resolved to
+  // OuterReference. Used in rule ResolveLateralColumnAlias to convert OuterReference back to
+  // LateralColumnAliasReference.
+  var nameParts: Option[Seq[String]] = None

Review Comment:
   If we have to keep a mutable state, `TreeNodeTag` is a better choice. Directly using `var` in a Catalyst `TreeNode` is strongly discouraged.
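Here is a minimal sketch of the idea behind the suggestion, under stated assumptions. It uses a toy node with a side table of typed tags. `TagSketch`, `Tag`, `OuterRef` and `NamePartsTag` are hypothetical names, loosely modeled on the idea of Catalyst's `TreeNodeTag` rather than its actual API. The constructor stays unchanged, so pattern matching and structural equality are unaffected. `newInstance` copies the tags over explicitly, much as the diff's `setNameParts(nameParts)` call does for the `var`.

```scala
import scala.collection.mutable

// Toy model of tag-based metadata on a tree node; not Catalyst's API.
object TagSketch {
  // Typed key for a piece of optional metadata.
  final case class Tag[T](name: String)

  // The case class parameters (and hence equals/hashCode/unapply) stay the same;
  // optional metadata lives in a side table that is not part of structural equality.
  final case class OuterRef(name: String) {
    private val tags = mutable.Map.empty[Tag[_], Any]
    def setTag[T](tag: Tag[T], value: T): Unit = tags(tag) = value
    def getTag[T](tag: Tag[T]): Option[T] = tags.get(tag).map(_.asInstanceOf[T])
    // Anything creating a fresh node must carry the tags over explicitly.
    def copyTagsFrom(other: OuterRef): Unit = tags ++= other.tags
    def newInstance(): OuterRef = {
      val fresh = OuterRef(name)
      fresh.copyTagsFrom(this)
      fresh
    }
  }

  // Hypothetical tag holding the original name parts of the UnresolvedAttribute.
  val NamePartsTag: Tag[Seq[String]] = Tag[Seq[String]]("namePartsBeforeOuterResolution")
}
```

The trade-off is that tags are easy to lose when a rule builds a new node without copying them. Keeping them out of the constructor, however, avoids the binary compatibility question raised above.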



[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1041629600


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, pushing down the referenced lateral alias into a newly created Project, resolve
+ *   the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and push down the referenced lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because lateral column alias has higher resolution priority than outer reference, it will try
+ * to resolve an [[OuterReference]] using lateral column alias in phase 1, similar as an
+ * [[UnresolvedAttribute]]. If success, it strips [[OuterReference]] and also wraps it with
+ * [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given the lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if succeeds. None if fails to resolve.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) {
+            case o: OuterReference
+              if aliasMap.contains(o.nameParts.map(_.head).getOrElse(o.name)) =>
+              val nameParts = o.nameParts.getOrElse(Seq(o.name))
+              val aliases = aliasMap.get(nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(nameParts, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, nameParts))
+                    .getOrElse(o)
+                case _ => o
+              }
+            case u: UnresolvedAttribute if aliasMap.contains(u.nameParts.head) &&
+              Analyzer.resolveExpressionByPlanChildren(u, p, resolver)
+                .isInstanceOf[UnresolvedAttribute] =>
+              val aliases = aliasMap.get(u.nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(u.name, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(u.nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, u.nameParts))
+                    .getOrElse(u)
+                case _ => u
+              }
+          }.asInstanceOf[NamedExpression]
+        }
+        val newProjectList = projectList.zipWithIndex.map {
+          case (a: Alias, idx) =>
+            val lcaWrapped = wrapLCAReference(a).asInstanceOf[Alias]
+            // Insert the LCA-resolved alias instead of the unresolved one into map. If it is
+            // resolved, it can be referenced as LCA by later expressions (chaining).
+            // Unresolved Alias is also added to the map to perform ambiguous name check, but only
+            // resolved alias can be LCA
+            aliasMap = insertIntoAliasMap(lcaWrapped, idx, aliasMap)
+            lcaWrapped
+          case (e, _) =>
+            wrapLCAReference(e)
+        }
+        p.copy(projectList = newProjectList)
+    }
+
+    // phase 2: unwrap
+    rewrittenPlan.resolveOperatorsUpWithPruning(
+      _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), UnknownRuleId) {
+      case p @ Project(projectList, child) if p.resolved
+        && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        // build the map again in case the project list changes and index goes off
+        // TODO one risk: is there any rule that strips off /add the Alias? that the LCA is resolved
+        //  in the beginning, but when it comes to push down, it really can't find the matching one?

Review Comment:
   I can't think of any
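For reference, the Project example from the doc comment above can be reproduced with a toy model. This is a hypothetical sketch: `LcaTwoPhaseSketch` and its expression classes are illustrative, not Catalyst types. It ignores case-insensitivity, ambiguity errors, outer references and chained aliases. It only shows phase 1 wrapping references to earlier aliases, and phase 2 pushing the referenced aliases into a new inner Project.

```scala
// Toy model of the two-phase lateral column alias rewrite; not Catalyst's implementation.
object LcaTwoPhaseSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr          // resolved column, e.g. age
  final case class Unresolved(name: String) extends Expr   // unresolved name, e.g. 'a
  final case class LcaRef(name: String) extends Expr       // lateralalias(a)
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // A Project node: its select list plus the column names its child produces.
  final case class Project(list: Seq[Expr], childOutput: Seq[String])

  // Apply f top-down; where f matches, the node is replaced and not descended into.
  private def transform(e: Expr)(f: PartialFunction[Expr, Expr]): Expr =
    if (f.isDefinedAt(e)) f(e)
    else e match {
      case Add(l, r)   => Add(transform(l)(f), transform(r)(f))
      case Alias(c, n) => Alias(transform(c)(f), n)
      case leaf        => leaf
    }

  // Phase 1: child columns win; otherwise a name defined by an earlier alias in the
  // same list is wrapped as LcaRef. Unknown names stay Unresolved.
  def wrap(p: Project): Project = {
    var defined = Set.empty[String]
    val newList = p.list.map { e =>
      val wrapped = transform(e) {
        case Unresolved(n) if p.childOutput.contains(n) => Col(n)
        case Unresolved(n) if defined.contains(n)       => LcaRef(n)
      }
      wrapped match {
        case Alias(_, n) => defined += n
        case _           => ()
      }
      wrapped
    }
    p.copy(list = newList)
  }

  private def lcaRefs(e: Expr): Set[String] = e match {
    case LcaRef(n)   => Set(n)
    case Add(l, r)   => lcaRefs(l) ++ lcaRefs(r)
    case Alias(c, _) => lcaRefs(c)
    case _           => Set.empty
  }

  // Phase 2: push referenced aliases into a new inner Project and read them back as
  // plain columns in the outer Project. Returns (outer, inner).
  def unwrap(p: Project): (Project, Project) = {
    val referenced = p.list.flatMap(lcaRefs).toSet
    val pushed = p.list.collect { case a @ Alias(_, n) if referenced(n) => a }
    val inner = Project(p.childOutput.map(Col(_)) ++ pushed, p.childOutput)
    val outerList = p.list.map {
      case Alias(_, n) if referenced(n) => Col(n)
      case e                            => transform(e) { case LcaRef(n) => Col(n) }
    }
    (Project(outerList, p.childOutput ++ pushed.map(_.name)), inner)
  }

  // SELECT age AS a, a + 1 over a child that outputs `age`.
  val example: Project =
    Project(Seq(Alias(Unresolved("age"), "a"), Add(Unresolved("a"), Lit(1))), Seq("age"))
}
```

Applied to `example`, the two phases give an outer list `[a, a + 1]` over an inner list `[age, age AS a]`, which matches the "After phase 2" plan in the comment.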



[GitHub] [spark] gengliangwang commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1042951497


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, pushing down the referenced lateral alias into a newly created Project, resolve
+ *   the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and push down the referenced lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because a lateral column alias has higher resolution priority than an outer reference,
+ * phase 1 also tries to resolve an [[OuterReference]] using lateral column aliases, just as it
+ * does for an [[UnresolvedAttribute]]. If that succeeds, it strips the [[OuterReference]] and
+ * wraps the result with [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if resolution succeeds, or None otherwise.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {

Review Comment:
   I think `wrapLCAReference` and `unwrapLCAReference` can be moved out of this method.
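Below is a minimal sketch of that hoisting. It uses a hypothetical toy expression model: `Expr`, `Unresolved`, `Attr`, `LcaRef`, `HoistedWrap` and the rest are invented for illustration and are not Spark classes. The alias map becomes an explicit parameter instead of a local `var` captured by the closure.

The sketch also follows the priority stated in the scaladoc:

- a child column wins first;
- otherwise an earlier lateral alias is used;
- otherwise the name is left for other rules, such as outer-reference resolution.

It raises an error when more than one earlier alias shares the name, compared case-insensitively. Resolving a child column directly to `Attr` is a simplification; in the diff such names are simply not wrapped.

```scala
// Hypothetical toy expression model (not Spark's Expression classes).
sealed trait Expr
final case class Lit(value: Int) extends Expr
final case class Unresolved(name: String) extends Expr // like UnresolvedAttribute
final case class Attr(name: String) extends Expr       // resolved against the child output
final case class LcaRef(name: String) extends Expr     // like LateralColumnAliasReference
final case class Add(left: Expr, right: Expr) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

object HoistedWrap {
  // Lower-cased alias name -> number of aliases with that name seen earlier in the list.
  type AliasCounts = Map[String, Int]

  // Phase 1 for one expression; the alias map is a parameter, not a captured `var`.
  def wrapLCAReference(e: Expr, childOutput: Seq[String], aliases: AliasCounts): Expr =
    e match {
      // Priority 1: a column of the child plan always wins.
      case Unresolved(n) if childOutput.exists(_.equalsIgnoreCase(n)) => Attr(n)
      // Priority 2: an alias defined earlier in the same SELECT list.
      case u @ Unresolved(n) =>
        aliases.getOrElse(n.toLowerCase, 0) match {
          case 0 => u // not a lateral alias: leave it for other rules
          case 1 => LcaRef(n)
          case k => throw new IllegalArgumentException(s"Ambiguous lateral column alias $n: $k")
        }
      case Add(l, r) =>
        Add(wrapLCAReference(l, childOutput, aliases), wrapLCAReference(r, childOutput, aliases))
      case Alias(c, n) => Alias(wrapLCAReference(c, childOutput, aliases), n)
      case other       => other
    }

  // Walk the list left to right so each item only sees aliases defined before it.
  def wrapProjectList(projectList: Seq[Expr], childOutput: Seq[String]): Seq[Expr] = {
    val (wrapped, _) = projectList.foldLeft((Vector.empty[Expr], Map.empty[String, Int])) {
      case ((acc, aliases), e) =>
        val w = wrapLCAReference(e, childOutput, aliases)
        val next = w match {
          case Alias(_, n) =>
            aliases.updated(n.toLowerCase, aliases.getOrElse(n.toLowerCase, 0) + 1)
          case _ => aliases
        }
        (acc :+ w, next)
    }
    wrapped
  }
}
```

`wrapLCAReference` no longer closes over mutable state. As a result, it could sit next to `insertIntoAliasMap` as a private method and be tested on its own. The fold in `wrapProjectList` plays the role of the `zipWithIndex.map` loop that updates `aliasMap`.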



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1040959706


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) {
+            case o: OuterReference
+              if aliasMap.contains(o.nameParts.map(_.head).getOrElse(o.name)) =>
+              val nameParts = o.nameParts.getOrElse(Seq(o.name))
+              val aliases = aliasMap.get(nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(nameParts, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, nameParts))
+                    .getOrElse(o)
+                case _ => o
+              }
+            case u: UnresolvedAttribute if aliasMap.contains(u.nameParts.head) &&
+              Analyzer.resolveExpressionByPlanChildren(u, p, resolver)
+                .isInstanceOf[UnresolvedAttribute] =>
+              val aliases = aliasMap.get(u.nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(u.name, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(u.nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, u.nameParts))
+                    .getOrElse(u)
+                case _ => u
+              }
+          }.asInstanceOf[NamedExpression]
+        }
+        val newProjectList = projectList.zipWithIndex.map {
+          case (a: Alias, idx) =>
+            val lcaWrapped = wrapLCAReference(a).asInstanceOf[Alias]
+            // Insert the LCA-resolved alias instead of the unresolved one into map. If it is
+            // resolved, it can be referenced as LCA by later expressions (chaining).
+            // Unresolved Alias is also added to the map to perform ambiguous name check, but only
+            // resolved alias can be LCA
+            aliasMap = insertIntoAliasMap(lcaWrapped, idx, aliasMap)
+            lcaWrapped
+          case (e, _) =>
+            wrapLCAReference(e)
+        }
+        p.copy(projectList = newProjectList)
+    }
+
+    // phase 2: unwrap
+    rewrittenPlan.resolveOperatorsUpWithPruning(
+      _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), UnknownRuleId) {
+      case p @ Project(projectList, child) if p.resolved
+        && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        // build the map again in case the project list changes and index goes off
+        // TODO one risk: is there any rule that strips off /add the Alias? that the LCA is resolved
+        //  in the beginning, but when it comes to push down, it really can't find the matching one?
+        //  Restore back to UnresolvedAttribute.
+        //  Also, when resolving from bottom up should I worry about cases like:
+        //  Project [b AS c, c + 1 AS d]
+        //  +- Project [1 AS a, a AS b]
+        //  b AS c is resolved, even b refers to an alias contains the lateral alias?
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        val referencedAliases = collection.mutable.Set.empty[AliasEntry]
+        def unwrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+            case lcaRef: LateralColumnAliasReference if aliasMap.contains(lcaRef.nameParts.head) =>
+              val aliasEntry = aliasMap.get(lcaRef.nameParts.head).get.head
+              if (!aliasEntry.alias.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                // If there is no chaining, push down the alias and resolve the attribute by
+                // constructing a dummy plan
+                referencedAliases += aliasEntry
+                // Implementation notes (to-delete):
+                // this is a design decision whether to restore the UnresolvedAttribute, or
+                // directly resolve by constructing a plan and using resolveExpressionByPlanChildren
+                resolveByLateralAlias(lcaRef.nameParts, aliasEntry.alias).getOrElse(lcaRef)

Review Comment:
   I'm confused: why not just return `lcaRef.ne`?
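The sketch below makes the trade-off concrete. It compares two options:

- re-resolving the name in phase 2;
- returning the expression that phase 1 already stored in the reference.

The `AttrRef` and `LcaReference` types are hypothetical stand-ins; only the `ne` field name follows the comment above.

```scala
// Hypothetical stand-ins: an attribute identified by name + exprId, and a lateral
// alias reference that already carries the attribute resolved in phase 1.
final case class AttrRef(name: String, exprId: Long)
final case class LcaReference(ne: AttrRef, nameParts: Seq[String])

object UnwrapChoice {
  // Approach in the diff: resolve the name again against the pushed-down alias
  // (modelled as a case-insensitive lookup in the aliases' output attributes).
  def reResolve(ref: LcaReference, aliasOutput: Seq[AttrRef]): AttrRef =
    aliasOutput.find(_.name.equalsIgnoreCase(ref.nameParts.head)).getOrElse(ref.ne)

  // Suggested approach: phase 1 already resolved it, so return the stored expression.
  def returnStored(ref: LcaReference): AttrRef = ref.ne
}
```

In this model the two agree as long as the alias keeps its `exprId` between the phases. They diverge only if some rule in between re-creates the alias, which appears to be the risk that the TODO in phase 2 raises.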





[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1040956663


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+    // phase 2: unwrap
+    rewrittenPlan.resolveOperatorsUpWithPruning(
+      _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), UnknownRuleId) {
+      case p @ Project(projectList, child) if p.resolved
+        && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        // build the map again in case the project list changes and index goes off
+        // TODO one risk: is there any rule that strips off /add the Alias? that the LCA is resolved
+        //  in the beginning, but when it comes to push down, it really can't find the matching one?
+        //  Restore back to UnresolvedAttribute.
+        //  Also, when resolving from bottom up should I worry about cases like:
+        //  Project [b AS c, c + 1 AS d]
+        //  +- Project [1 AS a, a AS b]
+        //  b AS c is resolved, even b refers to an alias contains the lateral alias?

Review Comment:
   If we resolve LCAs bottom up, won't `Project [1 AS a, a AS b]` first become `Project [a, a AS b] <- Project [1 AS a]`?
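A toy sketch of phase 2 as a bottom-up rewrite seems to confirm this. It makes three assumptions:

- it uses a hypothetical plan model (`Proj`, `Leaf`, `Named`, `Lca` and `PushDown` are invented names);
- its input has already been through phase 1;
- it does not handle chained aliases.

Because each child is rewritten before its parent, `b` is already an output column of an unwrapped child by the time the outer `Project [b AS c, c + 1 AS d]` is visited.

```scala
// Hypothetical toy model for phase 2 (not Spark's plan or expression classes).
sealed trait Ex
final case class Lit(v: Int) extends Ex
final case class Col(name: String) extends Ex              // attribute of the child output
final case class Lca(name: String) extends Ex              // wrapped lateral alias reference
final case class Add(l: Ex, r: Ex) extends Ex
final case class Named(child: Ex, name: String) extends Ex // "child AS name"

sealed trait Plan
final case class Leaf(output: Seq[String]) extends Plan
final case class Proj(list: Seq[Ex], child: Plan) extends Plan

object PushDown {
  def output(p: Plan): Seq[String] = p match {
    case Leaf(cols) => cols
    case Proj(list, _) =>
      list.collect {
        case Named(_, n) => n
        case Col(n)      => n
      }
  }

  private def lcaNames(e: Ex): Set[String] = e match {
    case Lca(n)      => Set(n)
    case Add(l, r)   => lcaNames(l) ++ lcaNames(r)
    case Named(c, _) => lcaNames(c)
    case _           => Set.empty
  }

  private def unwrap(e: Ex): Ex = e match {
    case Lca(n)      => Col(n)
    case Add(l, r)   => Add(unwrap(l), unwrap(r))
    case Named(c, n) => Named(unwrap(c), n)
    case other       => other
  }

  // Bottom-up: rewrite the child first, then this Project.
  def rewrite(p: Plan): Plan = p match {
    case leaf: Leaf => leaf
    case Proj(list, child) =>
      val newChild = rewrite(child)
      val referenced = list.flatMap(lcaNames).toSet
      if (referenced.isEmpty) Proj(list, newChild)
      else {
        // Referenced aliases move into a new Project on top of the child output...
        val pushed = list.collect { case a @ Named(_, n) if referenced(n) => a }
        val inner = Proj(output(newChild).map(n => Col(n)) ++ pushed, newChild)
        // ...and the outer list reads them back as plain columns.
        val outer = list.map {
          case Named(_, n) if referenced(n) => Col(n)
          case e                            => unwrap(e)
        }
        Proj(outer, inner)
      }
  }
}
```

The inner `Proj` keeps the child output in front of the pushed-down aliases, mirroring the `Project [child output, age AS a]` shape in the scaladoc.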





[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1040031835


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -258,6 +417,17 @@ class Analyzer(override val catalogManager: CatalogManager)
     TypeCoercion.typeCoercionRules
   }
 
+  private def resolveExpressionByPlanOutput(

Review Comment:
   I kept an alias in the class because the public method in the object needs a `resolver` parameter.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1037747864


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -258,6 +417,17 @@ class Analyzer(override val catalogManager: CatalogManager)
     TypeCoercion.typeCoercionRules
   }
 
+  private def resolveExpressionByPlanOutput(

Review Comment:
   It seems we can just `import Analyzer._` in `class Analyzer`.
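A minimal sketch of the suggested pattern, using hypothetical stand-ins `object ToyAnalyzer` and `class ToyAnalyzer`. The `Resolver` alias here only mimics the `(String, String) => Boolean` shape of Spark's. The helper lives in the object and takes the resolver explicitly. The class imports the object and passes its own `resolver` at each call site, so no private forwarding method is needed.

```scala
// Hypothetical stand-ins for `object Analyzer` / `class Analyzer`.
object ToyAnalyzer {
  // Same shape as a (String, String) => Boolean resolver.
  type Resolver = (String, String) => Boolean
  val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // Stateless helper in the object: the resolver is an explicit parameter,
  // so a rule defined outside the class can call it with its own resolver.
  def resolveByOutput(name: String, output: Seq[String], resolver: Resolver): Option[String] =
    output.find(col => resolver(col, name))
}

class ToyAnalyzer(resolver: ToyAnalyzer.Resolver) {
  import ToyAnalyzer._

  // With the import, class code calls the object's helper directly and passes
  // its own resolver, instead of keeping a private forwarding method.
  def lookup(name: String, output: Seq[String]): Option[String] =
    resolveByOutput(name, output, resolver)
}
```

This keeps the object helper usable from rules such as `ResolveLateralColumnAlias`, which pass `conf.resolver`. The cost is spelling out `resolver` at each call inside the class.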





[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1043652296


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {

Review Comment:
   In preparation for @cloud-fan's future refactoring of column resolution, I will split this rule into two rules, one per phase, for now. After that refactoring, the wrapping phase will be fully embedded into the new, larger column resolution rule.
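Below is a minimal sketch of what the split could look like. It uses a hypothetical toy model where each SELECT item is reduced to the names it reads. `ToyProject`, `WrapLcaRule`, `UnwrapLcaRule` and `ToyBatch` are invented for illustration and are not Spark's rule or `RuleExecutor` API.

Phase 1 and phase 2 become two independent rules run in order. The unwrap rule is a no-op unless the wrap rule has produced references. That would let the wrap step later be folded into a larger column-resolution rule without touching the unwrap step.

```scala
// Hypothetical toy model: each SELECT item is "name AS f(refs)", reduced to the names it reads.
sealed trait Ref
final case class Unres(name: String) extends Ref  // like UnresolvedAttribute
final case class LcaRef(name: String) extends Ref // like LateralColumnAliasReference
final case class ColRef(name: String) extends Ref // an already resolved column

final case class Item(name: String, refs: Seq[Ref])
// `pushedDown` records the aliases phase 2 would move into a new inner Project.
final case class ToyProject(items: Seq[Item], pushedDown: Seq[String] = Nil)

trait ToyRule { def apply(p: ToyProject): ToyProject }

// Phase 1 as its own rule: wrap references to aliases defined earlier in the list.
object WrapLcaRule extends ToyRule {
  def apply(p: ToyProject): ToyProject = {
    val (items, _) = p.items.foldLeft((Vector.empty[Item], Set.empty[String])) {
      case ((acc, seen), item) =>
        val refs = item.refs.map {
          case Unres(n) if seen(n) => LcaRef(n)
          case r                   => r
        }
        (acc :+ item.copy(refs = refs), seen + item.name)
    }
    p.copy(items = items)
  }
}

// Phase 2 as its own rule: unwrap and record which aliases must be pushed down.
object UnwrapLcaRule extends ToyRule {
  def apply(p: ToyProject): ToyProject = {
    val referenced = p.items.flatMap(_.refs).collect { case LcaRef(n) => n }.distinct
    if (referenced.isEmpty) p // nothing wrapped yet: phase 2 is a no-op
    else p.copy(
      items = p.items.map(i => i.copy(refs = i.refs.map {
        case LcaRef(n) => ColRef(n)
        case r         => r
      })),
      pushedDown = p.pushedDown ++ referenced)
  }
}

object ToyBatch {
  // Each rule runs once, in order.
  def run(rules: Seq[ToyRule], p: ToyProject): ToyProject =
    rules.foldLeft(p)((acc, rule) => rule(acc))
}
```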





[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1042909362


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column aliases, which reference aliases defined earlier in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, push down the referenced lateral aliases into a newly created Project and
+ *   resolve the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral aliases and wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and pushes down the referenced lateral
+ *    aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because a lateral column alias has higher resolution priority than an outer reference, phase 1
+ * also tries to resolve an [[OuterReference]] using lateral column aliases, in the same way as an
+ * [[UnresolvedAttribute]]. If this succeeds, it strips the [[OuterReference]] and wraps the
+ * result with [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if resolution succeeds, or None otherwise.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) {
+            case o: OuterReference
+              if aliasMap.contains(o.nameParts.map(_.head).getOrElse(o.name)) =>
+              val nameParts = o.nameParts.getOrElse(Seq(o.name))
+              val aliases = aliasMap.get(nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(nameParts, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, nameParts))
+                    .getOrElse(o)
+                case _ => o
+              }
+            case u: UnresolvedAttribute if aliasMap.contains(u.nameParts.head) &&
+              Analyzer.resolveExpressionByPlanChildren(u, p, resolver)
+                .isInstanceOf[UnresolvedAttribute] =>
+              val aliases = aliasMap.get(u.nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(u.name, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(u.nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, u.nameParts))
+                    .getOrElse(u)
+                case _ => u
+              }
+          }.asInstanceOf[NamedExpression]
+        }
+        val newProjectList = projectList.zipWithIndex.map {
+          case (a: Alias, idx) =>
+            val lcaWrapped = wrapLCAReference(a).asInstanceOf[Alias]
+            // Insert the LCA-wrapped alias, rather than the original one, into the map. If it is
+            // resolved, later expressions can reference it as an LCA (chaining).
+            // Unresolved aliases are also added to the map for the ambiguity check, but only
+            // resolved aliases can be LCAs.
+            aliasMap = insertIntoAliasMap(lcaWrapped, idx, aliasMap)
+            lcaWrapped
+          case (e, _) =>
+            wrapLCAReference(e)
+        }
+        p.copy(projectList = newProjectList)
+    }
+
+    // phase 2: unwrap
+    rewrittenPlan.resolveOperatorsUpWithPruning(
+      _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), UnknownRuleId) {
+      case p @ Project(projectList, child) if p.resolved
+        && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        // build the map again in case the project list changes and index goes off
+        // TODO one risk: is there any rule that strips off /add the Alias? that the LCA is resolved
+        //  in the beginning, but when it comes to push down, it really can't find the matching one?
+        //  Restore back to UnresolvedAttribute.
+        //  Also, when resolving from bottom up should I worry about cases like:
+        //  Project [b AS c, c + 1 AS d]
+        //  +- Project [1 AS a, a AS b]
+        //  b AS c is resolved, even b refers to an alias contains the lateral alias?
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        val referencedAliases = collection.mutable.Set.empty[AliasEntry]
+        def unwrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+            case lcaRef: LateralColumnAliasReference if aliasMap.contains(lcaRef.nameParts.head) =>
+              val aliasEntry = aliasMap.get(lcaRef.nameParts.head).get.head
+              if (!aliasEntry.alias.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                // If there is no chaining, push down the alias and resolve the attribute by
+                // constructing a dummy plan
+                referencedAliases += aliasEntry
+                // Implementation notes (to-delete):
+                // this is a design decision whether to restore the UnresolvedAttribute, or
+                // directly resolve by constructing a plan and using resolveExpressionByPlanChildren
+                resolveByLateralAlias(lcaRef.nameParts, aliasEntry.alias).getOrElse(lcaRef)
+              } else {
+                // If there is chaining, don't resolve and save to future rounds
+                lcaRef
+              }
+            case lcaRef: LateralColumnAliasReference if !aliasMap.contains(lcaRef.nameParts.head) =>

Review Comment:
   a.b.c -> LCATemp(resExpr, "a.b.c", attribute=alias.toAttribute)
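One possible reading of the shorthand above, sketched as a toy Scala model rather than Catalyst code: when `a.b.c` is resolved against a lateral alias named `a`, the temporary node keeps the resolved expression, the original multi-part name, and the alias's attribute. `LcaTemp`, `AliasAttr`, `GetField`, and `resolveByAlias` are hypothetical names, and the sketch does not check that the nested fields actually exist.

```scala
// Toy model only: AliasAttr, GetField and LcaTemp are hypothetical stand-ins,
// not Catalyst classes. No schema is checked; every trailing name part is
// simply turned into a nested field access.
object NamePartsToy {
  sealed trait Expr
  final case class AliasAttr(name: String) extends Expr // plays the role of alias.toAttribute
  final case class GetField(child: Expr, field: String) extends Expr

  // Hypothetical analogue of the LCATemp node sketched in the comment.
  final case class LcaTemp(resolved: Expr, originalName: String, attribute: AliasAttr)

  // Resolve `nameParts` against a single lateral alias. The first part must
  // match the alias name (case-insensitively, like the CaseInsensitiveMap in
  // the rule); the remaining parts become field accesses on the alias.
  def resolveByAlias(nameParts: Seq[String], aliasName: String): Option[LcaTemp] =
    if (nameParts.nonEmpty && nameParts.head.equalsIgnoreCase(aliasName)) {
      val attr = AliasAttr(aliasName)
      val resolved = nameParts.tail.foldLeft(attr: Expr)((e, f) => GetField(e, f))
      Some(LcaTemp(resolved, nameParts.mkString("."), attr))
    } else None
}
```

Keeping the original dotted name next to the alias attribute means phase 2 would not need to re-derive it from the resolved expression.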



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1042522410


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {

Review Comment:
   The method is about 100 lines of code once the unnecessary TODO comments are removed, but I can split it into two methods, one per phase.
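A sketch of what a per-phase split could look like, using a self-contained toy model rather than Catalyst: `wrap` plays the role of phase 1 and `unwrap` of phase 2, and together they reproduce the `Project [a, a + 1] +- Project [child output, age AS a]` example from the doc comment. Every name in it is hypothetical, and it skips the ambiguity check, case-insensitive matching, and the `resolved` guards of the real rule.

```scala
// Toy model of the two phases, NOT the Catalyst implementation. All names
// (Expr, Named, Rel, Proj, wrap, unwrap) are hypothetical. Simplifications:
// child columns are bound directly, there is no ambiguity check, and names
// are matched exactly.
object LcaToy {
  sealed trait Expr
  final case class Unresolved(name: String) extends Expr // ~ UnresolvedAttribute
  final case class Col(name: String) extends Expr        // resolved attribute
  final case class LcaRef(name: String) extends Expr     // ~ LateralColumnAliasReference
  final case class Add(l: Expr, r: Expr) extends Expr
  final case class Lit(v: Int) extends Expr

  final case class Named(name: String, expr: Expr)       // ~ Alias
  sealed trait Plan
  final case class Rel(cols: Seq[String]) extends Plan
  final case class Proj(list: Seq[Named], child: Plan) extends Plan

  def output(p: Plan): Seq[String] = p match {
    case Rel(cols)     => cols
    case Proj(list, _) => list.map(_.name)
  }

  def transform(e: Expr)(f: PartialFunction[Expr, Expr]): Expr = e match {
    case Add(l, r) => Add(transform(l)(f), transform(r)(f))
    case other     => f.applyOrElse(other, (x: Expr) => x)
  }

  def isResolved(e: Expr): Boolean = e match {
    case Unresolved(_) => false
    case Add(l, r)     => isResolved(l) && isResolved(r)
    case _             => true
  }

  def lcaNames(e: Expr): Set[String] = e match {
    case LcaRef(n) => Set(n)
    case Add(l, r) => lcaNames(l) ++ lcaNames(r)
    case _         => Set.empty
  }

  // Phase 1: scan left to right; a name the child can't provide but that
  // matches an earlier *resolved* alias is wrapped as LcaRef.
  def wrap(p: Proj): Proj = {
    val childCols = output(p.child).toSet
    var lateral = Set.empty[String]
    val newList = p.list.map { n =>
      val e = transform(n.expr) {
        case Unresolved(x) if childCols(x) => Col(x)    // local column wins
        case Unresolved(x) if lateral(x)   => LcaRef(x) // then lateral alias
      }
      if (isResolved(e)) lateral += n.name
      Named(n.name, e)
    }
    Proj(newList, p.child)
  }

  // Phase 2: push referenced, non-chained aliases into a new child Proj and
  // replace the references with plain columns; repeat for chained aliases.
  def unwrap(p: Proj): Proj = {
    val refs = p.list.flatMap(n => lcaNames(n.expr)).toSet
    val pushable = p.list.filter(n => refs(n.name) && lcaNames(n.expr).isEmpty)
    if (pushable.isEmpty) p
    else {
      val names = pushable.map(_.name).toSet
      val inner = Proj(output(p.child).map(c => Named(c, Col(c))) ++ pushable, p.child)
      val upper = p.list.map { n =>
        if (names(n.name)) Named(n.name, Col(n.name))
        else Named(n.name, transform(n.expr) { case LcaRef(x) if names(x) => Col(x) })
      }
      unwrap(Proj(upper, inner))
    }
  }
}
```

The recursion in `unwrap` stands in for the PR's approach of leaving chained references "to future rounds"; in the real rule that iteration would come from the analyzer re-running the rule, not from recursion inside one call.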



[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1040868659


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+    // phase 2: unwrap
+    rewrittenPlan.resolveOperatorsUpWithPruning(
+      _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), UnknownRuleId) {
+      case p @ Project(projectList, child) if p.resolved
+        && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        // build the map again in case the project list changes and index goes off
+        // TODO one risk: is there any rule that strips off /add the Alias? that the LCA is resolved
+        //  in the beginning, but when it comes to push down, it really can't find the matching one?

Review Comment:
   I don't think we will strip top-level aliases. But if other rules split a `Project`, an alias may become an attribute in the upper `Project`. Fortunately, I'm not aware of any such rules.
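The concern can be illustrated with a toy lookup (hypothetical names, not Catalyst): phase 2 rebuilds its alias map from the current project list, so if some rule had split the `Project` and `age AS a` survived only as the attribute `a` in the upper list, nothing would be found and the `LateralColumnAliasReference` would stay wrapped.

```scala
// Hypothetical toy, not Catalyst: phase 2 rebuilds its alias map from the
// *current* project list, so it can only push down entries that are still aliases.
object SplitProjectToy {
  sealed trait Item { def name: String }
  final case class AliasItem(name: String, child: String) extends Item // e.g. `age AS a`
  final case class AttrItem(name: String) extends Item                 // e.g. plain `a`

  // Case-insensitive lookup of an alias by name, ignoring plain attributes.
  def aliasFor(list: Seq[Item], name: String): Option[AliasItem] =
    list.collectFirst { case a: AliasItem if a.name.equalsIgnoreCase(name) => a }
}
```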



[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1044807443


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -424,8 +424,51 @@ case class OuterReference(e: NamedExpression)
   override def qualifier: Seq[String] = e.qualifier
   override def exprId: ExprId = e.exprId
   override def toAttribute: Attribute = e.toAttribute
-  override def newInstance(): NamedExpression = OuterReference(e.newInstance())
+  override def newInstance(): NamedExpression =
+    OuterReference(e.newInstance()).setNameParts(nameParts)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
+
+  // optional field, the original name parts of UnresolvedAttribute before it is resolved to
+  // OuterReference. Used in rule ResolveLateralColumnAlias to convert OuterReference back to
+  // LateralColumnAliasReference.
+  var nameParts: Option[Seq[String]] = None
+  def setNameParts(newNameParts: Option[Seq[String]]): OuterReference = {

Review Comment:
   As we discussed before, I don't think this is safe given the current design of `ResolveOuterReferences`, in which each rule is applied only once. Here is a made-up query (it doesn't run; it's only for demonstration):
   ```
   SELECT *
   FROM range(1, 7)
   WHERE (
     SELECT id2
     FROM (SELECT dept * 2.0 AS id, id + 1 AS id2 FROM $testTable)) > 5
   ORDER BY id
   ```
   It is possible that `dept * 2.0` is not yet resolved because it needs type conversion, so the LCA rule doesn't apply. The `id` in `id + 1 AS id2` is then simply wrapped as an `OuterReference`, because `range(1, 7)` in the outer query also produces a column named `id`.
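The order dependency can be reduced to a toy binding function (hypothetical names, not Catalyst) that applies the documented priority, local table column > lateral column alias > outer reference. It assumes `$testTable` has a `dept` column but no `id` column, so the binding of `id` depends only on whether `dept * 2.0 AS id` is already resolved when the identifier is bound.

```scala
// Hypothetical toy, not Catalyst: binds one identifier using the priority
// from the rule's doc comment: local column > lateral alias > outer reference.
object ResolutionOrderToy {
  sealed trait Binding
  case object LocalColumn extends Binding
  case object LateralAlias extends Binding
  case object OuterRef extends Binding

  def bind(
      name: String,
      localCols: Set[String],
      resolvedAliases: Set[String], // aliases to the left that are already resolved
      outerCols: Set[String]): Option[Binding] =
    if (localCols(name)) Some(LocalColumn)
    else if (resolvedAliases(name)) Some(LateralAlias)
    else if (outerCols(name)) Some(OuterRef)
    else None
}
```

If the first answer is committed once and never revisited, `id` stays bound to the outer query even after the alias becomes resolved.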



[GitHub] [spark] cloud-fan commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1044240476


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -424,8 +424,51 @@ case class OuterReference(e: NamedExpression)
+  def setNameParts(newNameParts: Option[Seq[String]]): OuterReference = {

Review Comment:
   This is a bit tricky. Maybe we should invoke `WrapLateralColumnAliasReference` in `ResolveOuterReferences`, so that we don't need to re-resolve outer references and introduce this hack.
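A toy sketch of the combined approach (hypothetical names, not Catalyst): a single function checks lateral aliases before falling back to the outer query, so the original name never has to be stored on `OuterReference`. The `Defer` outcome, which postpones the decision while a same-named alias to the left is still unresolved, is an assumption added here to cover the `dept * 2.0` case raised in anchovYu's reply; it is not part of the suggestion itself.

```scala
// Hypothetical toy, not Catalyst: one function that checks lateral aliases
// before falling back to the outer query. The `Defer` outcome (retry in a
// later analyzer iteration) is an assumption of this sketch.
object CombinedResolutionToy {
  sealed trait Outcome
  case object Local extends Outcome
  case object Lateral extends Outcome
  case object Outer extends Outcome
  case object Defer extends Outcome
  case object NotFound extends Outcome

  // aliasesSoFar: alias name -> whether that alias is resolved, for aliases to
  // the left of the reference in the same SELECT list.
  def resolve(
      name: String,
      local: Set[String],
      aliasesSoFar: Map[String, Boolean],
      outer: Set[String]): Outcome =
    if (local(name)) Local
    else aliasesSoFar.get(name) match {
      case Some(true)  => Lateral
      case Some(false) => Defer // don't commit to an outer binding yet
      case None        => if (outer(name)) Outer else NotFound
    }
}
```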



[GitHub] [spark] cloud-fan closed pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project
URL: https://github.com/apache/spark/pull/38776


[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer

Posted by GitBox <gi...@apache.org>.
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1041319746


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala:
##########
@@ -0,0 +1,222 @@
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) {
+            case o: OuterReference
+              if aliasMap.contains(o.nameParts.map(_.head).getOrElse(o.name)) =>
+              val nameParts = o.nameParts.getOrElse(Seq(o.name))
+              val aliases = aliasMap.get(nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(nameParts, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, nameParts))
+                    .getOrElse(o)
+                case _ => o
+              }
+            case u: UnresolvedAttribute if aliasMap.contains(u.nameParts.head) &&
+              Analyzer.resolveExpressionByPlanChildren(u, p, resolver)
+                .isInstanceOf[UnresolvedAttribute] =>
+              val aliases = aliasMap.get(u.nameParts.head).get
+              aliases.size match {
+                case n if n > 1 =>
+                  throw QueryCompilationErrors.ambiguousLateralColumnAlias(u.name, n)
+                case n if n == 1 && aliases.head.alias.resolved =>
+                  // Only resolved alias can be the lateral column alias
+                  resolveByLateralAlias(u.nameParts, aliases.head.alias)
+                    .map(LateralColumnAliasReference(_, u.nameParts))
+                    .getOrElse(u)
+                case _ => u
+              }
+          }.asInstanceOf[NamedExpression]
+        }
+        val newProjectList = projectList.zipWithIndex.map {
+          case (a: Alias, idx) =>
+            val lcaWrapped = wrapLCAReference(a).asInstanceOf[Alias]
+            // Insert the LCA-resolved alias, not the original unresolved one, into the map. If it
+            // is resolved, later expressions can reference it as an LCA (chaining).
+            // Unresolved aliases are also added to the map so that ambiguous names are detected,
+            // but only a resolved alias can be an LCA.
+            aliasMap = insertIntoAliasMap(lcaWrapped, idx, aliasMap)
+            lcaWrapped
+          case (e, _) =>
+            wrapLCAReference(e)
+        }
+        p.copy(projectList = newProjectList)
+    }
+
+    // phase 2: unwrap
+    rewrittenPlan.resolveOperatorsUpWithPruning(
+      _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), UnknownRuleId) {
+      case p @ Project(projectList, child) if p.resolved
+        && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        // Rebuild the map in case the project list has changed and the indices are stale.
+        // TODO One risk: could some rule strip off or add an Alias, so that an LCA resolved at the
+        //  beginning can no longer find its matching alias when it is pushed down? If so, restore
+        //  it back to an UnresolvedAttribute.
+        //  Also, when resolving bottom up, should I worry about cases like:
+        //  Project [b AS c, c + 1 AS d]
+        //  +- Project [1 AS a, a AS b]
+        //  where `b AS c` is resolved even though b refers to an alias that contains an LCA?
Review Comment:
   Yes. Another worry of mine is whether there is some case where `a AS b` becomes unresolved for the reason given in the comment above (`1 AS a` is gone, leaving only `a`, so `a AS b` can no longer find the alias and becomes unresolved) while `b AS c` above it incorrectly remains resolved. Either way, it seems harmless: analysis will still fail with an unresolved-attribute or missing-input error.
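The reviewer's argument is that a stale `b AS c` does no harm because the broken input beneath it still fails analysis. A bottom-up validation pass illustrates why. The following is a hypothetical toy, not Spark's CheckAnalysis. The names `StaleResolutionSketch`, `Attr` and `check`, as well as the error text, are invented for illustration. The plan models the state after `[1 AS a, a AS b]` has been split into two Projects and some rule has then dropped the one that produced `a`.

```scala
// Hypothetical toy plan and checker, loosely modeled on an analyzer's final
// validation pass; this is not Spark's CheckAnalysis.
object StaleResolutionSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(v: Int) extends Expr
  final case class Alias(child: Expr, name: String)

  sealed trait Plan { def output: Seq[String] }
  final case class Relation(output: Seq[String]) extends Plan
  final case class Project(list: Seq[Alias], child: Plan) extends Plan {
    def output: Seq[String] = list.map(_.name)
  }

  private def refs(e: Expr): Seq[String] = e match {
    case Attr(n) => Seq(n)
    case Lit(_)  => Seq.empty
  }

  // Validate children before parents: an operator that merely looks resolved
  // (such as `b AS c`) cannot hide an input that went missing further down.
  def check(plan: Plan): Either[String, Unit] = plan match {
    case Relation(_) => Right(())
    case Project(list, child) =>
      check(child).flatMap { _ =>
        val available = child.output.toSet
        list.flatMap(a => refs(a.child)).find(n => !available(n)) match {
          case Some(n) =>
            Left(s"missing input '$n' for " + list.map(_.name).mkString("Project [", ", ", "]"))
          case None => Right(())
        }
      }
  }

  def main(args: Array[String]): Unit = {
    // After the producer of `a` is dropped: Project [b AS c] <- Project [a AS b] <- empty relation
    val broken = Project(Seq(Alias(Attr("b"), "c")), Project(Seq(Alias(Attr("a"), "b")), Relation(Nil)))
    println(check(broken)) // Left(missing input 'a' for Project [b])
  }
}
```

Because children are checked first, the error is reported at `Project [b]`, the operator that actually lost its input, even though `b AS c` would pass on its own.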





[GitHub] [spark] AmplabJenkins commented on pull request #38776: [WIP][SQL] Refactor Analyzer by moving several helper public methods to the new Analyzer object

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38776:
URL: https://github.com/apache/spark/pull/38776#issuecomment-1327384458

   Can one of the admins verify this patch?

