Posted to reviews@spark.apache.org by "cloud-fan (via GitHub)" <gi...@apache.org> on 2023/11/23 12:22:04 UTC

[PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

cloud-fan opened a new pull request, #43978:
URL: https://github.com/apache/spark/pull/43978

   
   ### What changes were proposed in this pull request?
   This is a followup of https://github.com/apache/spark/pull/43623 to fix a regression. A `With` expression inside a conditional branch may not be evaluated at all, so we should not pull its common expressions out into a `Project`; we should just inline them.
   
   ### Why are the changes needed?
   To avoid a performance regression.
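
   As a rough illustration of the regression (a toy sketch in plain Scala, not Spark code; `expensive`, `pulledOut`, `inlined`, and `countEvaluations` are hypothetical names): computing a common expression ahead of the conditional, the way a `Project` below the operator would, pays for it on every row, while inlining pays only on rows that actually take the branch.

```scala
object ConditionalInlineSketch {
  // Counts how many times the (hypothetical) expensive expression is evaluated.
  private var evaluations = 0

  private def expensive(x: Int): Int = {
    evaluations += 1
    x * x
  }

  // Models pulling the common expression into a Project: it is computed for
  // every row before the conditional is evaluated.
  def pulledOut(rows: Seq[Int]): Seq[Int] = rows.map { x =>
    val common = expensive(x)
    if (x > 2) common + common else 0
  }

  // Models inlining: the expression is computed only when the branch is taken,
  // at the cost of computing it twice on those rows.
  def inlined(rows: Seq[Int]): Seq[Int] = rows.map { x =>
    if (x > 2) expensive(x) + expensive(x) else 0
  }

  // Runs `f` over `rows` and returns the number of evaluations it triggered.
  def countEvaluations(f: Seq[Int] => Seq[Int], rows: Seq[Int]): Int = {
    evaluations = 0
    f(rows)
    evaluations
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(0, 1, 2, 1, 5) // only the last row takes the branch
    println(s"pulled out: ${countEvaluations(pulledOut, rows)} evaluations")
    println(s"inlined:    ${countEvaluations(inlined, rows)} evaluations")
  }
}
```

   The trade-off is visible in `inlined`: rows that do take the branch evaluate the expression once per reference, so inlining is the cheaper choice only when the branch is often skipped.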
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   A new test was added.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1404186491


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   I'm not sure. For example, suppose we have `With(With(x + x, Seq(x = y + y)), Seq(y = a + 1))`, where `x` and `y` are references and `a` is an attribute. If we recurse into `With(x + x, Seq(x = y + y))` before replacing the `y` references with the actual attribute that aliases `a + 1`, then the `childProjectionIndex` calculation for `y + y` won't find the right child, will it? Either way, a unit test covering this case would be good. :)
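
   To make the ordering question concrete, here is a toy model in plain Scala (the `Expr`, `Ref`, and `With` classes and the `substitute` and `refsIn` helpers are hypothetical stand-ins, not Catalyst's). It only shows what the inner definition `y + y` looks like before and after the outer references are replaced. It assumes `y` was pulled into a child `Project` under the alias `_common_expr_0`, and it does not reproduce the `childProjectionIndex` logic itself.

```scala
object NestedWithSketch {
  // Toy expression tree; hypothetical stand-ins for the Catalyst classes.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Ref(id: String) extends Expr // stands in for CommonExpressionRef
  final case class With(child: Expr, defs: Map[String, Expr]) extends Expr

  // Replaces each Ref that has a binding in `env`; other Refs are left alone.
  // Assumes definition ids are unique, so shadowing is not handled.
  def substitute(e: Expr, env: Map[String, Expr]): Expr = e match {
    case Ref(id) => env.getOrElse(id, e)
    case Add(l, r) => Add(substitute(l, env), substitute(r, env))
    case With(c, defs) =>
      With(substitute(c, env), defs.map { case (id, d) => id -> substitute(d, env) })
    case _ => e
  }

  // Ids of all Refs that occur anywhere inside `e`.
  def refsIn(e: Expr): Set[String] = e match {
    case Ref(id) => Set(id)
    case Add(l, r) => refsIn(l) ++ refsIn(r)
    case With(c, defs) => refsIn(c) ++ defs.values.flatMap(d => refsIn(d))
    case _ => Set.empty
  }

  // With(With(x + x, Seq(x = y + y)), Seq(y = a + 1)) from the comment.
  val inner: With = With(Add(Ref("x"), Ref("x")), Map("x" -> Add(Ref("y"), Ref("y"))))
  val outer: With = With(inner, Map("y" -> Add(Attr("a"), Lit(1))))

  // Assumption: `y` was pulled into a child Project as this alias attribute.
  val outerBindings: Map[String, Expr] = Map("y" -> Attr("_common_expr_0"))

  // Inner-first: the inner definition is inspected while `y` is still a Ref.
  val innerDefBefore: Expr = inner.defs("x")

  // Outer-first: the outer bindings are substituted before the inner With is visited.
  val innerDefAfter: Expr = substitute(outer.child, outerBindings) match {
    case With(_, defs) => defs("x")
    case other => other
  }

  def main(args: Array[String]): Unit = {
    println(s"refs in inner def, inner-first: ${refsIn(innerDefBefore)}")
    println(s"refs in inner def, outer-first: ${refsIn(innerDefAfter)}")
  }
}
```

   In this model the inner definition still carries an unresolved reference when it is visited first, and refers only to the alias attribute once the outer bindings have been applied.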





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1405081903


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   Oh, correlated nested `With`! I'm not sure whether we want to support it. But if we don't, we should at least fail explicitly.





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1403662967


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   Actually, the current logic seems to behave correctly when an outer `With`'s `child` contains an inner `With` whose definition references an outer definition. (The previous `transformExpressionsUp()` had issues in that case.) But the rule is not idempotent, so maybe we should recurse into `w.child` after replacing the `CommonExpressionRef`s?





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "Kimahriman (via GitHub)" <gi...@apache.org>.
Kimahriman commented on PR #43978:
URL: https://github.com/apache/spark/pull/43978#issuecomment-1828770494

   I'm trying to follow along with this, since duplicate expression evaluations have been a huge pain for us for a while (mostly on the execution side, but incomprehensible explain strings aren't fun either). Am I understanding correctly that this effectively negates the `NullIf` use of the `With` expression? And that `With` still won't have any real uses until some follow-up (like "a general rule that find shared common expressions")?




Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43978:
URL: https://github.com/apache/spark/pull/43978#issuecomment-1824341945

   @viirya @peter-toth 




Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1403808226


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))
+        val refToExpr = mutable.HashMap.empty[Long, Expression]
+        val childProjections = Array.fill(inputPlans.length)(mutable.ArrayBuffer.empty[Alias])
 
-            newChildren = newChildren.zip(childProjections).map { case (child, projections) =>
-              if (projections.nonEmpty) {
-                Project(child.output ++ projections, child)
-              } else {
-                child
-              }
+        defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
+          if (CollapseProject.isCheap(child)) {
+            refToExpr(id) = child
+          } else {
+            val childProjectionIndex = inputPlans.indexWhere(
+              c => child.references.subsetOf(c.outputSet)
+            )
+            if (childProjectionIndex == -1) {
+              // When we cannot rewrite the common expressions, force to inline them so that the
+              // query can still run. This can happen if the join condition contains `With` and
+              // the common expression references columns from both join sides.
+              // TODO: things can go wrong if the common expression is nondeterministic. We
+              //       don't fix it for now to match the old buggy behavior when certain
+              //       `RuntimeReplaceable` did not use the `With` expression.
+              // TODO: we should calculate the ref count and also inline the common expression
+              //       if it's ref count is 1.
+              refToExpr(id) = child
+            } else {
+              val alias = Alias(child, s"_common_expr_$index")()
+              childProjections(childProjectionIndex) += alias
+              refToExpr(id) = alias.toAttribute
             }
+          }
+        }
 
+        for (i <- inputPlans.indices) {
+          val projectList = childProjections(i)
+          if (projectList.nonEmpty) {
+            inputPlans(i) = Project(inputPlans(i).output ++ projectList, inputPlans(i))
+          }
+        }
+
+        w.child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) {
+          case ref: CommonExpressionRef => refToExpr(ref.id)
+        }
+
+      case c: ConditionalExpression =>
+        val newAlwaysEvaluatedInputs = c.alwaysEvaluatedInputs.map(
+          rewriteWithExprAndInputPlans(_, inputPlans))
+        val newExpr = c.withNewAlwaysEvaluatedInputs(newAlwaysEvaluatedInputs)
+        // Use transformUp to handle nested With.
+        newExpr.transformUpWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+          case With(child, defs) =>
+            // For With in the conditional branches, they may not be evaluated at all and we can't
+            // pull the common expressions into a project which will always be evaluated. Inline it.

Review Comment:
   Hmm, for specific conditional expressions, e.g. `If`, can't we still extract a common expression that is shared by both branches and is therefore sure to be evaluated?
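
   One way to picture this suggestion, as a toy sketch in plain Scala rather than Catalyst code (all class and function names here are hypothetical): collect the subexpressions that are always evaluated in each `If` branch and intersect the two sets. Anything in the intersection is computed no matter which branch is taken, so pulling it out would not add work. Whether the rule should do this is the open question in the comment.

```scala
object SharedBranchSketch {
  // Toy expression tree; hypothetical stand-ins for the Catalyst classes.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Mul(left: Expr, right: Expr) extends Expr
  final case class If(predicate: Expr, trueValue: Expr, falseValue: Expr) extends Expr

  // Non-leaf subexpressions that are evaluated whenever `e` is evaluated.
  def alwaysEvaluated(e: Expr): Set[Expr] = e match {
    case Add(l, r) => (alwaysEvaluated(l) ++ alwaysEvaluated(r)) + e
    case Mul(l, r) => (alwaysEvaluated(l) ++ alwaysEvaluated(r)) + e
    // Only the predicate of a nested If is certain to run; its branches are
    // conservatively skipped.
    case If(p, _, _) => alwaysEvaluated(p) + e
    case _ => Set.empty
  }

  // Subexpressions evaluated on both paths through `i`.
  def sharedByBothBranches(i: If): Set[Expr] =
    alwaysEvaluated(i.trueValue).intersect(alwaysEvaluated(i.falseValue))

  def main(args: Array[String]): Unit = {
    val square = Mul(Attr("a"), Attr("a"))
    val example = If(Attr("flag"), Add(square, Lit(1)), Mul(square, Lit(2)))
    println(sharedByBothBranches(example))
  }
}
```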





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1404186491


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   I'm not sure. E.g. suppose we have `With(With(x + x, Seq(x = y + y)), Seq(y = a + 1))`, where `x` and `y` are references and `a` is an attribute. If we recurse into `With(x + x, Seq(x = y + y))` before replacing the `y` references with the actual attributes that alias `a + 1`, then the `childProjectionIndex` lookup won't find the right child, will it? But a UT covering this case would be good. :)
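
A toy model makes the ordering concern concrete. The sketch below is not Catalyst code: `Attr`, `Lit`, `Ref`, `Add`, `With`, `attrs` and `substitute` are hypothetical stand-ins, and string ids replace the `Long` ids used in the diff. It only shows that, in this model, the definition `x = y + y` exposes no attributes until the `y` reference has been replaced, so a subset check against a child's output has nothing meaningful to match.

```scala
object NestedWithSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr   // a column of an input plan
  final case class Lit(value: Int) extends Expr
  final case class Ref(id: String) extends Expr      // stand-in for CommonExpressionRef
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class With(child: Expr, defs: Map[String, Expr]) extends Expr

  // Attributes an expression reads; a Ref contributes none until it is replaced.
  def attrs(e: Expr): Set[String] = e match {
    case Attr(n)     => Set(n)
    case Lit(_)      => Set.empty
    case Ref(_)      => Set.empty
    case Add(l, r)   => attrs(l) ++ attrs(r)
    case With(c, ds) => attrs(c) ++ ds.values.flatMap(attrs)
  }

  // Replace references by their definitions, leaving unknown ids untouched.
  def substitute(e: Expr, defs: Map[String, Expr]): Expr = e match {
    case Ref(id)     => defs.getOrElse(id, e)
    case Add(l, r)   => Add(substitute(l, defs), substitute(r, defs))
    case With(c, ds) =>
      With(substitute(c, defs), ds.map { case (k, v) => k -> substitute(v, defs) })
    case other       => other
  }

  // With(With(x + x, x = y + y), y = a + 1)
  val inner: With = With(Add(Ref("x"), Ref("x")), Map("x" -> Add(Ref("y"), Ref("y"))))
  val outer: With = With(inner, Map("y" -> Add(Attr("a"), Lit(1))))

  // Attributes visible in the definition of x before and after resolving y.
  val before: Set[String] = attrs(inner.defs("x"))
  val after: Set[String] = substitute(inner, outer.defs) match {
    case With(_, ds) => attrs(ds("x"))
    case _           => Set.empty
  }
}
```

In this model `before` is empty and `after` contains only `a`. An empty set is a subset of every output set, so a lookup made too early could pick an arbitrary child.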



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1403635984


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   Now that we have "manual" recursion (instead of `transformExpressionsUp()`), shall we deal with nested `With`s in `w.child` too?





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "Kimahriman (via GitHub)" <gi...@apache.org>.
Kimahriman commented on PR #43978:
URL: https://github.com/apache/spark/pull/43978#issuecomment-1828784610

   > `NullIf` uses it already. We can avoid duplicating expressions in `NullIf` as long as it's not used in an unsupported plan node, such as a join condition.
   
   So is this just saying not to recurse into conditional expressions to find `With` expressions, but a `With` wrapping a conditional expression will still have the common expression pulled into a new project?




Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43978:
URL: https://github.com/apache/spark/pull/43978#issuecomment-1828777454

   > And this still won't have any real uses until some follow up (like "a general rule that find shared common expressions")?
   
   `NullIf` uses it already. We can avoid duplicating expressions in `NullIf` as long as it's not used in an unsupported plan node, such as a join condition.
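
To illustrate what "avoid duplicating expressions" buys, here is a minimal sketch with a toy evaluator. It is not Spark's `NullIf` or `With`: `Expensive`, `IfEqNull`, `Ref`, `With` and `run` are hypothetical names, and `IfEqNull(l, r, other)` simply means "null if `l` equals `r`, otherwise `other`". The sketch counts how often the expensive input is computed when it is inlined twice versus bound once and referenced by id.

```scala
object NullIfSketch {
  sealed trait Expr
  final case class Expensive(value: Int) extends Expr   // the costly shared input
  final case class Lit(value: Int) extends Expr
  final case class Ref(id: Long) extends Expr            // reference to a definition
  final case class IfEqNull(left: Expr, right: Expr, otherwise: Expr) extends Expr
  final case class With(child: Expr, defs: Map[Long, Expr]) extends Expr

  // Evaluates `e` and reports how many times an Expensive node was computed.
  def run(e: Expr): (Option[Int], Int) = {
    var expensiveCalls = 0
    def eval(expr: Expr, env: Map[Long, Option[Int]]): Option[Int] = expr match {
      case Expensive(v) =>
        expensiveCalls += 1
        Some(v)
      case Lit(v) => Some(v)
      case Ref(id) => env(id)
      case IfEqNull(l, r, other) =>
        if (eval(l, env) == eval(r, env)) None else eval(other, env)
      case With(child, defs) =>
        // Each definition is computed once and then looked up by id.
        val bound = defs.map { case (id, d) => id -> eval(d, env) }
        eval(child, env ++ bound)
    }
    val result = eval(e, Map.empty)
    (result, expensiveCalls)
  }

  // The expensive input appears twice in the tree.
  val inlined: Expr = IfEqNull(Expensive(5), Lit(3), Expensive(5))
  // The expensive input is defined once and referenced twice.
  val shared: Expr = With(IfEqNull(Ref(0L), Lit(3), Ref(0L)), Map(0L -> Expensive(5)))
}
```

Both trees produce the same value; the inlined form computes the expensive input twice and the shared form once.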




Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1403392772


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))
+        val refToExpr = mutable.HashMap.empty[Long, Expression]
+        val childProjections = Array.fill(inputPlans.length)(mutable.ArrayBuffer.empty[Alias])
 
-            newChildren = newChildren.zip(childProjections).map { case (child, projections) =>
-              if (projections.nonEmpty) {
-                Project(child.output ++ projections, child)
-              } else {
-                child
-              }
+        defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
+          if (CollapseProject.isCheap(child)) {
+            refToExpr(id) = child
+          } else {
+            val childProjectionIndex = inputPlans.indexWhere(
+              c => child.references.subsetOf(c.outputSet)
+            )
+            if (childProjectionIndex == -1) {
+              // When we cannot rewrite the common expressions, force to inline them so that the
+              // query can still run. This can happen if the join condition contains `With` and
+              // the common expression references columns from both join sides.
+              // TODO: things can go wrong if the common expression is nondeterministic. We
+              //       don't fix it for now to match the old buggy behavior when certain
+              //       `RuntimeReplaceable` did not use the `With` expression.
+              // TODO: we should calculate the ref count and also inline the common expression
+              //       if it's ref count is 1.
+              refToExpr(id) = child
+            } else {
+              val alias = Alias(child, s"_common_expr_$index")()
+              childProjections(childProjectionIndex) += alias
+              refToExpr(id) = alias.toAttribute
             }
+          }
+        }
 
+        for (i <- inputPlans.indices) {
+          val projectList = childProjections(i)
+          if (projectList.nonEmpty) {
+            inputPlans(i) = Project(inputPlans(i).output ++ projectList, inputPlans(i))
+          }
+        }
+
+        w.child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) {
+          case ref: CommonExpressionRef => refToExpr(ref.id)
+        }
+
+      case c: ConditionalExpression =>
+        val newAlwaysEvaluatedInputs = c.alwaysEvaluatedInputs.map(
+          rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   I thought about it before. The problem is that it's hard to update the original `ConditionalExpression` with the new `shared common expressions`. `alwaysEvaluatedInputs` is static, so I can have every `ConditionalExpression` implement a method to update it.
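
The idea of a per-expression update method can be sketched as follows. This is a toy model under stated assumptions, not the Catalyst classes: the method names mirror `alwaysEvaluatedInputs` and `withNewAlwaysEvaluatedInputs` from the PR diff, while `Attr`, `Upper`, the toy `If` and `Coalesce`, and the choice of which inputs count as always evaluated are hypothetical.

```scala
object AlwaysEvaluatedSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Upper(child: Expr) extends Expr   // marks a rewritten input

  // Toy counterpart of a conditional expression.
  sealed trait Conditional extends Expr {
    // Inputs that run no matter which branch is taken.
    def alwaysEvaluatedInputs: Seq[Expr]
    // Rebuilds the expression with those inputs replaced, branches untouched.
    def withNewAlwaysEvaluatedInputs(inputs: Seq[Expr]): Conditional
  }

  final case class If(predicate: Expr, whenTrue: Expr, whenFalse: Expr) extends Conditional {
    def alwaysEvaluatedInputs: Seq[Expr] = Seq(predicate)
    def withNewAlwaysEvaluatedInputs(inputs: Seq[Expr]): If = copy(predicate = inputs.head)
  }

  final case class Coalesce(children: Seq[Expr]) extends Conditional {
    // In this model only the first child is guaranteed to be evaluated.
    def alwaysEvaluatedInputs: Seq[Expr] = children.take(1)
    def withNewAlwaysEvaluatedInputs(inputs: Seq[Expr]): Coalesce =
      copy(children = inputs ++ children.drop(1))
  }

  // Rewrite only the inputs that are guaranteed to run.
  def rewriteAlwaysEvaluated(c: Conditional)(f: Expr => Expr): Conditional =
    c.withNewAlwaysEvaluatedInputs(c.alwaysEvaluatedInputs.map(f))
}
```

Because the set of always-evaluated inputs is fixed per expression type, each type can rebuild itself from a replaced list without touching its branches.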





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1403851820


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))
+        val refToExpr = mutable.HashMap.empty[Long, Expression]
+        val childProjections = Array.fill(inputPlans.length)(mutable.ArrayBuffer.empty[Alias])
 
-            newChildren = newChildren.zip(childProjections).map { case (child, projections) =>
-              if (projections.nonEmpty) {
-                Project(child.output ++ projections, child)
-              } else {
-                child
-              }
+        defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
+          if (CollapseProject.isCheap(child)) {
+            refToExpr(id) = child
+          } else {
+            val childProjectionIndex = inputPlans.indexWhere(
+              c => child.references.subsetOf(c.outputSet)
+            )
+            if (childProjectionIndex == -1) {
+              // When we cannot rewrite the common expressions, force to inline them so that the
+              // query can still run. This can happen if the join condition contains `With` and
+              // the common expression references columns from both join sides.
+              // TODO: things can go wrong if the common expression is nondeterministic. We
+              //       don't fix it for now to match the old buggy behavior when certain
+              //       `RuntimeReplaceable` did not use the `With` expression.
+              // TODO: we should calculate the ref count and also inline the common expression
+              //       if it's ref count is 1.
+              refToExpr(id) = child
+            } else {
+              val alias = Alias(child, s"_common_expr_$index")()
+              childProjections(childProjectionIndex) += alias
+              refToExpr(id) = alias.toAttribute
             }
+          }
+        }
 
+        for (i <- inputPlans.indices) {
+          val projectList = childProjections(i)
+          if (projectList.nonEmpty) {
+            inputPlans(i) = Project(inputPlans(i).output ++ projectList, inputPlans(i))
+          }
+        }
+
+        w.child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) {
+          case ref: CommonExpressionRef => refToExpr(ref.id)
+        }
+
+      case c: ConditionalExpression =>
+        val newAlwaysEvaluatedInputs = c.alwaysEvaluatedInputs.map(
+          rewriteWithExprAndInputPlans(_, inputPlans))
+        val newExpr = c.withNewAlwaysEvaluatedInputs(newAlwaysEvaluatedInputs)
+        // Use transformUp to handle nested With.
+        newExpr.transformUpWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+          case With(child, defs) =>
+            // For With in the conditional branches, they may not be evaluated at all and we can't
+            // pull the common expressions into a project which will always be evaluated. Inline it.

Review Comment:
   same as https://github.com/apache/spark/pull/43978/files#r1403392772 .
   
   It's easy to find the common expressions shared by both branches, but it's hard to put them back into `If`. I think it's better to do that when we turn this into a general rule that finds shared common expressions and creates `With` to deduplicate them.
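
The code comment quoted in the hunk above (a `With` inside a conditional branch may never be evaluated, so it is inlined) can be illustrated in plain Scala. This is only an analogy under stated assumptions: `commonExpr`, `pulledOut` and `inlined` are hypothetical names, and integer division stands in for an expression that is unsafe to evaluate outside its branch.

```scala
object ConditionalBranchSketch {
  // A common expression that is only safe to evaluate when the divisor is non-zero.
  def commonExpr(divisor: Int): Int = 100 / divisor

  // "Pulled out": the common expression is computed unconditionally, like a column
  // added to a projection below the conditional.
  def pulledOut(divisor: Int): Int = {
    val common = commonExpr(divisor)
    if (divisor != 0) common + common else 0
  }

  // "Inlined": the common expression stays inside the branch and is only computed
  // when that branch is taken.
  def inlined(divisor: Int): Int =
    if (divisor != 0) commonExpr(divisor) + commonExpr(divisor) else 0
}
```

Both versions agree when the branch is taken. For a zero divisor the pulled-out version throws an `ArithmeticException`, while the inlined version returns the fallback value at the cost of evaluating the expression twice otherwise.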





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1403861827


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   Maybe before is better, as the expression tree may be much larger after replacing `CommonExpressionRef`.
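
The size argument can be checked on a toy tree. This is a minimal sketch, not Catalyst code: `Leaf`, `Ref`, `Add`, `size` and `inline` are hypothetical, and it covers only the case where a definition is inlined rather than replaced by a single attribute.

```scala
object TreeSizeSketch {
  sealed trait Expr
  final case class Leaf(name: String) extends Expr
  final case class Ref(id: Long) extends Expr          // reference to a definition
  final case class Add(left: Expr, right: Expr) extends Expr

  // Number of nodes in the tree.
  def size(e: Expr): Int = e match {
    case Add(l, r) => 1 + size(l) + size(r)
    case _         => 1
  }

  // Replace every reference by a full copy of its definition.
  def inline(e: Expr, defs: Map[Long, Expr]): Expr = e match {
    case Ref(id)   => defs(id)
    case Add(l, r) => Add(inline(l, defs), inline(r, defs))
    case leaf      => leaf
  }

  // Definition: (a + b) + c
  val definition: Expr = Add(Add(Leaf("a"), Leaf("b")), Leaf("c"))
  // Child: (ref + ref) + ref
  val child: Expr = Add(Add(Ref(0L), Ref(0L)), Ref(0L))
  val inlinedChild: Expr = inline(child, Map(0L -> definition))
}
```

Here the child grows from 5 nodes to 17 once its three references are replaced, so any traversal done after the replacement visits more nodes than one done before it.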





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43978:
URL: https://github.com/apache/spark/pull/43978#issuecomment-1829688156

   The doc issue is unrelated:
   ```
   don't know which module to import for autodocumenting 'apply_batch' (try placing a "module" or "currentmodule" directive in the document, or giving an explicit module name)
   make: *** [Makefile:35: html] Error 2
   ```
   cc @HyukjinKwon @LuciferYang 
   
   I'm merging it to master, thanks for the review!




Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #43978:
URL: https://github.com/apache/spark/pull/43978#issuecomment-1829685275

   @Kimahriman Yes. `With` is a very conservative solution for now and fixes the problem only for `NullIf` in certain cases. We will improve it and expand the usage later.




Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1403808226


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))
+        val refToExpr = mutable.HashMap.empty[Long, Expression]
+        val childProjections = Array.fill(inputPlans.length)(mutable.ArrayBuffer.empty[Alias])
 
-            newChildren = newChildren.zip(childProjections).map { case (child, projections) =>
-              if (projections.nonEmpty) {
-                Project(child.output ++ projections, child)
-              } else {
-                child
-              }
+        defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
+          if (CollapseProject.isCheap(child)) {
+            refToExpr(id) = child
+          } else {
+            val childProjectionIndex = inputPlans.indexWhere(
+              c => child.references.subsetOf(c.outputSet)
+            )
+            if (childProjectionIndex == -1) {
+              // When we cannot rewrite the common expressions, force to inline them so that the
+              // query can still run. This can happen if the join condition contains `With` and
+              // the common expression references columns from both join sides.
+              // TODO: things can go wrong if the common expression is nondeterministic. We
+              //       don't fix it for now to match the old buggy behavior when certain
+              //       `RuntimeReplaceable` did not use the `With` expression.
+              // TODO: we should calculate the ref count and also inline the common expression
+              //       if it's ref count is 1.
+              refToExpr(id) = child
+            } else {
+              val alias = Alias(child, s"_common_expr_$index")()
+              childProjections(childProjectionIndex) += alias
+              refToExpr(id) = alias.toAttribute
             }
+          }
+        }
 
+        for (i <- inputPlans.indices) {
+          val projectList = childProjections(i)
+          if (projectList.nonEmpty) {
+            inputPlans(i) = Project(inputPlans(i).output ++ projectList, inputPlans(i))
+          }
+        }
+
+        w.child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) {
+          case ref: CommonExpressionRef => refToExpr(ref.id)
+        }
+
+      case c: ConditionalExpression =>
+        val newAlwaysEvaluatedInputs = c.alwaysEvaluatedInputs.map(
+          rewriteWithExprAndInputPlans(_, inputPlans))
+        val newExpr = c.withNewAlwaysEvaluatedInputs(newAlwaysEvaluatedInputs)
+        // Use transformUp to handle nested With.
+        newExpr.transformUpWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+          case With(child, defs) =>
+            // For With in the conditional branches, they may not be evaluated at all and we can't
+            // pull the common expressions into a project which will always be evaluated. Inline it.

Review Comment:
   Hmm, for specific conditional expressions, e.g., `If`, can't we still extract a common expression that is shared by both branches?
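
A minimal sketch of the trade-off the code comment above describes, using a toy Scala model rather than Spark's Catalyst API (every name in it is hypothetical). It assumes the common expression can fail or be expensive, and compares keeping it inside the branch with computing it up front, which is roughly what pulling it into a `Project` below the operator would do:

```scala
// Toy model, not Spark's Catalyst API: all names here are hypothetical.
object BranchHoistingSketch {
  // Counts how many times the common expression is evaluated.
  var evalCount = 0

  // Stands in for a common expression that may fail or be expensive, e.g. 100 / x.
  def common(x: Int): Int = { evalCount += 1; 100 / x }

  // Inlined form: the common expression lives only inside the guarded branch,
  // so it is evaluated twice when the branch is taken and never otherwise.
  def inlined(x: Int): Int = if (x != 0) common(x) + common(x) else 0

  // Hoisted form: the common expression is computed before the conditional,
  // so it is evaluated once, but also when the branch is not taken.
  def hoisted(x: Int): Int = {
    val c = common(x)
    if (x != 0) c + c else 0
  }
}
```

With `x = 0` the inlined form returns 0, while the hoisted form throws an `ArithmeticException` because the division runs unconditionally. An expression that appears in every branch is always evaluated, so hoisting it would not have this problem; that appears to be the case this question is about.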



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1405671315


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   Then we may need a test for that (asserting either that it is supported, or that it fails if it is not).





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #43978: [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches
URL: https://github.com/apache/spark/pull/43978




Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1403362008


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))
+        val refToExpr = mutable.HashMap.empty[Long, Expression]
+        val childProjections = Array.fill(inputPlans.length)(mutable.ArrayBuffer.empty[Alias])
 
-            newChildren = newChildren.zip(childProjections).map { case (child, projections) =>
-              if (projections.nonEmpty) {
-                Project(child.output ++ projections, child)
-              } else {
-                child
-              }
+        defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
+          if (CollapseProject.isCheap(child)) {
+            refToExpr(id) = child
+          } else {
+            val childProjectionIndex = inputPlans.indexWhere(
+              c => child.references.subsetOf(c.outputSet)
+            )
+            if (childProjectionIndex == -1) {
+              // When we cannot rewrite the common expressions, force to inline them so that the
+              // query can still run. This can happen if the join condition contains `With` and
+              // the common expression references columns from both join sides.
+              // TODO: things can go wrong if the common expression is nondeterministic. We
+              //       don't fix it for now to match the old buggy behavior when certain
+              //       `RuntimeReplaceable` did not use the `With` expression.
+              // TODO: we should calculate the ref count and also inline the common expression
+              //       if it's ref count is 1.
+              refToExpr(id) = child
+            } else {
+              val alias = Alias(child, s"_common_expr_$index")()
+              childProjections(childProjectionIndex) += alias
+              refToExpr(id) = alias.toAttribute
             }
+          }
+        }
 
+        for (i <- inputPlans.indices) {
+          val projectList = childProjections(i)
+          if (projectList.nonEmpty) {
+            inputPlans(i) = Project(inputPlans(i).output ++ projectList, inputPlans(i))
+          }
+        }
+
+        w.child.transformWithPruning(_.containsPattern(COMMON_EXPR_REF)) {
+          case ref: CommonExpressionRef => refToExpr(ref.id)
+        }
+
+      case c: ConditionalExpression =>
+        val newAlwaysEvaluatedInputs = c.alwaysEvaluatedInputs.map(
+          rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   This only deals with common expressions in always-evaluated inputs, e.g., the predicate of `If`.
   
   How about common expressions shared between the predicate and the branches?





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "peter-toth (via GitHub)" <gi...@apache.org>.
peter-toth commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1403662967


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   Actually, the current logic seems to behave correctly if there is an inner `With` in an outer `With`'s `child` and the inner one has a definition that references an outer definition. (The previous `transformExpressionsUp()` had issues in that case.) But the rule is not idempotent now, so maybe we should recurse into `w.child` after replacing the `CommonExpressionRef`s?
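
One reading of the idempotence concern, sketched on a toy expression tree rather than Catalyst's real classes (the names `Lit`, `Add`, `Ref`, `substitute`, and `rewrite` are hypothetical, and the toy always inlines definitions instead of adding projections). The `recurseIntoChild` flag switches between replacing references only and also rewriting the child afterwards:

```scala
// Toy model, not Spark's Catalyst API: all names here are hypothetical.
object NestedWithSketch {
  sealed trait Expr
  case class Lit(v: Int) extends Expr
  case class Add(l: Expr, r: Expr) extends Expr
  case class Ref(id: Long) extends Expr
  case class With(child: Expr, defs: Map[Long, Expr]) extends Expr

  // Replace known references everywhere, including inside nested With nodes.
  def substitute(e: Expr, bindings: Map[Long, Expr]): Expr = e match {
    case Ref(id) => bindings.getOrElse(id, e)
    case Add(l, r) => Add(substitute(l, bindings), substitute(r, bindings))
    case With(c, d) =>
      With(substitute(c, bindings), d.map { case (k, v) => k -> substitute(v, bindings) })
    case lit: Lit => lit
  }

  // Inline the definitions of a With into its child. When recurseIntoChild is
  // true, the child is rewritten again after its references were replaced.
  def rewrite(e: Expr, recurseIntoChild: Boolean): Expr = e match {
    case With(child, defs) =>
      val newDefs = defs.map { case (k, v) => k -> rewrite(v, recurseIntoChild) }
      val replaced = substitute(child, newDefs)
      if (recurseIntoChild) rewrite(replaced, recurseIntoChild) else replaced
    case Add(l, r) => Add(rewrite(l, recurseIntoChild), rewrite(r, recurseIntoChild))
    case other => other
  }

  def containsWith(e: Expr): Boolean = e match {
    case _: With => true
    case Add(l, r) => containsWith(l) || containsWith(r)
    case _ => false
  }

  // The inner With's definition (id 2) references the outer definition (id 1).
  val nested: Expr =
    With(With(Add(Ref(2), Ref(2)), Map(2L -> Add(Ref(1), Lit(1)))), Map(1L -> Lit(10)))
}
```

In this toy, `rewrite(nested, recurseIntoChild = false)` resolves the reference to the outer definition but leaves the inner `With` in place, so a second pass is needed. With `recurseIntoChild = true`, a single pass removes every `With` and a second pass changes nothing.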





Re: [PR] [SPARK-45760][SQL][FOLLOWUP] Inline With inside conditional branches [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #43978:
URL: https://github.com/apache/spark/pull/43978#discussion_r1403851044


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -35,56 +35,82 @@ object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
       case p if p.expressions.exists(_.containsPattern(WITH_EXPRESSION)) =>
-        var newChildren = p.children
-        var newPlan: LogicalPlan = p.transformExpressionsUp {
-          case With(child, defs) =>
-            val refToExpr = mutable.HashMap.empty[Long, Expression]
-            val childProjections = Array.fill(newChildren.size)(mutable.ArrayBuffer.empty[Alias])
+        val inputPlans = p.children.toArray
+        var newPlan: LogicalPlan = p.mapExpressions { expr =>
+          rewriteWithExprAndInputPlans(expr, inputPlans)
+        }
+        newPlan = newPlan.withNewChildren(inputPlans.toIndexedSeq)
+        if (p.output == newPlan.output) {
+          newPlan
+        } else {
+          Project(p.output, newPlan)
+        }
+    }
+  }
 
-            defs.zipWithIndex.foreach { case (CommonExpressionDef(child, id), index) =>
-              if (CollapseProject.isCheap(child)) {
-                refToExpr(id) = child
-              } else {
-                val childProjectionIndex = newChildren.indexWhere(
-                  c => child.references.subsetOf(c.outputSet)
-                )
-                if (childProjectionIndex == -1) {
-                  // When we cannot rewrite the common expressions, force to inline them so that the
-                  // query can still run. This can happen if the join condition contains `With` and
-                  // the common expression references columns from both join sides.
-                  // TODO: things can go wrong if the common expression is nondeterministic. We
-                  //       don't fix it for now to match the old buggy behavior when certain
-                  //       `RuntimeReplaceable` did not use the `With` expression.
-                  // TODO: we should calculate the ref count and also inline the common expression
-                  //       if it's ref count is 1.
-                  refToExpr(id) = child
-                } else {
-                  val alias = Alias(child, s"_common_expr_$index")()
-                  childProjections(childProjectionIndex) += alias
-                  refToExpr(id) = alias.toAttribute
-                }
-              }
-            }
+  private def rewriteWithExprAndInputPlans(
+      e: Expression,
+      inputPlans: Array[LogicalPlan]): Expression = {
+    if (!e.containsPattern(WITH_EXPRESSION)) return e
+    e match {
+      case w: With =>
+        // Rewrite nested With expression in CommonExpressionDef first.
+        val defs = w.defs.map(rewriteWithExprAndInputPlans(_, inputPlans))

Review Comment:
   This is a good catch! It seems it doesn't matter when we recurse into `w.child`: either before or after replacing `CommonExpressionRef` is fine?


