You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/18 18:36:31 UTC

[GitHub] [spark] ahshahid opened a new pull request, #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

ahshahid opened a new pull request, #38714:
URL: https://github.com/apache/spark/pull/38714

   ### What changes were proposed in this pull request?
   This is a PR for improvement
   When a subquery references the outer query's aggregate functions,  in some cases, it ends up introducing extra aggregate functions which are not needed. Though they would get eventually eliminated in the optimizer, but atleast in analyzer phase would add an extra project node etc.
   The change is in the code of identification of OuterReference in subquery.scala.
   Currently whenever an aggregate expression is found, it is assumed to be the Outer Reference.
   With this change,  the code checks whether the parent Expression can also be potentially part of the OuterReference too.
   So if we consider a query
   select cos (sum (a) ) , b from t1 having exists select 1 from t2 where x = cos ( sum(a) ) 
   
   the OuterReference detected would be cos ( sum(a) ) instead of just sum(a).
   As a result, no extra aggregate would be added.
   
   
   ### Why are the changes needed?
   To avoid adding unnecessary aggregate in outer query thereby reducing the number of expressions to analyze, clone and also avoid adding an extra project node.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Ran the precheckin tests and added new tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by GitBox <gi...@apache.org>.
ahshahid commented on code in PR #38714:
URL: https://github.com/apache/spark/pull/38714#discussion_r1029760100


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala:
##########
@@ -270,4 +277,66 @@ class ResolveSubquerySuite extends AnalysisTest {
       ), Seq(a, b)).as("sub") :: Nil, t1)
     )
   }
+
+  test("SPARK-41141 aggregates of outer query referenced in subquery should not create" +
+    " new aggregates if possible") {
+    withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> s"${PropagateEmptyRelation.ruleName}") {
+      val a = 'a.int
+      val b = 'b.int
+      val c = 'c.int
+      val d = 'd.int
+
+      val t1 = LocalRelation(a, b)
+      val t2 = LocalRelation(c, d)
+      val optimizer = new SimpleTestOptimizer()
+
+      val plansToTest = Seq(
+        t1.select($"a", $"b").
+          having($"b")(Cos(sum($"a")))(Exists(t2.select($"c").
+            where($"d" === Cos(sum($"a"))))) -> 1,
+
+        t1.select($"a", $"b").
+          having($"b")(sum($"a"))(Exists(t2.select($"c").
+            where($"d" === Cos(sum($"a"))))) -> 1,
+        t1.select($"a", $"b").
+          having($"b")(Cos(sum($"a")))(Exists(t2.select($"c").
+            where($"d" === sum($"a")))) -> 2,
+        t1.select($"a", $"b").
+          having($"b")(sum($"a"), Cos(sum($"b")))(Exists(t2.select($"c").
+            where($"d" === Cos(sum($"a")) + sum($"a") + sum($"b") + Cos(sum($"b"))))) -> 3
+      )
+
+      plansToTest.foreach {
+        case (logicalPlan: LogicalPlan, numAggFunctions) =>
+          assertAnalysis(logicalPlan, numAggFunctions)
+      }
+
+      def assertAnalysis(logicalPlan: LogicalPlan, expectedAggregateFunctions: Int): Unit = {
+        val analyzedQuery = logicalPlan.analyze
+        Assert.assertTrue(analyzedQuery.analyzed)
+        val optimizedQuery = optimizer.execute(analyzedQuery)

Review Comment:
   I recently experienced in different work ( about DataSourceV2 non required projects ) that even though the analyzed plan's analyzed boolean was true, but the plan did not have correct resolution and exception showed up during optimization. so this is just a safety check..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] commented on pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #38714:
URL: https://github.com/apache/spark/pull/38714#issuecomment-1475045067

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by GitBox <gi...@apache.org>.
ahshahid commented on code in PR #38714:
URL: https://github.com/apache/spark/pull/38714#discussion_r1030004911


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala:
##########
@@ -17,13 +17,20 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
+import org.junit.Assert
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{CreateArray, Expression, GetStructField, InSubquery, LateralSubquery, ListQuery, OuterReference, ScalarSubquery}
-import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Cos, CreateArray, Exists, Expression, GetStructField, InSubquery, LateralSubquery, ListQuery, OuterReference, ScalarSubquery, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Count}
+import org.apache.spark.sql.catalyst.optimizer.{PropagateEmptyRelation, SimpleTestOptimizer}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+

Review Comment:
   done



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala:
##########
@@ -17,13 +17,20 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
+import org.junit.Assert

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by GitBox <gi...@apache.org>.
ahshahid commented on code in PR #38714:
URL: https://github.com/apache/spark/pull/38714#discussion_r1030005359


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala:
##########
@@ -208,20 +208,33 @@ object SubExprUtils extends PredicateHelper {
    */
   def getOuterReferences(expr: Expression): Seq[Expression] = {
     val outerExpressions = ArrayBuffer.empty[Expression]
-    def collectOutRefs(input: Expression): Unit = input match {
+
+    def collectOutRefs(input: Expression): Boolean = input match {
       case a: AggregateExpression if containsOuter(a) =>
         if (a.references.nonEmpty) {
           throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin)
         } else {
           // Collect and update the sub-tree so that outer references inside this aggregate
           // expression will not be collected. For example: min(outer(a)) -> min(a).
-          val newExpr = stripOuterReference(a)
-          outerExpressions += newExpr
+          // delay collecting outer expression as we want to go as much up as possible

Review Comment:
   But will add some more tests to validate that whole arithmetic expression is replaced by outer ref & not just a subexpr.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] peter-toth commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by GitBox <gi...@apache.org>.
peter-toth commented on code in PR #38714:
URL: https://github.com/apache/spark/pull/38714#discussion_r1029660173


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala:
##########
@@ -270,4 +277,66 @@ class ResolveSubquerySuite extends AnalysisTest {
       ), Seq(a, b)).as("sub") :: Nil, t1)
     )
   }
+
+  test("SPARK-41141 aggregates of outer query referenced in subquery should not create" +
+    " new aggregates if possible") {
+    withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> s"${PropagateEmptyRelation.ruleName}") {
+      val a = 'a.int
+      val b = 'b.int
+      val c = 'c.int
+      val d = 'd.int
+
+      val t1 = LocalRelation(a, b)
+      val t2 = LocalRelation(c, d)
+      val optimizer = new SimpleTestOptimizer()
+
+      val plansToTest = Seq(
+        t1.select($"a", $"b").
+          having($"b")(Cos(sum($"a")))(Exists(t2.select($"c").
+            where($"d" === Cos(sum($"a"))))) -> 1,
+
+        t1.select($"a", $"b").
+          having($"b")(sum($"a"))(Exists(t2.select($"c").
+            where($"d" === Cos(sum($"a"))))) -> 1,
+        t1.select($"a", $"b").
+          having($"b")(Cos(sum($"a")))(Exists(t2.select($"c").
+            where($"d" === sum($"a")))) -> 2,
+        t1.select($"a", $"b").
+          having($"b")(sum($"a"), Cos(sum($"b")))(Exists(t2.select($"c").
+            where($"d" === Cos(sum($"a")) + sum($"a") + sum($"b") + Cos(sum($"b"))))) -> 3
+      )
+
+      plansToTest.foreach {
+        case (logicalPlan: LogicalPlan, numAggFunctions) =>
+          assertAnalysis(logicalPlan, numAggFunctions)
+      }
+
+      def assertAnalysis(logicalPlan: LogicalPlan, expectedAggregateFunctions: Int): Unit = {
+        val analyzedQuery = logicalPlan.analyze
+        Assert.assertTrue(analyzedQuery.analyzed)
+        val optimizedQuery = optimizer.execute(analyzedQuery)

Review Comment:
   Why do we need `optimizedQuery` in this test?
   



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala:
##########
@@ -17,13 +17,20 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
+import org.junit.Assert

Review Comment:
   Scalatest assert should be available, no need for junit here.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala:
##########
@@ -17,13 +17,20 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
+import org.junit.Assert
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{CreateArray, Expression, GetStructField, InSubquery, LateralSubquery, ListQuery, OuterReference, ScalarSubquery}
-import org.apache.spark.sql.catalyst.expressions.aggregate.Count
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Cos, CreateArray, Exists, Expression, GetStructField, InSubquery, LateralSubquery, ListQuery, OuterReference, ScalarSubquery, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, Count}
+import org.apache.spark.sql.catalyst.optimizer.{PropagateEmptyRelation, SimpleTestOptimizer}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+

Review Comment:
   No need for this extra line.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala:
##########
@@ -208,20 +208,33 @@ object SubExprUtils extends PredicateHelper {
    */
   def getOuterReferences(expr: Expression): Seq[Expression] = {
     val outerExpressions = ArrayBuffer.empty[Expression]
-    def collectOutRefs(input: Expression): Unit = input match {
+
+    def collectOutRefs(input: Expression): Boolean = input match {
       case a: AggregateExpression if containsOuter(a) =>
         if (a.references.nonEmpty) {
           throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin)
         } else {
           // Collect and update the sub-tree so that outer references inside this aggregate
           // expression will not be collected. For example: min(outer(a)) -> min(a).
-          val newExpr = stripOuterReference(a)
-          outerExpressions += newExpr
+          // delay collecting outer expression as we want to go as much up as possible

Review Comment:
   I wonder if it would make sense to handle like `sum(a) + 1` like outer expressions too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by GitBox <gi...@apache.org>.
ahshahid commented on code in PR #38714:
URL: https://github.com/apache/spark/pull/38714#discussion_r1029763899


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala:
##########
@@ -208,20 +208,33 @@ object SubExprUtils extends PredicateHelper {
    */
   def getOuterReferences(expr: Expression): Seq[Expression] = {
     val outerExpressions = ArrayBuffer.empty[Expression]
-    def collectOutRefs(input: Expression): Unit = input match {
+
+    def collectOutRefs(input: Expression): Boolean = input match {
       case a: AggregateExpression if containsOuter(a) =>
         if (a.references.nonEmpty) {
           throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin)
         } else {
           // Collect and update the sub-tree so that outer references inside this aggregate
           // expression will not be collected. For example: min(outer(a)) -> min(a).
-          val newExpr = stripOuterReference(a)
-          outerExpressions += newExpr
+          // delay collecting outer expression as we want to go as much up as possible

Review Comment:
   I thought about that, but there are situations which I am not sure if they are correct or meaningful, related to binary operators.
   for eg projection as (sum(a) + sum(b) ) .. 
   Not sure if this is a valid expression involving aggregates in project and having clause.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by GitBox <gi...@apache.org>.
ahshahid commented on code in PR #38714:
URL: https://github.com/apache/spark/pull/38714#discussion_r1029822940


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala:
##########
@@ -208,20 +208,33 @@ object SubExprUtils extends PredicateHelper {
    */
   def getOuterReferences(expr: Expression): Seq[Expression] = {
     val outerExpressions = ArrayBuffer.empty[Expression]
-    def collectOutRefs(input: Expression): Unit = input match {
+
+    def collectOutRefs(input: Expression): Boolean = input match {
       case a: AggregateExpression if containsOuter(a) =>
         if (a.references.nonEmpty) {
           throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin)
         } else {
           // Collect and update the sub-tree so that outer references inside this aggregate
           // expression will not be collected. For example: min(outer(a)) -> min(a).
-          val newExpr = stripOuterReference(a)
-          outerExpressions += newExpr
+          // delay collecting outer expression as we want to go as much up as possible

Review Comment:
   But you are right.. I suppose so long as any expression is such that  so long as only one of the child returns "true", then the current expression can be part of the outer ref.
   so basically it means that as long as the situation is not like.  (sum(a) + sum(b)  +  8, types... the expression can be considered to be referencable using outer ref.
   Will make that change..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] closed pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it
URL: https://github.com/apache/spark/pull/38714


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38714:
URL: https://github.com/apache/spark/pull/38714#issuecomment-1321252021

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by GitBox <gi...@apache.org>.
ahshahid commented on code in PR #38714:
URL: https://github.com/apache/spark/pull/38714#discussion_r1030005062


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala:
##########
@@ -208,20 +208,33 @@ object SubExprUtils extends PredicateHelper {
    */
   def getOuterReferences(expr: Expression): Seq[Expression] = {
     val outerExpressions = ArrayBuffer.empty[Expression]
-    def collectOutRefs(input: Expression): Unit = input match {
+
+    def collectOutRefs(input: Expression): Boolean = input match {
       case a: AggregateExpression if containsOuter(a) =>
         if (a.references.nonEmpty) {
           throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin)
         } else {
           // Collect and update the sub-tree so that outer references inside this aggregate
           // expression will not be collected. For example: min(outer(a)) -> min(a).
-          val newExpr = stripOuterReference(a)
-          outerExpressions += newExpr
+          // delay collecting outer expression as we want to go as much up as possible

Review Comment:
   implemented..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ahshahid commented on a diff in pull request #38714: [WIP][SPARK-41141]. avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it

Posted by GitBox <gi...@apache.org>.
ahshahid commented on code in PR #38714:
URL: https://github.com/apache/spark/pull/38714#discussion_r1043850142


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala:
##########
@@ -270,4 +277,66 @@ class ResolveSubquerySuite extends AnalysisTest {
       ), Seq(a, b)).as("sub") :: Nil, t1)
     )
   }
+
+  test("SPARK-41141 aggregates of outer query referenced in subquery should not create" +
+    " new aggregates if possible") {
+    withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> s"${PropagateEmptyRelation.ruleName}") {
+      val a = 'a.int
+      val b = 'b.int
+      val c = 'c.int
+      val d = 'd.int
+
+      val t1 = LocalRelation(a, b)
+      val t2 = LocalRelation(c, d)
+      val optimizer = new SimpleTestOptimizer()
+
+      val plansToTest = Seq(
+        t1.select($"a", $"b").
+          having($"b")(Cos(sum($"a")))(Exists(t2.select($"c").
+            where($"d" === Cos(sum($"a"))))) -> 1,
+
+        t1.select($"a", $"b").
+          having($"b")(sum($"a"))(Exists(t2.select($"c").
+            where($"d" === Cos(sum($"a"))))) -> 1,
+        t1.select($"a", $"b").
+          having($"b")(Cos(sum($"a")))(Exists(t2.select($"c").
+            where($"d" === sum($"a")))) -> 2,
+        t1.select($"a", $"b").
+          having($"b")(sum($"a"), Cos(sum($"b")))(Exists(t2.select($"c").
+            where($"d" === Cos(sum($"a")) + sum($"a") + sum($"b") + Cos(sum($"b"))))) -> 3
+      )
+
+      plansToTest.foreach {
+        case (logicalPlan: LogicalPlan, numAggFunctions) =>
+          assertAnalysis(logicalPlan, numAggFunctions)
+      }
+
+      def assertAnalysis(logicalPlan: LogicalPlan, expectedAggregateFunctions: Int): Unit = {
+        val analyzedQuery = logicalPlan.analyze
+        Assert.assertTrue(analyzedQuery.analyzed)
+        val optimizedQuery = optimizer.execute(analyzedQuery)

Review Comment:
   @peter-toth .. updated the test & implemented the feedback for handling cases like  (1 + sum(x) ) being used in inner query. pls check..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org