You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2021/02/26 12:59:35 UTC

[spark] branch master updated: [SPARK-33971][SQL] Eliminate distinct from more aggregates

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 67ec4f7  [SPARK-33971][SQL] Eliminate distinct from more aggregates
67ec4f7 is described below

commit 67ec4f7f67dc494c2619b7faf1b1145f2200b65c
Author: tanel.kiis@gmail.com <ta...@gmail.com>
AuthorDate: Fri Feb 26 21:59:02 2021 +0900

    [SPARK-33971][SQL] Eliminate distinct from more aggregates
    
    ### What changes were proposed in this pull request?
    
    Add more aggregate expressions to `EliminateDistinct` rule.
    
    ### Why are the changes needed?
    
    Distinct aggregation can add a significant overhead. It's better to remove distinct whenever possible.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    UT
    
    Closes #30999 from tanelk/SPARK-33971_eliminate_distinct.
    
    Authored-by: tanel.kiis@gmail.com <ta...@gmail.com>
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 16 ++++++---
 .../optimizer/EliminateDistinctSuite.scala         | 41 +++++++++++-----------
 2 files changed, 32 insertions(+), 25 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 717770f..cb24180 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -352,11 +352,17 @@ abstract class Optimizer(catalogManager: CatalogManager)
  */
 object EliminateDistinct extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions  {
-    case ae: AggregateExpression if ae.isDistinct =>
-      ae.aggregateFunction match {
-        case _: Max | _: Min => ae.copy(isDistinct = false)
-        case _ => ae
-      }
+    case ae: AggregateExpression if ae.isDistinct && isDuplicateAgnostic(ae.aggregateFunction) =>
+      ae.copy(isDistinct = false)
+  }
+
+  private def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match {
+    case _: Max => true
+    case _: Min => true
+    case _: BitAndAgg => true
+    case _: BitOrAgg => true
+    case _: CollectSet => true
+    case _ => false
   }
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
index 51c7519..0848d56 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -32,25 +34,24 @@ class EliminateDistinctSuite extends PlanTest {
 
   val testRelation = LocalRelation('a.int)
 
-  test("Eliminate Distinct in Max") {
-    val query = testRelation
-      .select(maxDistinct('a).as('result))
-      .analyze
-    val answer = testRelation
-      .select(max('a).as('result))
-      .analyze
-    assert(query != answer)
-    comparePlans(Optimize.execute(query), answer)
-  }
-
-  test("Eliminate Distinct in Min") {
-    val query = testRelation
-      .select(minDistinct('a).as('result))
-      .analyze
-    val answer = testRelation
-      .select(min('a).as('result))
-      .analyze
-    assert(query != answer)
-    comparePlans(Optimize.execute(query), answer)
+  Seq(
+    Max(_),
+    Min(_),
+    BitAndAgg(_),
+    BitOrAgg(_),
+    CollectSet(_: Expression)
+  ).foreach {
+    aggBuilder =>
+      val agg = aggBuilder('a)
+      test(s"Eliminate Distinct in ${agg.prettyName}") {
+        val query = testRelation
+          .select(agg.toAggregateExpression(isDistinct = true).as('result))
+          .analyze
+        val answer = testRelation
+          .select(agg.toAggregateExpression(isDistinct = false).as('result))
+          .analyze
+        assert(query != answer)
+        comparePlans(Optimize.execute(query), answer)
+      }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org