You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "anchovYu (via GitHub)" <gi...@apache.org> on 2023/08/08 01:35:52 UTC

[GitHub] [spark] anchovYu commented on a diff in pull request #42276: [SPARK-44714] Ease restriction of LCA resolution regarding queries with having

anchovYu commented on code in PR #42276:
URL: https://github.com/apache/spark/pull/42276#discussion_r1285473731


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -131,138 +131,166 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
       (pList.exists(hasWindowExpression) && p.expressions.forall(_.resolved) && p.childrenResolved)
   }
 
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (!conf.getConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED)) {
-      plan
-    } else if (plan.containsAnyPattern(TEMP_RESOLVED_COLUMN, UNRESOLVED_HAVING)) {
-      // It should not change the plan if `TempResolvedColumn` or `UnresolvedHaving` is present in
-      // the query plan. These plans need certain plan shape to get recognized and resolved by other
-      // rules, such as Filter/Sort + Aggregate to be matched by ResolveAggregateFunctions.
-      // LCA resolution can break the plan shape, like adding Project above Aggregate.
-      plan
-    } else {
-      // phase 2: unwrap
-      plan.resolveOperatorsUpWithPruning(
-        _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), ruleId) {
-        case p @ Project(projectList, child) if ruleApplicableOnOperator(p, projectList)
-          && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
-          var aliasMap = AttributeMap.empty[AliasEntry]
-          val referencedAliases = collection.mutable.Set.empty[AliasEntry]
-          def unwrapLCAReference(e: NamedExpression): NamedExpression = {
-            e.transformWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
-              case lcaRef: LateralColumnAliasReference if aliasMap.contains(lcaRef.a) =>
-                val aliasEntry = aliasMap.get(lcaRef.a).get
-                // If there is no chaining of lateral column alias reference, push down the alias
-                // and unwrap the LateralColumnAliasReference to the NamedExpression inside
-                // If there is chaining, don't resolve and save to future rounds
-                if (!aliasEntry.alias.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
-                  referencedAliases += aliasEntry
-                  lcaRef.ne
-                } else {
-                  lcaRef
-                }
-              case lcaRef: LateralColumnAliasReference if !aliasMap.contains(lcaRef.a) =>
-                // It shouldn't happen, but restore to unresolved attribute to be safe.
-                UnresolvedAttribute(lcaRef.nameParts)
-            }.asInstanceOf[NamedExpression]
-          }
-          val newProjectList = projectList.zipWithIndex.map {
-            case (a: Alias, idx) =>
-              val lcaResolved = unwrapLCAReference(a)
-              // Insert the original alias instead of rewritten one to detect chained LCA
-              aliasMap += (a.toAttribute -> AliasEntry(a, idx))
-              lcaResolved
-            case (e, _) =>
-              unwrapLCAReference(e)
-          }
+  /** Internal application method. A hand-written top down recursive traverse. */
+  private def apply0(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case p: LogicalPlan if !p.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE) =>
+        p
 
-          if (referencedAliases.isEmpty) {
-            p
-          } else {
-            val outerProjectList = collection.mutable.Seq(newProjectList: _*)
-            val innerProjectList =
-              collection.mutable.ArrayBuffer(child.output.map(_.asInstanceOf[NamedExpression]): _*)
-            referencedAliases.foreach { case AliasEntry(alias: Alias, idx) =>
-              outerProjectList.update(idx, alias.toAttribute)
-              innerProjectList += alias
-            }
-            p.copy(
-              projectList = outerProjectList.toSeq,
-              child = Project(innerProjectList.toSeq, child)
-            )
-          }
+      // It should not change the Aggregate (and thus the plan shape) if its parent is an
+      // UnresolvedHaving, to avoid breaking the shape pattern  `UnresolvedHaving - Aggregate`
+      // matched by ResolveAggregateFunctions. See SPARK-42936 for more details.
+      case u @ UnresolvedHaving(_, agg: Aggregate)
+        if agg.aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        u.copy(child = agg.mapChildren(apply0))
 
-        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _)
-          if ruleApplicableOnOperator(agg, aggregateExpressions)
-            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+      case p @ Project(projectList, child) if ruleApplicableOnOperator(p, projectList)

Review Comment:
   Completely copied the original code except adding the final line to recursively iterating children



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -131,138 +131,166 @@ object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
       (pList.exists(hasWindowExpression) && p.expressions.forall(_.resolved) && p.childrenResolved)
   }
 
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    if (!conf.getConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED)) {
-      plan
-    } else if (plan.containsAnyPattern(TEMP_RESOLVED_COLUMN, UNRESOLVED_HAVING)) {
-      // It should not change the plan if `TempResolvedColumn` or `UnresolvedHaving` is present in
-      // the query plan. These plans need certain plan shape to get recognized and resolved by other
-      // rules, such as Filter/Sort + Aggregate to be matched by ResolveAggregateFunctions.
-      // LCA resolution can break the plan shape, like adding Project above Aggregate.
-      plan
-    } else {
-      // phase 2: unwrap
-      plan.resolveOperatorsUpWithPruning(
-        _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), ruleId) {
-        case p @ Project(projectList, child) if ruleApplicableOnOperator(p, projectList)
-          && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
-          var aliasMap = AttributeMap.empty[AliasEntry]
-          val referencedAliases = collection.mutable.Set.empty[AliasEntry]
-          def unwrapLCAReference(e: NamedExpression): NamedExpression = {
-            e.transformWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
-              case lcaRef: LateralColumnAliasReference if aliasMap.contains(lcaRef.a) =>
-                val aliasEntry = aliasMap.get(lcaRef.a).get
-                // If there is no chaining of lateral column alias reference, push down the alias
-                // and unwrap the LateralColumnAliasReference to the NamedExpression inside
-                // If there is chaining, don't resolve and save to future rounds
-                if (!aliasEntry.alias.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
-                  referencedAliases += aliasEntry
-                  lcaRef.ne
-                } else {
-                  lcaRef
-                }
-              case lcaRef: LateralColumnAliasReference if !aliasMap.contains(lcaRef.a) =>
-                // It shouldn't happen, but restore to unresolved attribute to be safe.
-                UnresolvedAttribute(lcaRef.nameParts)
-            }.asInstanceOf[NamedExpression]
-          }
-          val newProjectList = projectList.zipWithIndex.map {
-            case (a: Alias, idx) =>
-              val lcaResolved = unwrapLCAReference(a)
-              // Insert the original alias instead of rewritten one to detect chained LCA
-              aliasMap += (a.toAttribute -> AliasEntry(a, idx))
-              lcaResolved
-            case (e, _) =>
-              unwrapLCAReference(e)
-          }
+  /** Internal application method. A hand-written top down recursive traverse. */
+  private def apply0(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case p: LogicalPlan if !p.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE) =>
+        p
 
-          if (referencedAliases.isEmpty) {
-            p
-          } else {
-            val outerProjectList = collection.mutable.Seq(newProjectList: _*)
-            val innerProjectList =
-              collection.mutable.ArrayBuffer(child.output.map(_.asInstanceOf[NamedExpression]): _*)
-            referencedAliases.foreach { case AliasEntry(alias: Alias, idx) =>
-              outerProjectList.update(idx, alias.toAttribute)
-              innerProjectList += alias
-            }
-            p.copy(
-              projectList = outerProjectList.toSeq,
-              child = Project(innerProjectList.toSeq, child)
-            )
-          }
+      // It should not change the Aggregate (and thus the plan shape) if its parent is an
+      // UnresolvedHaving, to avoid breaking the shape pattern  `UnresolvedHaving - Aggregate`
+      // matched by ResolveAggregateFunctions. See SPARK-42936 for more details.
+      case u @ UnresolvedHaving(_, agg: Aggregate)
+        if agg.aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        u.copy(child = agg.mapChildren(apply0))
 
-        case agg @ Aggregate(groupingExpressions, aggregateExpressions, _)
-          if ruleApplicableOnOperator(agg, aggregateExpressions)
-            && aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+      case p @ Project(projectList, child) if ruleApplicableOnOperator(p, projectList)
+        && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
+        var aliasMap = AttributeMap.empty[AliasEntry]
+        val referencedAliases = collection.mutable.Set.empty[AliasEntry]
+        def unwrapLCAReference(e: NamedExpression): NamedExpression = {
+          e.transformWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+            case lcaRef: LateralColumnAliasReference if aliasMap.contains(lcaRef.a) =>
+              val aliasEntry = aliasMap.get(lcaRef.a).get
+              // If there is no chaining of lateral column alias reference, push down the alias
+              // and unwrap the LateralColumnAliasReference to the NamedExpression inside
+              // If there is chaining, don't resolve and save to future rounds
+              if (!aliasEntry.alias.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
+                referencedAliases += aliasEntry
+                lcaRef.ne
+              } else {
+                lcaRef
+              }
+            case lcaRef: LateralColumnAliasReference if !aliasMap.contains(lcaRef.a) =>
+              // It shouldn't happen, but restore to unresolved attribute to be safe.
+              UnresolvedAttribute(lcaRef.nameParts)
+          }.asInstanceOf[NamedExpression]
+        }
+        val newProjectList = projectList.zipWithIndex.map {
+          case (a: Alias, idx) =>
+            val lcaResolved = unwrapLCAReference(a)
+            // Insert the original alias instead of rewritten one to detect chained LCA
+            aliasMap += (a.toAttribute -> AliasEntry(a, idx))
+            lcaResolved
+          case (e, _) =>
+            unwrapLCAReference(e)
+        }
 
-          // Check if current Aggregate is eligible to lift up with Project: the aggregate
-          // expression only contains: 1) aggregate functions, 2) grouping expressions, 3) leaf
-          // expressions excluding attributes not in grouping expressions
-          // This check is to prevent unnecessary transformation on invalid plan, to guarantee it
-          // throws the same exception. For example, cases like non-aggregate expressions not
-          // in group by, once transformed, will throw a different exception: missing input.
-          def eligibleToLiftUp(exp: Expression): Boolean = {
-            exp match {
-              case _: AggregateExpression => true
-              case e if groupingExpressions.exists(_.semanticEquals(e)) => true
-              case a: Attribute => false
-              case s: ScalarSubquery if s.children.nonEmpty
-                && !groupingExpressions.exists(_.semanticEquals(s)) => false
-              // Manually skip detection on function itself because it can be an aggregate function.
-              // This is to avoid expressions like sum(salary) over () eligible to lift up.
-              case WindowExpression(function, spec) =>
-                function.children.forall(eligibleToLiftUp) && eligibleToLiftUp(spec)
-              case e => e.children.forall(eligibleToLiftUp)
-            }
+        val newPlan = if (referencedAliases.isEmpty) {
+          p
+        } else {
+          val outerProjectList = collection.mutable.Seq(newProjectList: _*)
+          val innerProjectList =
+            collection.mutable.ArrayBuffer(child.output.map(_.asInstanceOf[NamedExpression]): _*)
+          referencedAliases.foreach { case AliasEntry(alias: Alias, idx) =>
+            outerProjectList.update(idx, alias.toAttribute)
+            innerProjectList += alias
           }
-          if (!aggregateExpressions.forall(eligibleToLiftUp)) {
-            return agg
+          p.copy(
+            projectList = outerProjectList.toSeq,
+            child = Project(innerProjectList.toSeq, child)
+          )
+        }
+        newPlan.mapChildren(apply0)
+
+      case agg @ Aggregate(groupingExpressions, aggregateExpressions, _)

Review Comment:
   Completely copied the original code except adding the final line to recursively applying on the new Project



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org