Posted to commits@spark.apache.org by hv...@apache.org on 2016/09/29 21:30:29 UTC
spark git commit: [SPARK-17653][SQL] Remove unnecessary distincts in multiple unions
Repository: spark
Updated Branches:
refs/heads/master fe33121a5 -> 566d7f282
[SPARK-17653][SQL] Remove unnecessary distincts in multiple unions
## What changes were proposed in this pull request?
Currently, for `Union [Distinct]`, a `Distinct` operator is placed on top of the `Union`. When there are adjacent `Union [Distinct]` operators, the query plan ends up with multiple `Distinct` operators.
For example, given a query like:
select 1 a union select 2 b union select 3 c
Before this patch, its physical plan looks like:
*HashAggregate(keys=[a#13], functions=[])
+- Exchange hashpartitioning(a#13, 200)
   +- *HashAggregate(keys=[a#13], functions=[])
      +- Union
         :- *HashAggregate(keys=[a#13], functions=[])
         :  +- Exchange hashpartitioning(a#13, 200)
         :     +- *HashAggregate(keys=[a#13], functions=[])
         :        +- Union
         :           :- *Project [1 AS a#13]
         :           :  +- Scan OneRowRelation[]
         :           +- *Project [2 AS b#14]
         :              +- Scan OneRowRelation[]
         +- *Project [3 AS c#15]
            +- Scan OneRowRelation[]
Only the topmost `Distinct` is actually necessary.
After this patch, the physical plan looks like:
*HashAggregate(keys=[a#221], functions=[], output=[a#221])
+- Exchange hashpartitioning(a#221, 5)
   +- *HashAggregate(keys=[a#221], functions=[], output=[a#221])
      +- Union
         :- *Project [1 AS a#221]
         :  +- Scan OneRowRelation[]
         :- *Project [2 AS b#222]
         :  +- Scan OneRowRelation[]
         +- *Project [3 AS c#223]
            +- Scan OneRowRelation[]
## How was this patch tested?
Jenkins tests.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #15238 from viirya/remove-extra-distinct-union.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/566d7f28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/566d7f28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/566d7f28
Branch: refs/heads/master
Commit: 566d7f28275f90f7b9bed6a75e90989ad0c59931
Parents: fe33121
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Thu Sep 29 14:30:23 2016 -0700
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Sep 29 14:30:23 2016 -0700
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 24 ++++++-
.../spark/sql/catalyst/planning/patterns.scala | 27 --------
.../catalyst/optimizer/SetOperationSuite.scala | 68 ++++++++++++++++++++
3 files changed, 89 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/566d7f28/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4952ba3..9df8ce1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import scala.collection.immutable.HashSet
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.api.java.function.FilterFunction
@@ -29,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions}
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -579,8 +580,25 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
* Combines all adjacent [[Union]] operators into a single [[Union]].
*/
object CombineUnions extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Unions(children) => Union(children)
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
+    case u: Union => flattenUnion(u, false)
+    case Distinct(u: Union) => Distinct(flattenUnion(u, true))
+  }
+
+  private def flattenUnion(union: Union, flattenDistinct: Boolean): Union = {
+    val stack = mutable.Stack[LogicalPlan](union)
+    val flattened = mutable.ArrayBuffer.empty[LogicalPlan]
+    while (stack.nonEmpty) {
+      stack.pop() match {
+        case Distinct(Union(children)) if flattenDistinct =>
+          stack.pushAll(children.reverse)
+        case Union(children) =>
+          stack.pushAll(children.reverse)
+        case child =>
+          flattened += child
+      }
+    }
+    Union(flattened)
  }
}
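
The new rule flattens nested unions iteratively with an explicit stack instead of the recursive `Unions` extractor it replaces, which also lets it look through `Distinct` when the top of the subtree is distinct. As an illustration only, here is a minimal, self-contained sketch of the same control flow on a toy plan type (the names `Plan`, `Leaf`, `Un`, `Dist`, and `flatten` are hypothetical stand-ins, not Catalyst classes); it can be pasted into a Scala REPL:

// Illustrative sketch only: a toy model of the stack-based flattening in CombineUnions.
sealed trait Plan
case class Leaf(name: String) extends Plan
case class Un(children: Seq[Plan]) extends Plan    // stands in for Union
case class Dist(child: Plan) extends Plan          // stands in for Distinct

def flatten(union: Un, flattenDistinct: Boolean): Un = {
  val stack = scala.collection.mutable.Stack[Plan](union)
  val flattened = scala.collection.mutable.ArrayBuffer.empty[Plan]
  while (stack.nonEmpty) {
    stack.pop() match {
      // A Distinct directly under a distinct Union is redundant, so look through it.
      case Dist(Un(children)) if flattenDistinct => stack.pushAll(children.reverse)
      // Adjacent unions are merged; pushing reversed children preserves their order.
      case Un(children) => stack.pushAll(children.reverse)
      case other => flattened += other
    }
  }
  Un(flattened)
}

val (a, b, c) = (Leaf("a"), Leaf("b"), Leaf("c"))
// Dist(Un(Dist(Un(a, b)), c)) keeps only the top Distinct over one flat union:
assert(flatten(Un(Seq(Dist(Un(Seq(a, b))), c)), flattenDistinct = true) == Un(Seq(a, b, c)))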
http://git-wip-us.apache.org/repos/asf/spark/blob/566d7f28/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 41cabb8..bdae568 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -188,33 +188,6 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
}
}
-
-/**
- * A pattern that collects all adjacent unions and returns their children as a Seq.
- */
-object Unions {
-  def unapply(plan: LogicalPlan): Option[Seq[LogicalPlan]] = plan match {
-    case u: Union => Some(collectUnionChildren(mutable.Stack(u), Seq.empty[LogicalPlan]))
-    case _ => None
-  }
-
-  // Doing a depth-first tree traversal to combine all the union children.
-  @tailrec
-  private def collectUnionChildren(
-      plans: mutable.Stack[LogicalPlan],
-      children: Seq[LogicalPlan]): Seq[LogicalPlan] = {
-    if (plans.isEmpty) children
-    else {
-      plans.pop match {
-        case Union(grandchildren) =>
-          grandchildren.reverseMap(plans.push(_))
-          collectUnionChildren(plans, children)
-        case other => collectUnionChildren(plans, children :+ other)
-      }
-    }
-  }
-}
-
/**
* An extractor used when planning the physical execution of an aggregation. Compared with a logical
* aggregation, the following transformations are performed:
http://git-wip-us.apache.org/repos/asf/spark/blob/566d7f28/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index 7227706..21b7f49 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -76,4 +77,71 @@ class SetOperationSuite extends PlanTest {
testRelation3.select('g) :: Nil).analyze
comparePlans(unionOptimized, unionCorrectAnswer)
}
+
+  test("Remove unnecessary distincts in multiple unions") {
+    val query1 = OneRowRelation
+      .select(Literal(1).as('a))
+    val query2 = OneRowRelation
+      .select(Literal(2).as('b))
+    val query3 = OneRowRelation
+      .select(Literal(3).as('c))
+
+    // D - U - D - U - query1
+    //     |       |
+    //     query3  query2
+    val unionQuery1 = Distinct(Union(Distinct(Union(query1, query2)), query3)).analyze
+    val optimized1 = Optimize.execute(unionQuery1)
+    val distinctUnionCorrectAnswer1 =
+      Distinct(Union(query1 :: query2 :: query3 :: Nil)).analyze
+    comparePlans(distinctUnionCorrectAnswer1, optimized1)
+
+    //         query1
+    //         |
+    // D - U - U - query2
+    //     |
+    //     D - U - query2
+    //         |
+    //         query3
+    val unionQuery2 = Distinct(Union(Union(query1, query2),
+      Distinct(Union(query2, query3)))).analyze
+    val optimized2 = Optimize.execute(unionQuery2)
+    val distinctUnionCorrectAnswer2 =
+      Distinct(Union(query1 :: query2 :: query2 :: query3 :: Nil)).analyze
+    comparePlans(distinctUnionCorrectAnswer2, optimized2)
+  }
+
+  test("Keep necessary distincts in multiple unions") {
+    val query1 = OneRowRelation
+      .select(Literal(1).as('a))
+    val query2 = OneRowRelation
+      .select(Literal(2).as('b))
+    val query3 = OneRowRelation
+      .select(Literal(3).as('c))
+    val query4 = OneRowRelation
+      .select(Literal(4).as('d))
+
+    // U - D - U - query1
+    // |       |
+    // query3  query2
+    val unionQuery1 = Union(Distinct(Union(query1, query2)), query3).analyze
+    val optimized1 = Optimize.execute(unionQuery1)
+    val distinctUnionCorrectAnswer1 =
+      Union(Distinct(Union(query1 :: query2 :: Nil)) :: query3 :: Nil).analyze
+    comparePlans(distinctUnionCorrectAnswer1, optimized1)
+
+    //         query1
+    //         |
+    // U - D - U - query2
+    // |
+    // D - U - query3
+    //     |
+    //     query4
+    val unionQuery2 =
+      Union(Distinct(Union(query1, query2)), Distinct(Union(query3, query4))).analyze
+    val optimized2 = Optimize.execute(unionQuery2)
+    val distinctUnionCorrectAnswer2 =
+      Union(Distinct(Union(query1 :: query2 :: Nil)),
+        Distinct(Union(query3 :: query4 :: Nil))).analyze
+    comparePlans(distinctUnionCorrectAnswer2, optimized2)
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org