Posted to reviews@spark.apache.org by "agubichev (via GitHub)" <gi...@apache.org> on 2024/02/15 17:57:08 UTC

[PR] Count bug in temp views [spark]

agubichev opened a new pull request, #45125:
URL: https://github.com/apache/spark/pull/45125

   




Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513701628


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   why do we need this change?





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1511830190


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
       // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
       case d: DynamicPruningSubquery => d
+      case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+        if mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+        // This is a subquery with an aggregate that may suffer from a COUNT bug.
+        // Detailed COUNT bug detection is done at a later stage (e.g. in
+        // RewriteCorrelatedScalarSubquery).
+        // Make sure that the output plan always has the same aggregate node
+        // (i.e., it is not being constant folded).
+        // Note that this does not limit optimization opportunities for the subquery: after
+        // decorrelation is done, the subquery's body becomes part of the main plan and all
+        // optimization rules are applied again.
+        val groupRefs = group.flatMap(x => x.references)
+        val projectOverSubqueryBody = Project(groupRefs ++ a.references.toSeq, child)

Review Comment:
   renamed





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on PR #45125:
URL: https://github.com/apache/spark/pull/45125#issuecomment-1977518691

   > Thanks for the fix, looks good overall.
   > 
   > Let's add a gating flag for this change just in case of any issues.
   
   added a flag
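   
   For reference, a minimal sketch of toggling the new gate from a Spark
   session. The SQLConf constant is the one used in the diff; reading its
   .key field is standard SQLConf usage, and the default value is an
   assumption:
   
   ```scala
   import org.apache.spark.sql.internal.SQLConf
   
   // Fall back to the pre-PR behavior by disabling the gate
   // (presumably enabled by default; check SQLConf.scala).
   spark.conf.set(
     SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG.key,
     "false")
   ```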




Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513261508


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
       // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
       case d: DynamicPruningSubquery => d
+      case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+        if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+          mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+        // This is a subquery with an aggregate that may suffer from a COUNT bug.
+        // Detailed COUNT bug detection is done at a later stage (e.g. in
+        // RewriteCorrelatedScalarSubquery).
+        // Make sure that the output plan always has the same aggregate node
+        // (i.e., it is not being constant folded).
+        // Note that this does not limit optimization opportunities for the subquery: after
+        // decorrelation is done, the subquery's body becomes part of the main plan and all
+        // optimization rules are applied again.
+        val groupRefs = group.flatMap(x => x.references)
+        val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child)

Review Comment:
   you are right, removed the groupRefs





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1514762032


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   This is needed when a (count-bug-susceptible) Aggregate has a WITH expression in it, because we skip optimizing it in OptimizeSubqueries. This only matters because RewriteWithExpression runs very early in the optimization and is not triggered again after we unnest the subquery. Any "later" rule would apply to the rewritten subquery, so it would not be a problem. For these "early" rules there are two options: i) repeat them after the subquery is fully unnested (i.e., the count bug has been handled and the subquery replaced with a join), or ii) make sure they run on subqueries. I chose option ii).





Re: [PR] [SPARK-46743] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on PR #45125:
URL: https://github.com/apache/spark/pull/45125#issuecomment-1947380207

   @jchen5 




Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1514771822


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   To be more precise, this PR deals with subqueries of the following shape:
   S:
   Aggregate (mayHaveCountBug = true)
    |
   <subquery body>
   
   Before this PR, OptimizeSubqueries applied to S; with this PR, it applies only to <subquery body>.
   After we unnest the subquery, the rules apply to the entire query (including the aggregate), so we don't miss any optimization opportunities by excluding the Aggregate in OptimizeSubqueries.
   However, the rule RewriteWithExpression runs before OptimizeSubqueries, so we need to make sure subqueries are handled there (in case the Aggregate node has a WITH expression in it).
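   
   As a concrete illustration, a hedged sketch of a query that produces this
   shape (t1, t2 and their columns are made-up names, not the PR's test case):
   
   ```scala
   // The correlated scalar subquery has a count-bug-susceptible Aggregate
   // on top: for a t1 row with no matching t2 rows, COUNT(*) must come out
   // as 0, not NULL. This PR keeps that Aggregate from being constant-folded
   // away before RewriteCorrelatedScalarSubquery can compensate for empty
   // groups.
   spark.sql("""
     SELECT t1.a,
            (SELECT COUNT(*) FROM t2 WHERE t2.b = t1.a) AS cnt
     FROM t1
   """).explain(true)
   ```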





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1512183617


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
       // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
       case d: DynamicPruningSubquery => d
+      case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+        if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+          mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+        // This is a subquery with an aggregate that may suffer from a COUNT bug.
+        // Detailed COUNT bug detection is done at a later stage (e.g. in
+        // RewriteCorrelatedScalarSubquery).
+        // Make sure that the output plan always has the same aggregate node
+        // (i.e., it is not being constant folded).
+        // Note that this does not limit optimization opportunities for the subquery: after
+        // decorrelation is done, the subquery's body becomes part of the main plan and all
+        // optimization rules are applied again.
+        val groupRefs = group.flatMap(x => x.references)
+        val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child)
+        val optimizedPlan = Optimizer.this.execute(Subquery.fromExpression(
+          s.withNewPlan(projectOverAggregateChild)))
+        assert(optimizedPlan.isInstanceOf[Subquery])
+        val optimizedInput = optimizedPlan.asInstanceOf[Subquery].child
+
+        assert(optimizedInput.output.size == projectOverAggregateChild.output.size)
+        val updatedProjectList = projectOverAggregateChild.output.zip(optimizedInput.output).map {
+          case (oldAttr, newAttr) => Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId)

Review Comment:
   This is a bit weird. We don't do this for normal subquery optimization; why is it needed here?





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #45125: [SPARK-46743][SQL] Count bug after constant folding
URL: https://github.com/apache/spark/pull/45125




Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "jchen5 (via GitHub)" <gi...@apache.org>.
jchen5 commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1498343024


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
       // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
       case d: DynamicPruningSubquery => d
+      case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+        if mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+        // This is a subquery with an aggregate that may suffer from a COUNT bug.
+        // Detailed COUNT bug detection is done at a later stage (e.g. in
+        // RewriteCorrelatedScalarSubquery).
+        // Make sure that the output plan always has the same aggregate node
+        // (i.e., it is not being constant folded).
+        // Note that this does not limit optimization opportunities for the subquery: after
+        // decorrelation is done, the subquery's body becomes part of the main plan and all
+        // optimization rules are applied again.
+        val groupRefs = group.flatMap(x => x.references)
+        val projectOverSubqueryBody = Project(groupRefs ++ a.references.toSeq, child)

Review Comment:
   maybe projectOverAggregateChild would be clearer



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
       // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
       case d: DynamicPruningSubquery => d
+      case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+        if mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+        // This is a subquery with an aggregate that may suffer from a COUNT bug.
+        // Detailed COUNT bug detection is done at a later stage (e.g. in
+        // RewriteCorrelatedScalarSubquery).
+        // Make sure that the output plan always has the same aggregate node
+        // (i.e., it is not being constant folded).
+        // Note that this does not limit optimization opportunities for the subquery: after
+        // decorrelation is done, the subquery's body becomes part of the main plan and all
+        // optimization rules are applied again.

Review Comment:
   There could potentially be changes in optimization due to the different order in which rules get applied, right? I think that would be quite an edge case, but can we at least confirm whether there are plan changes in benchmarks?





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513701474


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -328,6 +328,34 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
       // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
       case d: DynamicPruningSubquery => d
+      case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+        if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+          mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+        // This is a subquery with an aggregate that may suffer from a COUNT bug.
+        // Detailed COUNT bug detection is done at a later stage (e.g. in
+        // RewriteCorrelatedScalarSubquery).
+        // Make sure that the output plan always has the same aggregate node
+        // (i.e., it is not being constant folded).
+        // Note that this does not limit optimization opportunities for the subquery: after
+        // decorrelation is done, the subquery's body becomes part of the main plan and all
+        // optimization rules are applied again.
+        val projectOverAggregateChild = Project(a.references.toSeq, child)
+        val optimizedPlan = Optimizer.this.execute(Subquery.fromExpression(
+          s.withNewPlan(projectOverAggregateChild)))
+        assert(optimizedPlan.isInstanceOf[Subquery])
+        val optimizedInput = optimizedPlan.asInstanceOf[Subquery].child
+
+        assert(optimizedInput.output.size == projectOverAggregateChild.output.size)
+        // We preserve the top aggregation, but its input has been optimized via
+        // Optimizer.execute().
+        // Make sure that the attributes still have IDs expected by the Aggregate node
+        // by inserting a project.
+        val updatedProjectList = projectOverAggregateChild.output.zip(optimizedInput.output).map {
+          case (oldAttr, newAttr) => Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId)
+        }
+
+        val res = s.withNewPlan(a.withNewChildren(Seq(Project(updatedProjectList, optimizedInput))))
+        res

Review Comment:
   ```suggestion
           s.withNewPlan(a.withNewChildren(Seq(Project(updatedProjectList, optimizedInput))))
   ```





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "jchen5 (via GitHub)" <gi...@apache.org>.
jchen5 commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1516503722


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   > Thinking about it more, why do we handle the count bug in two places that are far away?
   
   I wrote a doc about the reason for it and whether we can change it: https://docs.google.com/document/d/1YCce0DtJ6NkMP1QM1nnfYvkiH1CkHoGyMjKQ8MX5bUM/edit





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1515561516


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   Then this rule becomes O(n^2), as every level of subquery nesting runs it over all subqueries below that level.
   
   Thinking about it more, why do we handle the count bug in two places that are so far apart?





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45125:
URL: https://github.com/apache/spark/pull/45125#issuecomment-2018411588

   thanks, merging to master!




Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1516770647


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   Why is it O(n^2)? RewriteWithExpression replaces all WITH expressions in one pass the first time we call it, doesn't it? (I know that technically it is also invoked as part of the optimization loop in OptimizeSubqueries, but by that point all the WITH expressions have already been replaced.)





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1511831412


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
       // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
       case d: DynamicPruningSubquery => d
+      case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+        if mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+        // This is a subquery with an aggregate that may suffer from a COUNT bug.
+        // Detailed COUNT bug detection is done at a later stage (e.g. in
+        // RewriteCorrelatedScalarSubquery).
+        // Make sure that the output plan always has the same aggregate node
+        // (i.e., it is not being constant folded).
+        // Note that this does not limit optimization opportunities for the subquery: after
+        // decorrelation is done, the subquery's body becomes part of the main plan and all
+        // optimization rules are applied again.

Review Comment:
   Verified that the only plan changes we see in TPCDS are trivial (e.g., the order of fields in a Project changes, or Project[a, b] becomes Project[a, a, b], which gets resolved in physical planning).





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513262881


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
       // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
       case d: DynamicPruningSubquery => d
+      case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+        if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+          mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+        // This is a subquery with an aggregate that may suffer from a COUNT bug.
+        // Detailed COUNT bug detection is done at a later stage (e.g. in
+        // RewriteCorrelatedScalarSubquery).
+        // Make sure that the output plan always has the same aggregate node
+        // (i.e., it is not being constant folded).
+        // Note that this does not limit optimization opportunities for the subquery: after
+        // decorrelation is done, the subquery's body becomes part of the main plan and all
+        // optimization rules are applied again.
+        val groupRefs = group.flatMap(x => x.references)
+        val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child)
+        val optimizedPlan = Optimizer.this.execute(Subquery.fromExpression(
+          s.withNewPlan(projectOverAggregateChild)))
+        assert(optimizedPlan.isInstanceOf[Subquery])
+        val optimizedInput = optimizedPlan.asInstanceOf[Subquery].child
+
+        assert(optimizedInput.output.size == projectOverAggregateChild.output.size)
+        val updatedProjectList = projectOverAggregateChild.output.zip(optimizedInput.output).map {
+          case (oldAttr, newAttr) => Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId)

Review Comment:
   We are preserving the Aggregate and optimizing its input via Optimizer.execute(). This Project is meant to ensure that the Aggregate still gets attributes with the expected IDs. The "normal" subquery optimization path does not preserve any node above the optimized plan and therefore does not need it.
   I added a comment to clarify it.
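   
   To spell out the trick (this mirrors the code in the diff above; realign
   is just an illustrative helper name, not part of the PR):
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
   
   // Re-optimizing the Aggregate's input can rewrite its output attributes,
   // but the preserved Aggregate still refers to the old exprIds. Aliasing
   // each new attribute back to its old id keeps the Aggregate resolvable.
   def realign(oldOutput: Seq[Attribute], newOutput: Seq[Attribute]): Seq[NamedExpression] =
     oldOutput.zip(newOutput).map { case (oldAttr, newAttr) =>
       Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId)
     }
   ```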





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513701849


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##########
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   I think all subqueries will be optimized recursively. There is no exception.





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on PR #45125:
URL: https://github.com/apache/spark/pull/45125#issuecomment-1977503018

   > What about if there's another node above the aggregate in the subquery, such as a filter after the aggregate (having clause)?
   
   Added a test, but any non-trivial node above the aggregate (such as a filter) results in a DomainJoin, so constant folding does not kick in.
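   
   For example (a hedged sketch with made-up table names): a HAVING clause
   puts a Filter above the count-bug-prone Aggregate, and decorrelation then
   introduces a DomainJoin, so the Aggregate cannot be folded into a constant
   anyway:
   
   ```scala
   spark.sql("""
     SELECT t1.a,
            (SELECT COUNT(*) FROM t2 WHERE t2.b = t1.a HAVING COUNT(*) > 1) AS cnt
     FROM t1
   """).explain(true)
   ```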




Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "agubichev (via GitHub)" <gi...@apache.org>.
agubichev commented on PR #45125:
URL: https://github.com/apache/spark/pull/45125#issuecomment-1977518871

   @cloud-fan 




Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1512181287


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // Do not optimize DPP subquery, as it was created from optimized plan and we should not
       // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
       case d: DynamicPruningSubquery => d
+      case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+        if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+          mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+        // This is a subquery with an aggregate that may suffer from a COUNT bug.
+        // Detailed COUNT bug detection is done at a later stage (e.g. in
+        // RewriteCorrelatedScalarSubquery).
+        // Make sure that the output plan always has the same aggregate node
+        // (i.e., it is not being constant folded).
+        // Note that this does not limit optimization opportunities for the subquery: after
+        // decorrelation is done, the subquery's body becomes part of the main plan and all
+        // optimization rules are applied again.
+        val groupRefs = group.flatMap(x => x.references)
+        val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child)

Review Comment:
   Shouldn't `a.references` always contain `groupRefs`?


