Posted to reviews@spark.apache.org by "jchen5 (via GitHub)" <gi...@apache.org> on 2023/10/16 20:32:42 UTC

Re: [PR] [SPARK-45507][SQL] Correctness fix for nested correlated scalar subqueries with COUNT aggregates [spark]

jchen5 commented on code in PR #43341:
URL: https://github.com/apache/spark/pull/43341#discussion_r1361234951


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala:
##########
@@ -360,8 +360,40 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
     plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case ScalarSubquery(sub, children, exprId, conditions, hint, mayHaveCountBugOld)
         if children.nonEmpty =>
-        val (newPlan, newCond) = decorrelate(sub, plan)
-        val mayHaveCountBug = if (mayHaveCountBugOld.isEmpty) {
+
+        def mayHaveCountBugAgg(a: Aggregate): Boolean = {
+          a.groupingExpressions.isEmpty && a.aggregateExpressions.exists(_.exists {
+            case a: AggregateExpression => a.aggregateFunction.defaultResult.isDefined
+            case _ => false
+          })
+        }
+
+        // The below logic controls handling count bug for scalar subqueries in
+        // [[DecorrelateInnerQuery]], and if we don't handle it here, we handle it in
+        // [[RewriteCorrelatedScalarSubquery#constructLeftJoins]]. Note that handling it in
+        // [[DecorrelateInnerQuery]] is always correct, and turning it off to handle it in
+        // constructLeftJoins is an optimization, so that additional, redundant left outer joins are
+        // not introduced.
+        val handleCountBugInDecorrelate = !conf.getConf(
+          SQLConf.LEGACY_SCALAR_SUBQUERY_COUNT_BUG_HANDLING) && !(sub match {
+          // Handle count bug only if there exists lower level Aggs with count bugs. It does not
+          // matter if the top level agg is count bug vulnerable or not, because:
+          // 1. If the top level agg is count bug vulnerable, it can be handled in
+          // constructLeftJoins, unless there are lower aggs that are count bug vulnerable.
+          // E.g. COUNT(COUNT + COUNT)
+          // 2. If the top level agg is not count bug vulnerable, it can be count bug vulnerable if
+          // there are lower aggs that are count bug vulnerable. E.g. SUM(COUNT)
+          case agg: Aggregate => !agg.child.exists {
+            case lowerAgg: Aggregate => mayHaveCountBugAgg(lowerAgg)
+            case _ => false
+          }
+          case _ => false
+        })
+        val (newPlan, newCond) = decorrelate(sub, plan, handleCountBugInDecorrelate)
+        val mayHaveCountBug = if (handleCountBugInDecorrelate) {
+          // Count bug was already handled in the above decorrelate function call.
+          false
+        } else if (mayHaveCountBugOld.isEmpty) {

Review Comment:
   But if `mayHaveCountBugOld` is already set, the code just keeps that value; decorrelation wouldn't actually run again. IIRC this doesn't come up in real queries, but there are unit tests that check that rules are idempotent, which is where this came up.
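
   To make the idempotency point concrete, here is a minimal, standalone sketch of how the flag is resolved. It is not the Spark code: `CountBugFlagSketch`, `resolveFlag`, and `detect` are hypothetical names, and it assumes the branch not shown in the hunk simply returns the stored value, as described above.

```scala
object CountBugFlagSketch {
  // Hypothetical helper, not part of Spark. It mirrors the shape of the
  // if / else-if chain in the hunk:
  //   1. count bug already handled during decorrelation -> flag is false
  //   2. no stored value yet                            -> compute it once
  //   3. stored value present (rule applied before)     -> keep it as-is
  def resolveFlag(
      handledInDecorrelate: Boolean,
      previous: Option[Boolean],
      detect: () => Boolean): Boolean = {
    if (handledInDecorrelate) {
      false
    } else {
      // getOrElse takes its argument by name, so detect() only runs
      // when no previous value was stored.
      previous.getOrElse(detect())
    }
  }

  def main(args: Array[String]): Unit = {
    var detectCalls = 0
    // Stand-in for the real detection logic; always reports "vulnerable".
    val detect = () => { detectCalls += 1; true }

    // First application of the rule: nothing stored yet, so detection runs.
    val first = resolveFlag(handledInDecorrelate = false, previous = None, detect = detect)
    // Second application: the stored value is reused and detection is skipped.
    val second = resolveFlag(handledInDecorrelate = false, previous = Some(first), detect = detect)

    assert(first == second)
    assert(detectCalls == 1)
  }
}
```

   Applying the helper a second time with the stored value gives the same result without recomputing anything, which is the property the idempotency tests check.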



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

