Posted to reviews@spark.apache.org by "zml1206 (via GitHub)" <gi...@apache.org> on 2024/02/01 02:38:22 UTC

Re: [PR] [SPARK-46487][SQL] Push down part of filter through aggregate with nondeterministic field [spark]

zml1206 commented on code in PR #44460:
URL: https://github.com/apache/spark/pull/44460#discussion_r1473727151


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1718,22 +1718,26 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
     // implies that, for a given input row, the output are determined by the expression's initial
     state and all the input rows processed before. In other words, the order of input rows
     // matters for non-deterministic expressions, while pushing down predicates changes the order.
-    // This also applies to Aggregate.
     case Filter(condition, project @ Project(fields, grandChild))
       if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) =>
       val aliasMap = getAliasMap(project)
       project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
 
+    // Push `Filter` operators through `Aggregate` operators. The parts of the predicate that
+    // can be pushed beneath the aggregate must satisfy the following conditions:
+    // 1. The grouping expressions are not empty.
+    // 2. The predicate expression is deterministic.
+    // 3. The references of the predicate expression are a subset of the aggregate's child output.
     case filter @ Filter(condition, aggregate: Aggregate)
-      if aggregate.aggregateExpressions.forall(_.deterministic)
+      if aggregate.aggregateExpressions.exists(_.deterministic)
         && aggregate.groupingExpressions.nonEmpty =>
       val aliasMap = getAliasMap(aggregate)
 
       // For each filter, expand the alias and check if the filter can be evaluated using
       // attributes produced by the aggregate operator's child operator.
       val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
         val replaced = replaceAlias(cond, aliasMap)
-        cond.deterministic && !cond.throwable &&
+        replaced.deterministic && !cond.throwable &&

Review Comment:
   I think we can push a throwable filter down through the aggregate; it doesn't seem to affect whether the exception is thrown. What do you think? @cloud-fan
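
As an illustration of the rewrite under discussion (a minimal sketch, not part of the original message; the table, column names, and the `PushdownSketch` object are hypothetical), the grouping-key conjunct `key > 1` only references the aggregate's child and could be evaluated below the aggregate, while `total > 2` references the aggregated value and must stay above it:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object PushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("pushdown-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 10), (2, 20), (2, 30)).toDF("key", "value")

    // Logical plan before optimization (roughly):
    //   Filter(key > 1 AND total > 2, Aggregate(key, sum(value) AS total, relation))
    // Under the proposed change, the deterministic conjunct `key > 1`, whose references are a
    // subset of the aggregate's child output, may be pushed below the Aggregate, while
    // `total > 2` stays in the Filter above it.
    df.groupBy($"key")
      .agg(sum($"value").as("total"))
      .where($"key" > 1 && $"total" > 2)
      .explain(true)

    spark.stop()
  }
}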



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

