You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/16 02:21:34 UTC
spark git commit: [SPARK-18300][SQL] Do not apply foldable
propagation with expand as a child [BRANCH-2.0]
Repository: spark
Updated Branches:
refs/heads/branch-2.0 e2452c632 -> 8d55886aa
[SPARK-18300][SQL] Do not apply foldable propagation with expand as a child [BRANCH-2.0]
## What changes were proposed in this pull request?
The `FoldablePropagation` optimizer rule, pulls foldable values out from under an `Expand`. This breaks the `Expand` in two ways:
- It rewrites the output attributes of the `Expand`. We explicitly define output attributes for `Expand`, these are (unfortunately) considered as part of the expressions of the `Expand` and can be rewritten.
- `Expand` can actually change the value of a column (it will typically re-use the attributes of the underlying plan). This means that we cannot safely propagate the expressions from under an `Expand`.
This PR fixes this and (hopefully) other issues by explicitly whitelisting allowed operators.
This is a backport of https://github.com/apache/spark/pull/15857
## How was this patch tested?
Added tests to `FoldablePropagationSuite` and to `SQLQueryTestSuite`.
Author: Herman van Hovell <hv...@databricks.com>
Closes #15892 from hvanhovell/SPARK-18300-branch-2.0.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d55886a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d55886a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d55886a
Branch: refs/heads/branch-2.0
Commit: 8d55886aaa781f3b9f09de1a2d6b422c95dcb4d2
Parents: e2452c6
Author: Herman van Hovell <hv...@databricks.com>
Authored: Tue Nov 15 18:21:26 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 15 18:21:26 2016 -0800
----------------------------------------------------------------------
.../sql/catalyst/optimizer/Optimizer.scala | 78 +++++++++++++-------
.../optimizer/FoldablePropagationSuite.scala | 28 ++++++-
.../resources/sql-tests/inputs/group-by.sql | 3 +
.../sql-tests/results/group-by.sql.out | 10 ++-
4 files changed, 88 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8d55886a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f0992b3..0a28ef4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -646,46 +646,72 @@ object FoldablePropagation extends Rule[LogicalPlan] {
}
case _ => Nil
})
+ val replaceFoldable: PartialFunction[Expression, Expression] = {
+ case a: AttributeReference if foldableMap.contains(a) => foldableMap(a)
+ }
if (foldableMap.isEmpty) {
plan
} else {
var stop = false
CleanupAliases(plan.transformUp {
- case u: Union =>
- stop = true
- u
- case c: Command =>
- stop = true
- c
- // For outer join, although its output attributes are derived from its children, they are
- // actually different attributes: the output of outer join is not always picked from its
- // children, but can also be null.
+ // A leaf node should not stop the folding process (note that we are traversing up the
+ // tree, starting at the leaf nodes); so we are allowing it.
+ case l: LeafNode =>
+ l
+
+ // We can only propagate foldables for a subset of unary nodes.
+ case u: UnaryNode if !stop && canPropagateFoldables(u) =>
+ u.transformExpressions(replaceFoldable)
+
+ // Allow inner joins. We do not allow outer join, although its output attributes are
+ // derived from its children, they are actually different attributes: the output of outer
+ // join is not always picked from its children, but can also be null.
// TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes
// of outer join.
- case j @ Join(_, _, LeftOuter | RightOuter | FullOuter, _) =>
+ case j @ Join(_, _, Inner, _) =>
+ j.transformExpressions(replaceFoldable)
+
+ // We can fold the projections an expand holds. However expand changes the output columns
+ // and often reuses the underlying attributes; so we cannot assume that a column is still
+ // foldable after the expand has been applied.
+ // TODO(hvanhovell): Expand should use new attributes as the output attributes.
+ case expand: Expand if !stop =>
+ val newExpand = expand.copy(projections = expand.projections.map { projection =>
+ projection.map(_.transform(replaceFoldable))
+ })
stop = true
- j
+ newExpand
- // These 3 operators take attributes as constructor parameters, and these attributes
- // can't be replaced by alias.
- case m: MapGroups =>
- stop = true
- m
- case f: FlatMapGroupsInR =>
- stop = true
- f
- case c: CoGroup =>
+ case other =>
stop = true
- c
-
- case p: LogicalPlan if !stop => p.transformExpressions {
- case a: AttributeReference if foldableMap.contains(a) =>
- foldableMap(a)
- }
+ other
})
}
}
+
+ /**
+ * Whitelist of all [[UnaryNode]]s for which we allow foldable propagation.
+ */
+ private def canPropagateFoldables(u: UnaryNode): Boolean = u match {
+ case _: Project => true
+ case _: Filter => true
+ case _: SubqueryAlias => true
+ case _: Aggregate => true
+ case _: Window => true
+ case _: Sample => true
+ case _: GlobalLimit => true
+ case _: LocalLimit => true
+ case _: Generate => true
+ case _: Distinct => true
+ case _: AppendColumns => true
+ case _: AppendColumnsWithObject => true
+ case _: BroadcastHint => true
+ case _: RedistributeData => true
+ case _: Repartition => true
+ case _: Sort => true
+ case _ => false
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/8d55886a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
index 355b3fc..bbef212 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
@@ -116,16 +116,36 @@ class FoldablePropagationSuite extends PlanTest {
test("Propagate in subqueries of Union queries") {
val query = Union(
Seq(
- testRelation.select(Literal(1).as('x), 'a).select('x + 'a),
- testRelation.select(Literal(2).as('x), 'a).select('x + 'a)))
+ testRelation.select(Literal(1).as('x), 'a).select('x, 'x + 'a),
+ testRelation.select(Literal(2).as('x), 'a).select('x, 'x + 'a)))
.select('x)
val optimized = Optimize.execute(query.analyze)
val correctAnswer = Union(
Seq(
- testRelation.select(Literal(1).as('x), 'a).select((Literal(1).as('x) + 'a).as("(x + a)")),
- testRelation.select(Literal(2).as('x), 'a).select((Literal(2).as('x) + 'a).as("(x + a)"))))
+ testRelation.select(Literal(1).as('x), 'a)
+ .select(Literal(1).as('x), (Literal(1).as('x) + 'a).as("(x + a)")),
+ testRelation.select(Literal(2).as('x), 'a)
+ .select(Literal(2).as('x), (Literal(2).as('x) + 'a).as("(x + a)"))))
.select('x).analyze
comparePlans(optimized, correctAnswer)
}
+
+ test("Propagate in expand") {
+ val c1 = Literal(1).as('a)
+ val c2 = Literal(2).as('b)
+ val a1 = c1.toAttribute.withNullability(true)
+ val a2 = c2.toAttribute.withNullability(true)
+ val expand = Expand(
+ Seq(Seq(Literal(null), 'b), Seq('a, Literal(null))),
+ Seq(a1, a2),
+ OneRowRelation.select(c1, c2))
+ val query = expand.where(a1.isNotNull).select(a1, a2).analyze
+ val optimized = Optimize.execute(query)
+ val correctExpand = expand.copy(projections = Seq(
+ Seq(Literal(null), c2),
+ Seq(c1, Literal(null))))
+ val correctAnswer = correctExpand.where(a1.isNotNull).select(a1, a2).analyze
+ comparePlans(optimized, correctAnswer)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8d55886a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
index d950ec8..4d0ed43 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
@@ -32,3 +32,6 @@ SELECT a + 1 + 1, COUNT(b) FROM testData GROUP BY a + 1;
-- Aggregate with nulls.
SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a), AVG(a), VARIANCE(a), STDDEV(a), SUM(a), COUNT(a)
FROM testData;
+
+-- Aggregate with foldable input and multiple distinct groups.
+SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a;
http://git-wip-us.apache.org/repos/asf/spark/blob/8d55886a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
index a91f04e..62ac69a 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 14
+-- Number of queries: 15
-- !query 0
@@ -131,3 +131,11 @@ FROM testData
struct<skewness(CAST(a AS DOUBLE)):double,kurtosis(CAST(a AS DOUBLE)):double,min(a):int,max(a):int,avg(a):double,var_samp(CAST(a AS DOUBLE)):double,stddev_samp(CAST(a AS DOUBLE)):double,sum(a):bigint,count(a):bigint>
-- !query 13 output
-0.2723801058145729 -1.5069204152249134 1 3 2.142857142857143 0.8095238095238094 0.8997354108424372 15 7
+
+
+-- !query 14
+SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a
+-- !query 14 schema
+struct<count(DISTINCT b):bigint,count(DISTINCT b, c):bigint>
+-- !query 14 output
+1 1
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org