You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/29 00:47:37 UTC

spark git commit: [SPARK-21222] Move elimination of Distinct clause from analyzer to optimizer

Repository: spark
Updated Branches:
  refs/heads/master e68aed70f -> b72b8521d


[SPARK-21222] Move elimination of Distinct clause from analyzer to optimizer

## What changes were proposed in this pull request?

Move elimination of Distinct clause from analyzer to optimizer

The DISTINCT clause has no effect inside a MAX/MIN aggregate, because duplicate values cannot change the maximum or minimum. For example,
"SELECT MAX(DISTINCT a) FROM src"
is equivalent to
"SELECT MAX(a) FROM src"
However, this optimization is implemented in the analyzer. It should be in the optimizer.

## How was this patch tested?

Unit test

gatorsmile cloud-fan

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Wang Gengliang <lt...@gmail.com>

Closes #18429 from gengliangwang/distinct_opt.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b72b8521
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b72b8521
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b72b8521

Branch: refs/heads/master
Commit: b72b8521d9cad878a1a4e4dbb19cf980169dcbc7
Parents: e68aed7
Author: Wang Gengliang <lt...@gmail.com>
Authored: Thu Jun 29 08:47:31 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jun 29 08:47:31 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  5 --
 .../apache/spark/sql/catalyst/dsl/package.scala |  2 +
 .../sql/catalyst/optimizer/Optimizer.scala      | 15 ++++++
 .../optimizer/EliminateDistinctSuite.scala      | 56 ++++++++++++++++++++
 4 files changed, 73 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b72b8521/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 434b6ff..5353649 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1197,11 +1197,6 @@ class Analyzer(
           case u @ UnresolvedFunction(funcId, children, isDistinct) =>
             withPosition(u) {
               catalog.lookupFunction(funcId, children) match {
-                // DISTINCT is not meaningful for a Max or a Min.
-                case max: Max if isDistinct =>
-                  AggregateExpression(max, Complete, isDistinct = false)
-                case min: Min if isDistinct =>
-                  AggregateExpression(min, Complete, isDistinct = false)
                 // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
                 // the context of a Window clause. They do not need to be wrapped in an
                 // AggregateExpression.

http://git-wip-us.apache.org/repos/asf/spark/blob/b72b8521/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index beee93d..f679256 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -159,7 +159,9 @@ package object dsl {
     def first(e: Expression): Expression = new First(e).toAggregateExpression()
     def last(e: Expression): Expression = new Last(e).toAggregateExpression()
     def min(e: Expression): Expression = Min(e).toAggregateExpression()
+    def minDistinct(e: Expression): Expression = Min(e).toAggregateExpression(isDistinct = true)
     def max(e: Expression): Expression = Max(e).toAggregateExpression()
+    def maxDistinct(e: Expression): Expression = Max(e).toAggregateExpression(isDistinct = true)
     def upper(e: Expression): Expression = Upper(e)
     def lower(e: Expression): Expression = Lower(e)
     def sqrt(e: Expression): Expression = Sqrt(e)

http://git-wip-us.apache.org/repos/asf/spark/blob/b72b8521/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b410312..946fa7b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -40,6 +40,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
   protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
 
   def batches: Seq[Batch] = {
+    Batch("Eliminate Distinct", Once, EliminateDistinct) ::
     // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
     // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
     // However, because we also use the analyzer to canonicalized queries (for view definition),
@@ -152,6 +153,20 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
 }
 
 /**
+ * Remove useless DISTINCT for MAX and MIN.
+ * This rule should be applied before RewriteDistinctAggregates.
+ */
+object EliminateDistinct extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions  {
+    case ae: AggregateExpression if ae.isDistinct =>
+      ae.aggregateFunction match {
+        case _: Max | _: Min => ae.copy(isDistinct = false)
+        case _ => ae
+      }
+  }
+}
+
+/**
  * An optimizer used in test code.
  *
  * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while

http://git-wip-us.apache.org/repos/asf/spark/blob/b72b8521/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
new file mode 100644
index 0000000..f40691b
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class EliminateDistinctSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Operator Optimizations", Once,
+        EliminateDistinct) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int)
+
+  test("Eliminate Distinct in Max") {
+    val query = testRelation
+      .select(maxDistinct('a).as('result))
+      .analyze
+    val answer = testRelation
+      .select(max('a).as('result))
+      .analyze
+    assert(query != answer)
+    comparePlans(Optimize.execute(query), answer)
+  }
+
+  test("Eliminate Distinct in Min") {
+    val query = testRelation
+      .select(minDistinct('a).as('result))
+      .analyze
+    val answer = testRelation
+      .select(min('a).as('result))
+      .analyze
+    assert(query != answer)
+    comparePlans(Optimize.execute(query), answer)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org