You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/03/29 21:05:59 UTC
[spark] branch master updated: [SPARK-27314][SQL] Deduplicate
exprIds for Union.
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f176dd3 [SPARK-27314][SQL] Deduplicate exprIds for Union.
f176dd3 is described below
commit f176dd3f2827bc2d0ad18bccb5e63414e3fa5ffe
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Fri Mar 29 14:05:38 2019 -0700
[SPARK-27314][SQL] Deduplicate exprIds for Union.
## What changes were proposed in this pull request?
There is a potential problem with `Union` when its children share the same expression ids in their outputs, which can happen in the case of a self-union.
## How was this patch tested?
Modified some tests to adjust plan changes.
Closes #24236 from ueshin/issues/SPARK-27314/dedup_union.
Authored-by: Takuya UESHIN <ue...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/catalyst/analysis/Analyzer.scala | 16 ++++++++++++++++
.../catalyst/plans/logical/basicLogicalOperators.scala | 5 +++++
.../catalyst/optimizer/FoldablePropagationSuite.scala | 4 ++--
.../catalyst/optimizer/NestedColumnAliasingSuite.scala | 10 +++++++---
.../catalyst/optimizer/OptimizerRuleExclusionSuite.scala | 6 +++++-
.../spark/sql/catalyst/optimizer/SetOperationSuite.scala | 10 +++++-----
6 files changed, 40 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e216234..01e40e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -956,6 +956,22 @@ class Analyzer(
i.copy(right = dedupRight(left, right))
case e @ Except(left, right, _) if !e.duplicateResolved =>
e.copy(right = dedupRight(left, right))
+ case u @ Union(children) if !u.duplicateResolved =>
+ // Use projection-based de-duplication for Union to avoid breaking the checkpoint sharing
+ // feature in streaming.
+ val newChildren = children.foldRight(Seq.empty[LogicalPlan]) { (head, tail) =>
+ head +: tail.map {
+ case child if head.outputSet.intersect(child.outputSet).isEmpty =>
+ child
+ case child =>
+ val projectList = child.output.map { attr =>
+ Alias(attr, attr.name)()
+ }
+ Project(projectList, child)
+ }
+ }
+ u.copy(children = newChildren)
+
// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on its descendants
case s @ Sort(ordering, global, child) if child.resolved && !s.resolved =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index f7f701c..f4cc79c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -229,6 +229,11 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
}
}
+ def duplicateResolved: Boolean = {
+ children.map(_.outputSet.size).sum ==
+ AttributeSet.fromAttributeSets(children.map(_.outputSet)).size
+ }
+
// updating nullability to make all the children consistent
override def output: Seq[Attribute] =
children.map(_.output).transpose.map(attrs =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
index c288446..0d48ecb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
@@ -132,10 +132,10 @@ class FoldablePropagationSuite extends PlanTest {
test("Propagate in inner join") {
val ta = testRelation.select('a, Literal(1).as('tag))
- .union(testRelation.select('a, Literal(2).as('tag)))
+ .union(testRelation.select('a.as('a), Literal(2).as('tag)))
.subquery('ta)
val tb = testRelation.select('a, Literal(1).as('tag))
- .union(testRelation.select('a, Literal(2).as('tag)))
+ .union(testRelation.select('a.as('a), Literal(2).as('tag)))
.subquery('tb)
val query = ta.join(tb, Inner,
Some("ta.a".attr === "tb.a".attr && "ta.tag".attr === "tb.tag".attr))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
index 87f7d19..abb340e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
@@ -140,7 +140,7 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
}
test("Pushing a single nested field projection - negative") {
- val ops = Array(
+ val ops = Seq(
(input: LogicalPlan) => input.distribute('name)(1),
(input: LogicalPlan) => input.distribute($"name.middle")(1),
(input: LogicalPlan) => input.orderBy('name.asc),
@@ -156,11 +156,15 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
.analyze
}
- val optimizedQueries = queries.map(Optimize.execute)
- val expectedQueries = queries
+ val optimizedQueries :+ optimizedUnion = queries.map(Optimize.execute)
+ val expectedQueries = queries.init
optimizedQueries.zip(expectedQueries).foreach { case (optimized, expected) =>
comparePlans(optimized, expected)
}
+ val expectedUnion =
+ contact.select('name).union(contact.select('name.as('name)))
+ .select(GetStructField('name, 1, Some("middle"))).analyze
+ comparePlans(optimizedUnion, expectedUnion)
}
test("Pushing a single nested field projection through filters - negative") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
index 4fa4a7a..7587776 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
@@ -121,10 +121,14 @@ class OptimizerRuleExclusionSuite extends PlanTest {
PropagateEmptyRelation.ruleName,
CombineUnions.ruleName)
+ val testRelation1 = LocalRelation('a.int, 'b.int, 'c.int)
+ val testRelation2 = LocalRelation('a.int, 'b.int, 'c.int)
+ val testRelation3 = LocalRelation('a.int, 'b.int, 'c.int)
+
withSQLConf(
OPTIMIZER_EXCLUDED_RULES.key -> excludedRules.foldLeft("")((l, r) => l + "," + r)) {
val optimizer = new SimpleTestOptimizer()
- val originalQuery = testRelation.union(testRelation.union(testRelation)).analyze
+ val originalQuery = testRelation1.union(testRelation2.union(testRelation3)).analyze
val optimized = optimizer.execute(originalQuery)
comparePlans(originalQuery, optimized)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index 17e00c9..3d3e361 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -31,7 +31,7 @@ class SetOperationSuite extends PlanTest {
val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
- Batch("Union Pushdown", Once,
+ Batch("Union Pushdown", FixedPoint(5),
CombineUnions,
PushProjectionThroughUnion,
PushDownPredicate,
@@ -44,8 +44,8 @@ class SetOperationSuite extends PlanTest {
val testUnion = Union(testRelation :: testRelation2 :: testRelation3 :: Nil)
test("union: combine unions into one unions") {
- val unionQuery1 = Union(Union(testRelation, testRelation2), testRelation)
- val unionQuery2 = Union(testRelation, Union(testRelation2, testRelation))
+ val unionQuery1 = Union(Union(testRelation, testRelation2), testRelation3)
+ val unionQuery2 = Union(testRelation, Union(testRelation2, testRelation3))
val unionOptimized1 = Optimize.execute(unionQuery1.analyze)
val unionOptimized2 = Optimize.execute(unionQuery2.analyze)
@@ -93,7 +93,7 @@ class SetOperationSuite extends PlanTest {
val unionQuery1 = Distinct(Union(Distinct(Union(query1, query2)), query3)).analyze
val optimized1 = Optimize.execute(unionQuery1)
val distinctUnionCorrectAnswer1 =
- Distinct(Union(query1 :: query2 :: query3 :: Nil)).analyze
+ Distinct(Union(query1 :: query2 :: query3 :: Nil))
comparePlans(distinctUnionCorrectAnswer1, optimized1)
// query1
@@ -107,7 +107,7 @@ class SetOperationSuite extends PlanTest {
Distinct(Union(query2, query3)))).analyze
val optimized2 = Optimize.execute(unionQuery2)
val distinctUnionCorrectAnswer2 =
- Distinct(Union(query1 :: query2 :: query2 :: query3 :: Nil)).analyze
+ Distinct(Union(query1 :: query2 :: query2 :: query3 :: Nil))
comparePlans(distinctUnionCorrectAnswer2, optimized2)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org