You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2021/02/26 12:59:35 UTC
[spark] branch master updated: [SPARK-33971][SQL] Eliminate distinct from more aggregates
This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 67ec4f7 [SPARK-33971][SQL] Eliminate distinct from more aggregates
67ec4f7 is described below
commit 67ec4f7f67dc494c2619b7faf1b1145f2200b65c
Author: tanel.kiis@gmail.com <ta...@gmail.com>
AuthorDate: Fri Feb 26 21:59:02 2021 +0900
[SPARK-33971][SQL] Eliminate distinct from more aggregates
### What changes were proposed in this pull request?
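Extend the `EliminateDistinct` rule to more aggregate functions whose result does not depend on duplicate input values: besides `Max` and `Min`, it now also drops DISTINCT from `BitAndAgg`, `BitOrAgg`, and `CollectSet`.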
### Why are the changes needed?
Distinct aggregation can add significant overhead, so it is better to remove DISTINCT whenever doing so cannot change the result.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests (`EliminateDistinctSuite`).
Closes #30999 from tanelk/SPARK-33971_eliminate_distinct.
Authored-by: tanel.kiis@gmail.com <ta...@gmail.com>
Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 22 +++++++++---
.../optimizer/EliminateDistinctSuite.scala | 41 +++++++++++-----------
2 files changed, 37 insertions(+), 26 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 717770f..cb24180 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -352,11 +352,21 @@ abstract class Optimizer(catalogManager: CatalogManager)
*/
object EliminateDistinct extends Rule[LogicalPlan] {
- override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions {
- case ae: AggregateExpression if ae.isDistinct =>
- ae.aggregateFunction match {
- case _: Max | _: Min => ae.copy(isDistinct = false)
- case _ => ae
- }
+ // transformAllExpressions rewrites the expressions of every operator in the plan, not only
+ // the root one, so DISTINCT is dropped even when the Aggregate sits below e.g. a Sort.
+ override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+ case ae: AggregateExpression if ae.isDistinct && isDuplicateAgnostic(ae.aggregateFunction) =>
+ ae.copy(isDistinct = false)
+ }
+
+ // True when `af` returns the same result whether or not duplicate input values are removed
+ // first, so its DISTINCT modifier is redundant. Any function not listed keeps its DISTINCT.
+ private def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match {
+ case _: Max => true
+ case _: Min => true
+ case _: BitAndAgg => true
+ case _: BitOrAgg => true
+ case _: CollectSet => true
+ case _ => false
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
index 51c7519..0848d56 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -32,25 +34,24 @@ class EliminateDistinctSuite extends PlanTest {
val testRelation = LocalRelation('a.int)
- test("Eliminate Distinct in Max") {
- val query = testRelation
- .select(maxDistinct('a).as('result))
- .analyze
- val answer = testRelation
- .select(max('a).as('result))
- .analyze
- assert(query != answer)
- comparePlans(Optimize.execute(query), answer)
- }
-
- test("Eliminate Distinct in Min") {
- val query = testRelation
- .select(minDistinct('a).as('result))
- .analyze
- val answer = testRelation
- .select(min('a).as('result))
- .analyze
- assert(query != answer)
- comparePlans(Optimize.execute(query), answer)
+ Seq( // builders for every aggregate function whose DISTINCT the rule is expected to drop
+ Max(_),
+ Min(_),
+ BitAndAgg(_),
+ BitOrAgg(_),
+ CollectSet(_: Expression) // explicit parameter type; NOTE(review): presumably needed to resolve CollectSet's apply — confirm
+ ).foreach {
+ aggBuilder =>
+ val agg = aggBuilder('a) // instantiate the aggregate over column `a` of testRelation
+ test(s"Eliminate Distinct in ${agg.prettyName}") { // one test case per aggregate, named after it
+ val query = testRelation
+ .select(agg.toAggregateExpression(isDistinct = true).as('result)) // input: the DISTINCT form
+ .analyze
+ val answer = testRelation
+ .select(agg.toAggregateExpression(isDistinct = false).as('result)) // expected: same aggregate without DISTINCT
+ .analyze
+ assert(query != answer) // guard: the input plan must actually differ from the expected plan
+ comparePlans(Optimize.execute(query), answer) // optimizing the DISTINCT plan must yield the non-DISTINCT one
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org