You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "Hisoka-X (via GitHub)" <gi...@apache.org> on 2023/05/28 05:54:46 UTC

[GitHub] [spark] Hisoka-X opened a new pull request, #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Hisoka-X opened a new pull request, #41347:
URL: https://github.com/apache/spark/pull/41347

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   
   Eg:
   ```scala
   sql("create view t(c1, c2) as values (0, 1), (0, 2), (1, 2)")
   
   sql("select c1, c2, (select count(*) cnt from t t2 where t1.c1 = t2.c1 " +
   "having cnt = 0) from t t1").show() 
   ```
   The error will throw:
   ``` 
   [PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.RewriteCorrelatedScalarSubquery in batch Operator Optimization before Inferring Filters generated an invalid plan: The plan becomes unresolved: 'Project [toprettystring(c1#224, Some(America/Los_Angeles)) AS toprettystring(c1)#238, toprettystring(c2#225, Some(America/Los_Angeles)) AS toprettystring(c2)#239, toprettystring(cnt#246L, Some(America/Los_Angeles)) AS toprettystring(scalarsubquery(c1))#240]
   +- 'Project [c1#224, c2#225, CASE WHEN isnull(alwaysTrue#245) THEN 0 WHEN NOT (cnt#222L = 0) THEN null ELSE cnt#222L END AS cnt#246L]
      +- 'Join LeftOuter, (c1#224 = c1#224#244)
         :- Project [col1#226 AS c1#224, col2#227 AS c2#225]
         :  +- LocalRelation [col1#226, col2#227]
         +- Project [cnt#222L, c1#224#244, cnt#222L, c1#224, true AS alwaysTrue#245]
            +- Project [cnt#222L, c1#224 AS c1#224#244, cnt#222L, c1#224]
               +- Aggregate [c1#224], [count(1) AS cnt#222L, c1#224]
                  +- Project [col1#228 AS c1#224]
                     +- LocalRelation [col1#228, col2#229]The previous plan: Project [toprettystring(c1#224, Some(America/Los_Angeles)) AS toprettystring(c1)#238, toprettystring(c2#225, Some(America/Los_Angeles)) AS toprettystring(c2)#239, toprettystring(scalar-subquery#223 [c1#224 && (c1#224 = c1#224#244)], Some(America/Los_Angeles)) AS toprettystring(scalarsubquery(c1))#240]
   :  +- Project [cnt#222L, c1#224 AS c1#224#244]
   :     +- Filter (cnt#222L = 0)
   :        +- Aggregate [c1#224], [count(1) AS cnt#222L, c1#224]
   :           +- Project [col1#228 AS c1#224]
   :              +- LocalRelation [col1#228, col2#229]
   +- Project [col1#226 AS c1#224, col2#227 AS c2#225]
      +- LocalRelation [col1#226, col2#227] 
   ```
   
   The reason of error is the unresolved expression in `Join` node which generate by subquery decorrelation. The `duplicateResolved` in `Join` node are false. That's meaning the `Join` left and right have same `Attribute`, in this eg is `c1#224`. The right `c1#224` `Attribute` generated by having Inputs, because there are wrong having Inputs. 
   
   This problem only occurs when there contain having clause.
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   Fix subquery bug on single table when use having clause
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   Add new test
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1262375339


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala:
##########
@@ -271,7 +271,10 @@ case class ScalarSubquery(
     mayHaveCountBug: Option[Boolean] = None)
   extends SubqueryExpression(plan, outerAttrs, exprId, joinCond, hint) with Unevaluable {
   override def dataType: DataType = {
-    assert(plan.schema.fields.nonEmpty, "Scalar subquery should have only one column")
+    if (!plan.schema.fields.nonEmpty) {

Review Comment:
   Yes. Usually this error will be thrown by `checkAnalysis`, but we may call datatype in `DeduplicateRelations ` to cause this exception to be thrown. This change ensures that the thrown exception is consistent.
   ```log
   Caused by: sbt.ForkMain$ForkError: java.lang.AssertionError: assertion failed: Scalar subquery should have only one column
   	at scala.Predef$.assert(Predef.scala:223)
   	at org.apache.spark.sql.catalyst.expressions.ScalarSubquery.dataType(subquery.scala:274)
   	at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:194)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$$anonfun$findAliases$1.applyOrElse(DeduplicateRelations.scala:530)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$$anonfun$findAliases$1.applyOrElse(DeduplicateRelations.scala:530)
   	at scala.PartialFunction.$anonfun$runWith$1$adapted(PartialFunction.scala:145)
   	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   	at scala.collection.TraversableLike.collect(TraversableLike.scala:407)
   	at scala.collection.TraversableLike.collect$(TraversableLike.scala:405)
   	at scala.collection.AbstractTraversable.collect(Traversable.scala:108)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.findAliases(DeduplicateRelations.scala:530)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.org$apache$spark$sql$catalyst$analysis$DeduplicateRelations$$renewDuplicatedRelations(DeduplicateRelations.scala:120)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.apply(DeduplicateRelations.scala:40)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.apply(DeduplicateRelations.scala:38)
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1273615032


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,161 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicateAndRenew[LogicalPlan with MultiInstanceRelation](
+        existingRelations,
+        m,
+        _.output.map(_.exprId.id),
+        node => node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])
+
+    case p: Project =>

Review Comment:
   Oh, sorry about that. Let me add now!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1262350788


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala:
##########
@@ -271,7 +271,10 @@ case class ScalarSubquery(
     mayHaveCountBug: Option[Boolean] = None)
   extends SubqueryExpression(plan, outerAttrs, exprId, joinCond, hint) with Unevaluable {
   override def dataType: DataType = {
-    assert(plan.schema.fields.nonEmpty, "Scalar subquery should have only one column")
+    if (!plan.schema.fields.nonEmpty) {

Review Comment:
   Can we really reach this code branch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1267529186


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,124 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicateAndRenew(existingRelations, m)(_.output.map(_.exprId.id))(node =>
+        node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])
+
+    case p: Project =>
+      deduplicateAndRenew(existingRelations, p)(newProject => findAliases(newProject.projectList)
+        .map(_.exprId.id).toSeq)(newProject => newProject.copy(newAliases(newProject.projectList)))

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1262375339


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala:
##########
@@ -271,7 +271,10 @@ case class ScalarSubquery(
     mayHaveCountBug: Option[Boolean] = None)
   extends SubqueryExpression(plan, outerAttrs, exprId, joinCond, hint) with Unevaluable {
   override def dataType: DataType = {
-    assert(plan.schema.fields.nonEmpty, "Scalar subquery should have only one column")
+    if (!plan.schema.fields.nonEmpty) {

Review Comment:
   Yes. Usually this error will be thrown by `checkAnalysis`, but we may call datatype in `DeduplicateRelations ` to cause this exception to be thrown. This change ensures that the thrown exception is consistent.
   
   Change before:
   ```log
   Caused by: sbt.ForkMain$ForkError: java.lang.AssertionError: assertion failed: Scalar subquery should have only one column
   	at scala.Predef$.assert(Predef.scala:223)
   	at org.apache.spark.sql.catalyst.expressions.ScalarSubquery.dataType(subquery.scala:274)
   	at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:194)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$$anonfun$findAliases$1.applyOrElse(DeduplicateRelations.scala:530)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$$anonfun$findAliases$1.applyOrElse(DeduplicateRelations.scala:530)
   	at scala.PartialFunction.$anonfun$runWith$1$adapted(PartialFunction.scala:145)
   	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   	at scala.collection.TraversableLike.collect(TraversableLike.scala:407)
   	at scala.collection.TraversableLike.collect$(TraversableLike.scala:405)
   	at scala.collection.AbstractTraversable.collect(Traversable.scala:108)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.findAliases(DeduplicateRelations.scala:530)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.org$apache$spark$sql$catalyst$analysis$DeduplicateRelations$$renewDuplicatedRelations(DeduplicateRelations.scala:120)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.apply(DeduplicateRelations.scala:40)
   	at org.apache.spark.sql.catalyst.analysis.DeduplicateRelations$.apply(DeduplicateRelations.scala:38)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1263172474


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -117,37 +311,49 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
           }
         }
 
-        val planWithNewSubquery = plan.transformExpressions {
-          case subquery: SubqueryExpression =>
-            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
-            if (changed) planChanged = true
-            subquery.withNewPlan(renewed)
-        }
+        val (newPlan, changed) = getNewPlanWithNewChildren(existingRelations, newChildren.toArray,
+          plan, planChanged)
+        planChanged |= changed
+        newPlan
+      } else {
+        plan
+      }
+      (newPlan, planChanged)
+  }
 
-        if (planChanged) {
-          if (planWithNewSubquery.childrenResolved) {
-            val planWithNewChildren = planWithNewSubquery.withNewChildren(newChildren.toSeq)
-            val attrMap = AttributeMap(
-              plan
-                .children
-                .flatMap(_.output).zip(newChildren.flatMap(_.output))
-                .filter { case (a1, a2) => a1.exprId != a2.exprId }
-            )
-            if (attrMap.isEmpty) {
-              planWithNewChildren
-            } else {
-              planWithNewChildren.rewriteAttrs(attrMap)
-            }
-          } else {
-            planWithNewSubquery.withNewChildren(newChildren.toSeq)
-          }
-        } else {
+  private def getNewPlanWithNewChildren(

Review Comment:
   why is it hard? The original code can handle LogicalPlan with any number of children, and it can definitely take care of plans that only have one child.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1256456878


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -116,38 +131,48 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
             planChanged = true
           }
         }
+        val (newPlan, changed) = getNewPlanWithNewChildren(existingRelations, newChildren.toArray,

Review Comment:
   we don't need to invoke `getNewPlanWithNewChildren` if `planChanged` is false.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1274273945


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,161 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicateAndRenew[LogicalPlan with MultiInstanceRelation](
+        existingRelations,
+        m,
+        _.output.map(_.exprId.id),
+        node => node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])
+
+    case p: Project =>

Review Comment:
   https://github.com/apache/spark/pull/42160



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1262363453


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -117,37 +311,49 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
           }
         }
 
-        val planWithNewSubquery = plan.transformExpressions {
-          case subquery: SubqueryExpression =>
-            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
-            if (changed) planChanged = true
-            subquery.withNewPlan(renewed)
-        }
+        val (newPlan, changed) = getNewPlanWithNewChildren(existingRelations, newChildren.toArray,
+          plan, planChanged)
+        planChanged |= changed
+        newPlan
+      } else {
+        plan
+      }
+      (newPlan, planChanged)
+  }
 
-        if (planChanged) {
-          if (planWithNewSubquery.childrenResolved) {
-            val planWithNewChildren = planWithNewSubquery.withNewChildren(newChildren.toSeq)
-            val attrMap = AttributeMap(
-              plan
-                .children
-                .flatMap(_.output).zip(newChildren.flatMap(_.output))
-                .filter { case (a1, a2) => a1.exprId != a2.exprId }
-            )
-            if (attrMap.isEmpty) {
-              planWithNewChildren
-            } else {
-              planWithNewChildren.rewriteAttrs(attrMap)
-            }
-          } else {
-            planWithNewSubquery.withNewChildren(newChildren.toSeq)
-          }
-        } else {
+  private def getNewPlanWithNewChildren(

Review Comment:
   can we just add a big function `def getNewPlanWithRelationDeduplicated`, which renew duplicated relations in the children and subqueries?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #41347:
URL: https://github.com/apache/spark/pull/41347#issuecomment-1643006858

   Thanks @cloud-fan for your help and @allisonwang-db 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1262394603


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -117,37 +311,49 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
           }
         }
 
-        val planWithNewSubquery = plan.transformExpressions {
-          case subquery: SubqueryExpression =>
-            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
-            if (changed) planChanged = true
-            subquery.withNewPlan(renewed)
-        }
+        val (newPlan, changed) = getNewPlanWithNewChildren(existingRelations, newChildren.toArray,
+          plan, planChanged)
+        planChanged |= changed
+        newPlan
+      } else {
+        plan
+      }
+      (newPlan, planChanged)
+  }
 
-        if (planChanged) {
-          if (planWithNewSubquery.childrenResolved) {
-            val planWithNewChildren = planWithNewSubquery.withNewChildren(newChildren.toSeq)
-            val attrMap = AttributeMap(
-              plan
-                .children
-                .flatMap(_.output).zip(newChildren.flatMap(_.output))
-                .filter { case (a1, a2) => a1.exprId != a2.exprId }
-            )
-            if (attrMap.isEmpty) {
-              planWithNewChildren
-            } else {
-              planWithNewChildren.rewriteAttrs(attrMap)
-            }
-          } else {
-            planWithNewSubquery.withNewChildren(newChildren.toSeq)
-          }
-        } else {
+  private def getNewPlanWithNewChildren(

Review Comment:
   you mean put `renewDuplicatedRelations` and `getNewPlanWithNewChildren` together? (Hard to do that, if child not only one, we could invoke renewDuplicatedRelations more than once to get children, then can invoke getNewPlanWithNewChildren)
   
   Or create inner method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1266544609


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,118 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicate(existingRelations, m)(Some(_.output.map(_.exprId.id)))(node => node.newInstance()
+        .asInstanceOf[LogicalPlan with MultiInstanceRelation])

Review Comment:
   is this `asInstanceOf` necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1259082276


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala:
##########
@@ -271,7 +271,10 @@ case class ScalarSubquery(
     mayHaveCountBug: Option[Boolean] = None)
   extends SubqueryExpression(plan, outerAttrs, exprId, joinCond, hint) with Unevaluable {
   override def dataType: DataType = {
-    assert(plan.schema.fields.nonEmpty, "Scalar subquery should have only one column")
+    if (!plan.schema.fields.nonEmpty) {

Review Comment:
   make sure the `AnalysisException` will be throw, not `AssertionError`



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala:
##########
@@ -437,25 +437,6 @@ class LeftSemiAntiJoinPushDownSuite extends PlanTest {
     }
   }
 
-  Seq(LeftSemi, LeftAnti).foreach { case jt =>

Review Comment:
   This test unnecessary. Because we can deduplicate those attributes in anti-join / semi-join is a self-join. Please refer https://github.com/apache/spark/pull/39131



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] allisonwang-db commented on pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on PR #41347:
URL: https://github.com/apache/spark/pull/41347#issuecomment-1622656145

   cc @jchen5 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1256454196


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -105,6 +105,21 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
         (m, false)
       }
 
+    case p @ Project(_, child) if p.resolved && p.projectList.forall(_.isInstanceOf[Alias]) =>
+      val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
+      val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
+        p, changed)
+      var newProject = newPlan.asInstanceOf[Project]
+      val planWrapper = RelationWrapper(p.getClass, newProject.projectList.map(_.exprId.id))

Review Comment:
   `Project` is a common node and let's not always create the `RelationWrapper` here.
   
   We can find the new attributes precisely from `Project`:
   ```
   projectList.collect {
     case a: Alias => a.exprId.id
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1262394603


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -117,37 +311,49 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
           }
         }
 
-        val planWithNewSubquery = plan.transformExpressions {
-          case subquery: SubqueryExpression =>
-            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
-            if (changed) planChanged = true
-            subquery.withNewPlan(renewed)
-        }
+        val (newPlan, changed) = getNewPlanWithNewChildren(existingRelations, newChildren.toArray,
+          plan, planChanged)
+        planChanged |= changed
+        newPlan
+      } else {
+        plan
+      }
+      (newPlan, planChanged)
+  }
 
-        if (planChanged) {
-          if (planWithNewSubquery.childrenResolved) {
-            val planWithNewChildren = planWithNewSubquery.withNewChildren(newChildren.toSeq)
-            val attrMap = AttributeMap(
-              plan
-                .children
-                .flatMap(_.output).zip(newChildren.flatMap(_.output))
-                .filter { case (a1, a2) => a1.exprId != a2.exprId }
-            )
-            if (attrMap.isEmpty) {
-              planWithNewChildren
-            } else {
-              planWithNewChildren.rewriteAttrs(attrMap)
-            }
-          } else {
-            planWithNewSubquery.withNewChildren(newChildren.toSeq)
-          }
-        } else {
+  private def getNewPlanWithNewChildren(

Review Comment:
   you mean put `renewDuplicatedRelations` and `getNewPlanWithNewChildren` together? Or create inner method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1267491914


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,124 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicateAndRenew(existingRelations, m)(_.output.map(_.exprId.id))(node =>
+        node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])
+
+    case p: Project =>
+      deduplicateAndRenew(existingRelations, p)(newProject => findAliases(newProject.projectList)
+        .map(_.exprId.id).toSeq)(newProject => newProject.copy(newAliases(newProject.projectList)))
+
+    case s: SerializeFromObject =>
+      deduplicateAndRenew(existingRelations, s)(_.serializer.map(_.exprId.id))(newSer =>
+        newSer.copy(newSer.serializer.map(_.newInstance())))
+
+    case f: FlatMapGroupsInPandas =>
+      deduplicateAndRenew(existingRelations, f)(_.output.map(_.exprId.id))(newFlatMap =>
+        newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))
+
+    case f: FlatMapCoGroupsInPandas =>
+      deduplicateAndRenew(existingRelations, f)(_.output.map(_.exprId.id))(newFlatMap =>
+        newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))
+
+    case m: MapInPandas =>
+      deduplicateAndRenew(existingRelations, m)(_.output.map(_.exprId.id))(newMap =>
+        newMap.copy(output = newMap.output.map(_.newInstance())))
+
+    case p: PythonMapInArrow =>
+      deduplicateAndRenew(existingRelations, p)(_.output.map(_.exprId.id))(newMap =>
+        newMap.copy(output = newMap.output.map(_.newInstance())))
+
+    case a: AttachDistributedSequence =>
+      deduplicateAndRenew(existingRelations, a)(_.producedAttributes.map(_.exprId.id).toSeq)(
+        newAttach => newAttach.copy(sequenceAttr = newAttach.producedAttributes
+          .map(_.newInstance()).head))
+
+    case g: Generate =>
+      deduplicateAndRenew(existingRelations, g)(_.generatorOutput.map(_.exprId.id))(newGenerate =>
+        newGenerate.copy(generatorOutput = newGenerate.generatorOutput.map(_.newInstance())))
+
+    case e: Expand =>
+      deduplicateAndRenew(existingRelations, e)(_.producedAttributes.map(_.exprId.id).toSeq)(
+        newExpand => newExpand.copy(output = newExpand.output.map(_.newInstance())))
+
+    case w: Window =>
+      deduplicateAndRenew(existingRelations, w)(_.windowExpressions
+        .map(_.exprId.id))(newWindow => newWindow.copy(windowExpressions =
+        newWindow.windowExpressions.map(_.newInstance())))
+
+    case s: ScriptTransformation =>
+      deduplicateAndRenew(existingRelations, s)(_.output.map(_.exprId.id))(
+        newScript => newScript.copy(output = newScript.output.map(_.newInstance())))
 
     case plan: LogicalPlan =>
-      var planChanged = false
-      val newPlan = if (plan.children.nonEmpty) {
-        val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
-        for (c <- plan.children) {
-          val (renewed, changed) = renewDuplicatedRelations(existingRelations, c)
-          newChildren += renewed
-          if (changed) {
-            planChanged = true
-          }
-        }
+      deduplicate(existingRelations, plan)
+  }
 
-        val planWithNewSubquery = plan.transformExpressions {
-          case subquery: SubqueryExpression =>
-            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
-            if (changed) planChanged = true
-            subquery.withNewPlan(renewed)
+  private def deduplicate(existingRelations: mutable.HashSet[RelationWrapper], plan: LogicalPlan):
+  (LogicalPlan, Boolean) = {

Review Comment:
   ```suggestion
     private def deduplicate(
         existingRelations: mutable.HashSet[RelationWrapper],
         plan: LogicalPlan): (LogicalPlan, Boolean) = {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1267491713


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,124 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicateAndRenew(existingRelations, m)(_.output.map(_.exprId.id))(node =>
+        node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])
+
+    case p: Project =>
+      deduplicateAndRenew(existingRelations, p)(newProject => findAliases(newProject.projectList)
+        .map(_.exprId.id).toSeq)(newProject => newProject.copy(newAliases(newProject.projectList)))

Review Comment:
   nit: I'd rather define `deduplicateAndRenew` normally instead of curry, so that the caller can be clearer
   ```
   deduplicateAndRenew(
     existingRelations,
     p,
     newProject => findAliases(newProject.projectList).map(_.exprId.id).toSeq,
     newProject => newProject.copy(newAliases(newProject.projectList))
   )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1266504189


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -117,37 +311,49 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
           }
         }
 
-        val planWithNewSubquery = plan.transformExpressions {
-          case subquery: SubqueryExpression =>
-            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
-            if (changed) planChanged = true
-            subquery.withNewPlan(renewed)
-        }
+        val (newPlan, changed) = getNewPlanWithNewChildren(existingRelations, newChildren.toArray,
+          plan, planChanged)
+        planChanged |= changed
+        newPlan
+      } else {
+        plan
+      }
+      (newPlan, planChanged)
+  }
 
-        if (planChanged) {
-          if (planWithNewSubquery.childrenResolved) {
-            val planWithNewChildren = planWithNewSubquery.withNewChildren(newChildren.toSeq)
-            val attrMap = AttributeMap(
-              plan
-                .children
-                .flatMap(_.output).zip(newChildren.flatMap(_.output))
-                .filter { case (a1, a2) => a1.exprId != a2.exprId }
-            )
-            if (attrMap.isEmpty) {
-              planWithNewChildren
-            } else {
-              planWithNewChildren.rewriteAttrs(attrMap)
-            }
-          } else {
-            planWithNewSubquery.withNewChildren(newChildren.toSeq)
-          }
-        } else {
+  private def getNewPlanWithNewChildren(

Review Comment:
   Thanks @cloud-fan , Your code change cleared my mind. I refactored the code based on your changes. Please continue review when you are not busy



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1266555252


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,118 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicate(existingRelations, m)(Some(_.output.map(_.exprId.id)))(node => node.newInstance()
+        .asInstanceOf[LogicalPlan with MultiInstanceRelation])
+
+    case p: Project =>
+      deduplicate(existingRelations, p)(Some(newProject => findAliases(newProject.projectList)
+        .map(_.exprId.id).toSeq))(newProject => newProject.copy(newAliases(newProject.projectList)))
+
+    case s: SerializeFromObject =>
+      deduplicate(existingRelations, s)(Some(_.serializer.map(_.exprId.id)))(newSer =>
+        newSer.copy(newSer.serializer.map(_.newInstance())))
+
+    case f: FlatMapGroupsInPandas =>
+      deduplicate(existingRelations, f)(Some(_.output.map(_.exprId.id)))(newFlatMap =>
+        newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))
+
+    case f: FlatMapCoGroupsInPandas =>
+      deduplicate(existingRelations, f)(Some(_.output.map(_.exprId.id)))(newFlatMap =>
+        newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))
+
+    case m: MapInPandas =>
+      deduplicate(existingRelations, m)(Some(_.output.map(_.exprId.id)))(newMap =>
+        newMap.copy(output = newMap.output.map(_.newInstance())))
+
+    case p: PythonMapInArrow =>
+      deduplicate(existingRelations, p)(Some(_.output.map(_.exprId.id)))(newMap =>
+        newMap.copy(output = newMap.output.map(_.newInstance())))
+
+    case a: AttachDistributedSequence =>
+      deduplicate(existingRelations, a)(Some(_.producedAttributes.map(_.exprId.id).toSeq))(
+        newAttach => newAttach.copy(sequenceAttr = newAttach.producedAttributes
+          .map(_.newInstance()).head))
+
+    case g: Generate =>
+      deduplicate(existingRelations, g)(Some(_.generatorOutput.map(_.exprId.id)))(newGenerate =>
+        newGenerate.copy(generatorOutput = newGenerate.generatorOutput.map(_.newInstance())))
+
+    case e: Expand =>
+      deduplicate(existingRelations, e)(Some(_.producedAttributes.map(_.exprId.id).toSeq))(
+        newExpand => newExpand.copy(output = newExpand.output.map(_.newInstance())))
+
+    case w: Window =>
+      deduplicate(existingRelations, w)(Some(_.windowExpressions
+        .map(_.exprId.id)))(newWindow => newWindow.copy(windowExpressions =
+        newWindow.windowExpressions.map(_.newInstance())))
+
+    case s: ScriptTransformation =>
+      deduplicate(existingRelations, s)(Some(_.output.map(_.exprId.id)))(
+        newScript => newScript.copy(output = newScript.output.map(_.newInstance())))
 
     case plan: LogicalPlan =>
-      var planChanged = false
-      val newPlan = if (plan.children.nonEmpty) {
-        val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
-        for (c <- plan.children) {
-          val (renewed, changed) = renewDuplicatedRelations(existingRelations, c)
-          newChildren += renewed
-          if (changed) {
-            planChanged = true
-          }
-        }
+      deduplicate(existingRelations, plan)()(p => p)
+  }
 
-        val planWithNewSubquery = plan.transformExpressions {
-          case subquery: SubqueryExpression =>
-            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
-            if (changed) planChanged = true
-            subquery.withNewPlan(renewed)
+  private def deduplicate[T <: LogicalPlan](
+      existingRelations: mutable.HashSet[RelationWrapper], plan: T)
+      (getExprIds: Option[T => Seq[Long]] = Option.empty)

Review Comment:
   most cases would invoke `deduplicateAndRenew`, `case LogicalPlan` invokes `deduplicate`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan closed pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized
URL: https://github.com/apache/spark/pull/41347


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1258323196


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -105,6 +105,21 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
         (m, false)
       }
 
+    case p @ Project(_, child) if p.resolved && p.projectList.forall(_.isInstanceOf[Alias]) =>

Review Comment:
   I add all plan node which in `collectConflictPlans`. Please check again, but there are another problem. How to test it, seem produce an negative case not easy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #41347:
URL: https://github.com/apache/spark/pull/41347#issuecomment-1565912639

   cc @cloud-fan @jchen5 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1258323571


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -105,6 +105,21 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
         (m, false)
       }
 
+    case p @ Project(_, child) if p.resolved && p.projectList.forall(_.isInstanceOf[Alias]) =>
+      val (renewed, changed) = renewDuplicatedRelations(existingRelations, child)
+      val (newPlan, planChanged) = getNewPlanWithNewChildren(existingRelations, Array(renewed),
+        p, changed)
+      var newProject = newPlan.asInstanceOf[Project]
+      val planWrapper = RelationWrapper(p.getClass, newProject.projectList.map(_.exprId.id))

Review Comment:
   Thanks for remind. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1263227170


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -117,37 +311,49 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
           }
         }
 
-        val planWithNewSubquery = plan.transformExpressions {
-          case subquery: SubqueryExpression =>
-            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
-            if (changed) planChanged = true
-            subquery.withNewPlan(renewed)
-        }
+        val (newPlan, changed) = getNewPlanWithNewChildren(existingRelations, newChildren.toArray,
+          plan, planChanged)
+        planChanged |= changed
+        newPlan
+      } else {
+        plan
+      }
+      (newPlan, planChanged)
+  }
 
-        if (planChanged) {
-          if (planWithNewSubquery.childrenResolved) {
-            val planWithNewChildren = planWithNewSubquery.withNewChildren(newChildren.toSeq)
-            val attrMap = AttributeMap(
-              plan
-                .children
-                .flatMap(_.output).zip(newChildren.flatMap(_.output))
-                .filter { case (a1, a2) => a1.exprId != a2.exprId }
-            )
-            if (attrMap.isEmpty) {
-              planWithNewChildren
-            } else {
-              planWithNewChildren.rewriteAttrs(attrMap)
-            }
-          } else {
-            planWithNewSubquery.withNewChildren(newChildren.toSeq)
-          }
-        } else {
+  private def getNewPlanWithNewChildren(

Review Comment:
   It is different after the change. We need to recursively process the node and refresh its relationship. Before the change, there is only one `MultiInstanceRelation`, and it has no child, so you can do `getNewPlanWithNewChildren` in the LogicalPlan case (because it will match LogicalPlan except for `MultiInstanceRelation`, but this is not the case now) . But for example, for `Project`, we need to process the child of the `Project` first when we match the `Project`, and call `getNewPlanWithNewChildren` on the child of the `Project` at the same time. Then we can deal with the conflicts of the `Project`. It's hard to do this without extract out a function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #41347:
URL: https://github.com/apache/spark/pull/41347#issuecomment-1569900731

   I think a better fix is to let `DeduplicateRelations` handle `Project` with alias as well, which is also a source of conflicting attribute ids.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1266547531


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,118 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicate(existingRelations, m)(Some(_.output.map(_.exprId.id)))(node => node.newInstance()
+        .asInstanceOf[LogicalPlan with MultiInstanceRelation])

Review Comment:
   Yes, because `m` are `LogicalPlan with MultiInstanceRelation`, so we must return `LogicalPlan with MultiInstanceRelation`. Without `asInstanceOf`, the compile will fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1266553718


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,118 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicate(existingRelations, m)(Some(_.output.map(_.exprId.id)))(node => node.newInstance()
+        .asInstanceOf[LogicalPlan with MultiInstanceRelation])
+
+    case p: Project =>
+      deduplicate(existingRelations, p)(Some(newProject => findAliases(newProject.projectList)
+        .map(_.exprId.id).toSeq))(newProject => newProject.copy(newAliases(newProject.projectList)))
+
+    case s: SerializeFromObject =>
+      deduplicate(existingRelations, s)(Some(_.serializer.map(_.exprId.id)))(newSer =>
+        newSer.copy(newSer.serializer.map(_.newInstance())))
+
+    case f: FlatMapGroupsInPandas =>
+      deduplicate(existingRelations, f)(Some(_.output.map(_.exprId.id)))(newFlatMap =>
+        newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))
+
+    case f: FlatMapCoGroupsInPandas =>
+      deduplicate(existingRelations, f)(Some(_.output.map(_.exprId.id)))(newFlatMap =>
+        newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))
+
+    case m: MapInPandas =>
+      deduplicate(existingRelations, m)(Some(_.output.map(_.exprId.id)))(newMap =>
+        newMap.copy(output = newMap.output.map(_.newInstance())))
+
+    case p: PythonMapInArrow =>
+      deduplicate(existingRelations, p)(Some(_.output.map(_.exprId.id)))(newMap =>
+        newMap.copy(output = newMap.output.map(_.newInstance())))
+
+    case a: AttachDistributedSequence =>
+      deduplicate(existingRelations, a)(Some(_.producedAttributes.map(_.exprId.id).toSeq))(
+        newAttach => newAttach.copy(sequenceAttr = newAttach.producedAttributes
+          .map(_.newInstance()).head))
+
+    case g: Generate =>
+      deduplicate(existingRelations, g)(Some(_.generatorOutput.map(_.exprId.id)))(newGenerate =>
+        newGenerate.copy(generatorOutput = newGenerate.generatorOutput.map(_.newInstance())))
+
+    case e: Expand =>
+      deduplicate(existingRelations, e)(Some(_.producedAttributes.map(_.exprId.id).toSeq))(
+        newExpand => newExpand.copy(output = newExpand.output.map(_.newInstance())))
+
+    case w: Window =>
+      deduplicate(existingRelations, w)(Some(_.windowExpressions
+        .map(_.exprId.id)))(newWindow => newWindow.copy(windowExpressions =
+        newWindow.windowExpressions.map(_.newInstance())))
+
+    case s: ScriptTransformation =>
+      deduplicate(existingRelations, s)(Some(_.output.map(_.exprId.id)))(
+        newScript => newScript.copy(output = newScript.output.map(_.newInstance())))
 
     case plan: LogicalPlan =>
-      var planChanged = false
-      val newPlan = if (plan.children.nonEmpty) {
-        val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
-        for (c <- plan.children) {
-          val (renewed, changed) = renewDuplicatedRelations(existingRelations, c)
-          newChildren += renewed
-          if (changed) {
-            planChanged = true
-          }
-        }
+      deduplicate(existingRelations, plan)()(p => p)
+  }
 
-        val planWithNewSubquery = plan.transformExpressions {
-          case subquery: SubqueryExpression =>
-            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
-            if (changed) planChanged = true
-            subquery.withNewPlan(renewed)
+  private def deduplicate[T <: LogicalPlan](
+      existingRelations: mutable.HashSet[RelationWrapper], plan: T)
+      (getExprIds: Option[T => Seq[Long]] = Option.empty)

Review Comment:
   I find the code a bit hard to read with too many lambda parameters. How about the following code structure?
   ```
   def deduplicate(...) // deduplicate relations in children and update attriubte references
   def deduplicateAndRenew(...) // invokes  `deduplicate` and renew attributes in itself
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1266570258


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,118 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicate(existingRelations, m)(Some(_.output.map(_.exprId.id)))(node => node.newInstance()
+        .asInstanceOf[LogicalPlan with MultiInstanceRelation])
+
+    case p: Project =>
+      deduplicate(existingRelations, p)(Some(newProject => findAliases(newProject.projectList)
+        .map(_.exprId.id).toSeq))(newProject => newProject.copy(newAliases(newProject.projectList)))
+
+    case s: SerializeFromObject =>
+      deduplicate(existingRelations, s)(Some(_.serializer.map(_.exprId.id)))(newSer =>
+        newSer.copy(newSer.serializer.map(_.newInstance())))
+
+    case f: FlatMapGroupsInPandas =>
+      deduplicate(existingRelations, f)(Some(_.output.map(_.exprId.id)))(newFlatMap =>
+        newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))
+
+    case f: FlatMapCoGroupsInPandas =>
+      deduplicate(existingRelations, f)(Some(_.output.map(_.exprId.id)))(newFlatMap =>
+        newFlatMap.copy(output = newFlatMap.output.map(_.newInstance())))
+
+    case m: MapInPandas =>
+      deduplicate(existingRelations, m)(Some(_.output.map(_.exprId.id)))(newMap =>
+        newMap.copy(output = newMap.output.map(_.newInstance())))
+
+    case p: PythonMapInArrow =>
+      deduplicate(existingRelations, p)(Some(_.output.map(_.exprId.id)))(newMap =>
+        newMap.copy(output = newMap.output.map(_.newInstance())))
+
+    case a: AttachDistributedSequence =>
+      deduplicate(existingRelations, a)(Some(_.producedAttributes.map(_.exprId.id).toSeq))(
+        newAttach => newAttach.copy(sequenceAttr = newAttach.producedAttributes
+          .map(_.newInstance()).head))
+
+    case g: Generate =>
+      deduplicate(existingRelations, g)(Some(_.generatorOutput.map(_.exprId.id)))(newGenerate =>
+        newGenerate.copy(generatorOutput = newGenerate.generatorOutput.map(_.newInstance())))
+
+    case e: Expand =>
+      deduplicate(existingRelations, e)(Some(_.producedAttributes.map(_.exprId.id).toSeq))(
+        newExpand => newExpand.copy(output = newExpand.output.map(_.newInstance())))
+
+    case w: Window =>
+      deduplicate(existingRelations, w)(Some(_.windowExpressions
+        .map(_.exprId.id)))(newWindow => newWindow.copy(windowExpressions =
+        newWindow.windowExpressions.map(_.newInstance())))
+
+    case s: ScriptTransformation =>
+      deduplicate(existingRelations, s)(Some(_.output.map(_.exprId.id)))(
+        newScript => newScript.copy(output = newScript.output.map(_.newInstance())))
 
     case plan: LogicalPlan =>
-      var planChanged = false
-      val newPlan = if (plan.children.nonEmpty) {
-        val newChildren = mutable.ArrayBuffer.empty[LogicalPlan]
-        for (c <- plan.children) {
-          val (renewed, changed) = renewDuplicatedRelations(existingRelations, c)
-          newChildren += renewed
-          if (changed) {
-            planChanged = true
-          }
-        }
+      deduplicate(existingRelations, plan)()(p => p)
+  }
 
-        val planWithNewSubquery = plan.transformExpressions {
-          case subquery: SubqueryExpression =>
-            val (renewed, changed) = renewDuplicatedRelations(existingRelations, subquery.plan)
-            if (changed) planChanged = true
-            subquery.withNewPlan(renewed)
+  private def deduplicate[T <: LogicalPlan](
+      existingRelations: mutable.HashSet[RelationWrapper], plan: T)
+      (getExprIds: Option[T => Seq[Long]] = Option.empty)

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #41347:
URL: https://github.com/apache/spark/pull/41347#issuecomment-1643004942

   The k8s failure is unrelated, I'm merging it to master, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1273601061


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -95,59 +102,161 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
     case p: LogicalPlan if p.isStreaming => (plan, false)
 
     case m: MultiInstanceRelation =>
-      val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
-      if (existingRelations.contains(planWrapper)) {
-        val newNode = m.newInstance()
-        newNode.copyTagsFrom(m)
-        (newNode, true)
-      } else {
-        existingRelations.add(planWrapper)
-        (m, false)
-      }
+      deduplicateAndRenew[LogicalPlan with MultiInstanceRelation](
+        existingRelations,
+        m,
+        _.output.map(_.exprId.id),
+        node => node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])
+
+    case p: Project =>

Review Comment:
   `Aggregate` is missing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #41347:
URL: https://github.com/apache/spark/pull/41347#issuecomment-1579935398

   > I think a better fix is to let `DeduplicateRelations` handle `Project` with alias as well, which is also a source of conflicting attribute ids.
   
   @cloud-fan Hi, I already implement `DeduplicateRelations` to handle with `Project`. Should I revert change from `RewriteCorrelatedScalarSubquery`? I think it's a hidden danger 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1256451094


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -105,6 +105,21 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
         (m, false)
       }
 
+    case p @ Project(_, child) if p.resolved && p.projectList.forall(_.isInstanceOf[Alias]) =>

Review Comment:
   I'm reading the doc of the `collectConflictPlans` function in this class. I think the problem we have now is there are more plan nodes than the leaf node that can produce new attributes, and we need to handle all of them. `Project` is not the only way, let's follow what plan nodes `collectConflictPlans` handles.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Hisoka-X commented on a diff in pull request #41347: [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #41347:
URL: https://github.com/apache/spark/pull/41347#discussion_r1258309081


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -116,38 +131,48 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
             planChanged = true
           }
         }
+        val (newPlan, changed) = getNewPlanWithNewChildren(existingRelations, newChildren.toArray,

Review Comment:
   In `getNewPlanWithNewChildren` will check `SubqueryExpression`, so can't skip `getNewPlanWithNewChildren` at now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org