You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/29 00:47:37 UTC
spark git commit: [SPARK-21222] Move elimination of Distinct clause
from analyzer to optimizer
Repository: spark
Updated Branches:
refs/heads/master e68aed70f -> b72b8521d
[SPARK-21222] Move elimination of Distinct clause from analyzer to optimizer
## What changes were proposed in this pull request?
Move elimination of Distinct clause from analyzer to optimizer
DISTINCT is redundant inside a MAX/MIN aggregate. For example,
"SELECT MAX(DISTINCT a) FROM src"
is equivalent to
"SELECT MAX(a) FROM src"
However, this optimization is currently implemented in the analyzer; it belongs in the optimizer.
## How was this patch tested?
Added unit tests (EliminateDistinctSuite).
gatorsmile cloud-fan
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Wang Gengliang <lt...@gmail.com>
Closes #18429 from gengliangwang/distinct_opt.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b72b8521
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b72b8521
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b72b8521
Branch: refs/heads/master
Commit: b72b8521d9cad878a1a4e4dbb19cf980169dcbc7
Parents: e68aed7
Author: Wang Gengliang <lt...@gmail.com>
Authored: Thu Jun 29 08:47:31 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jun 29 08:47:31 2017 +0800
----------------------------------------------------------------------
.../spark/sql/catalyst/analysis/Analyzer.scala | 5 --
.../apache/spark/sql/catalyst/dsl/package.scala | 2 +
.../sql/catalyst/optimizer/Optimizer.scala | 15 ++++++
.../optimizer/EliminateDistinctSuite.scala | 56 ++++++++++++++++++++
4 files changed, 73 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b72b8521/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 434b6ff..5353649 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1197,11 +1197,6 @@ class Analyzer(
case u @ UnresolvedFunction(funcId, children, isDistinct) =>
withPosition(u) {
catalog.lookupFunction(funcId, children) match {
- // DISTINCT is not meaningful for a Max or a Min.
- case max: Max if isDistinct =>
- AggregateExpression(max, Complete, isDistinct = false)
- case min: Min if isDistinct =>
- AggregateExpression(min, Complete, isDistinct = false)
// AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
// the context of a Window clause. They do not need to be wrapped in an
// AggregateExpression.
http://git-wip-us.apache.org/repos/asf/spark/blob/b72b8521/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index beee93d..f679256 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -159,7 +159,9 @@ package object dsl {
def first(e: Expression): Expression = new First(e).toAggregateExpression()
def last(e: Expression): Expression = new Last(e).toAggregateExpression()
def min(e: Expression): Expression = Min(e).toAggregateExpression()
+  /** Builds `MIN(DISTINCT e)`: a `Min` aggregate wrapped with its DISTINCT flag set. */
+  def minDistinct(e: Expression): Expression = {
+    val aggFunc = Min(e)
+    aggFunc.toAggregateExpression(isDistinct = true)
+  }
def max(e: Expression): Expression = Max(e).toAggregateExpression()
+  /** Builds `MAX(DISTINCT e)`: a `Max` aggregate wrapped with its DISTINCT flag set. */
+  def maxDistinct(e: Expression): Expression = {
+    val aggFunc = Max(e)
+    aggFunc.toAggregateExpression(isDistinct = true)
+  }
def upper(e: Expression): Expression = Upper(e)
def lower(e: Expression): Expression = Lower(e)
def sqrt(e: Expression): Expression = Sqrt(e)
http://git-wip-us.apache.org/repos/asf/spark/blob/b72b8521/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b410312..946fa7b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -40,6 +40,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
def batches: Seq[Batch] = {
+ Batch("Eliminate Distinct", Once, EliminateDistinct) ::
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
@@ -152,6 +153,20 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
}
/**
+ * Remove useless DISTINCT for MAX and MIN.
+ *
+ * The maximum (or minimum) of a set of values is the same whether or not duplicates are
+ * removed first, so `MAX(DISTINCT a)` is rewritten to `MAX(a)` and `MIN(DISTINCT a)` to
+ * `MIN(a)`. DISTINCT on any other aggregate function is left untouched.
+ *
+ * `transformAllExpressions` is used (rather than `transformExpressions`) so that aggregate
+ * expressions on every operator in the plan are rewritten. `transformExpressions` only visits
+ * the expressions of the node it is invoked on, which would miss an `Aggregate` nested under a
+ * Project, Sort, Filter, Limit, etc.
+ *
+ * This rule should be applied before RewriteDistinctAggregates.
+ */
+object EliminateDistinct extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    case ae: AggregateExpression if ae.isDistinct =>
+      ae.aggregateFunction match {
+        // Duplicate-insensitive aggregates: dropping DISTINCT cannot change the result.
+        case _: Max | _: Min => ae.copy(isDistinct = false)
+        case _ => ae
+      }
+  }
+}
+
+/**
* An optimizer used in test code.
*
* To ensure extendability, we leave the standard rules in the abstract optimizer rules, while
http://git-wip-us.apache.org/repos/asf/spark/blob/b72b8521/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
new file mode 100644
index 0000000..f40691b
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class EliminateDistinctSuite extends PlanTest {
+
+  /** Runs only the rule under test, a single time. */
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("Operator Optimizations", Once, EliminateDistinct) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int)
+
+  /**
+   * Asserts that `query` differs from `expected` before optimization (so it really carries a
+   * DISTINCT flag), and that running the rule under test turns it into `expected`.
+   */
+  private def checkDistinctEliminated(query: LogicalPlan, expected: LogicalPlan): Unit = {
+    assert(query != expected)
+    comparePlans(Optimize.execute(query), expected)
+  }
+
+  test("Eliminate Distinct in Max") {
+    val withDistinct = testRelation.select(maxDistinct('a).as('result)).analyze
+    val withoutDistinct = testRelation.select(max('a).as('result)).analyze
+    checkDistinctEliminated(withDistinct, withoutDistinct)
+  }
+
+  test("Eliminate Distinct in Min") {
+    val withDistinct = testRelation.select(minDistinct('a).as('result)).analyze
+    val withoutDistinct = testRelation.select(min('a).as('result)).analyze
+    checkDistinctEliminated(withDistinct, withoutDistinct)
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org