You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/03/29 21:05:59 UTC

[spark] branch master updated: [SPARK-27314][SQL] Deduplicate exprIds for Union.

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f176dd3  [SPARK-27314][SQL] Deduplicate exprIds for Union.
f176dd3 is described below

commit f176dd3f2827bc2d0ad18bccb5e63414e3fa5ffe
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Fri Mar 29 14:05:38 2019 -0700

    [SPARK-27314][SQL] Deduplicate exprIds for Union.
    
    ## What changes were proposed in this pull request?
    
    We have been having a potential problem with `Union` when the children have the same expression id in their outputs, which happens when self-union.
    
    ## How was this patch tested?
    
    Modified some tests to adjust plan changes.
    
    Closes #24236 from ueshin/issues/SPARK-27314/dedup_union.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala    | 16 ++++++++++++++++
 .../catalyst/plans/logical/basicLogicalOperators.scala   |  5 +++++
 .../catalyst/optimizer/FoldablePropagationSuite.scala    |  4 ++--
 .../catalyst/optimizer/NestedColumnAliasingSuite.scala   | 10 +++++++---
 .../catalyst/optimizer/OptimizerRuleExclusionSuite.scala |  6 +++++-
 .../spark/sql/catalyst/optimizer/SetOperationSuite.scala | 10 +++++-----
 6 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e216234..01e40e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -956,6 +956,22 @@ class Analyzer(
         i.copy(right = dedupRight(left, right))
       case e @ Except(left, right, _) if !e.duplicateResolved =>
         e.copy(right = dedupRight(left, right))
+      case u @ Union(children) if !u.duplicateResolved =>
+        // Use projection-based de-duplication for Union to avoid breaking the checkpoint sharing
+        // feature in streaming.
+        val newChildren = children.foldRight(Seq.empty[LogicalPlan]) { (head, tail) =>
+          head +: tail.map {
+            case child if head.outputSet.intersect(child.outputSet).isEmpty =>
+              child
+            case child =>
+              val projectList = child.output.map { attr =>
+                Alias(attr, attr.name)()
+              }
+              Project(projectList, child)
+          }
+        }
+        u.copy(children = newChildren)
+
       // When resolve `SortOrder`s in Sort based on child, don't report errors as
       // we still have chance to resolve it based on its descendants
       case s @ Sort(ordering, global, child) if child.resolved && !s.resolved =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index f7f701c..f4cc79c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -229,6 +229,11 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
     }
   }
 
+  def duplicateResolved: Boolean = {
+    children.map(_.outputSet.size).sum ==
+      AttributeSet.fromAttributeSets(children.map(_.outputSet)).size
+  }
+
   // updating nullability to make all the children consistent
   override def output: Seq[Attribute] =
     children.map(_.output).transpose.map(attrs =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
index c288446..0d48ecb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala
@@ -132,10 +132,10 @@ class FoldablePropagationSuite extends PlanTest {
 
   test("Propagate in inner join") {
     val ta = testRelation.select('a, Literal(1).as('tag))
-      .union(testRelation.select('a, Literal(2).as('tag)))
+      .union(testRelation.select('a.as('a), Literal(2).as('tag)))
       .subquery('ta)
     val tb = testRelation.select('a, Literal(1).as('tag))
-      .union(testRelation.select('a, Literal(2).as('tag)))
+      .union(testRelation.select('a.as('a), Literal(2).as('tag)))
       .subquery('tb)
     val query = ta.join(tb, Inner,
       Some("ta.a".attr === "tb.a".attr && "ta.tag".attr === "tb.tag".attr))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
index 87f7d19..abb340e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasingSuite.scala
@@ -140,7 +140,7 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
   }
 
   test("Pushing a single nested field projection - negative") {
-    val ops = Array(
+    val ops = Seq(
       (input: LogicalPlan) => input.distribute('name)(1),
       (input: LogicalPlan) => input.distribute($"name.middle")(1),
       (input: LogicalPlan) => input.orderBy('name.asc),
@@ -156,11 +156,15 @@ class NestedColumnAliasingSuite extends SchemaPruningTest {
         .analyze
     }
 
-    val optimizedQueries = queries.map(Optimize.execute)
-    val expectedQueries = queries
+    val optimizedQueries :+ optimizedUnion = queries.map(Optimize.execute)
+    val expectedQueries = queries.init
     optimizedQueries.zip(expectedQueries).foreach { case (optimized, expected) =>
       comparePlans(optimized, expected)
     }
+    val expectedUnion =
+      contact.select('name).union(contact.select('name.as('name)))
+        .select(GetStructField('name, 1, Some("middle"))).analyze
+    comparePlans(optimizedUnion, expectedUnion)
   }
 
   test("Pushing a single nested field projection through filters - negative") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
index 4fa4a7a..7587776 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
@@ -121,10 +121,14 @@ class OptimizerRuleExclusionSuite extends PlanTest {
       PropagateEmptyRelation.ruleName,
       CombineUnions.ruleName)
 
+    val testRelation1 = LocalRelation('a.int, 'b.int, 'c.int)
+    val testRelation2 = LocalRelation('a.int, 'b.int, 'c.int)
+    val testRelation3 = LocalRelation('a.int, 'b.int, 'c.int)
+
     withSQLConf(
       OPTIMIZER_EXCLUDED_RULES.key -> excludedRules.foldLeft("")((l, r) => l + "," + r)) {
       val optimizer = new SimpleTestOptimizer()
-      val originalQuery = testRelation.union(testRelation.union(testRelation)).analyze
+      val originalQuery = testRelation1.union(testRelation2.union(testRelation3)).analyze
       val optimized = optimizer.execute(originalQuery)
       comparePlans(originalQuery, optimized)
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index 17e00c9..3d3e361 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -31,7 +31,7 @@ class SetOperationSuite extends PlanTest {
     val batches =
       Batch("Subqueries", Once,
         EliminateSubqueryAliases) ::
-      Batch("Union Pushdown", Once,
+      Batch("Union Pushdown", FixedPoint(5),
         CombineUnions,
         PushProjectionThroughUnion,
         PushDownPredicate,
@@ -44,8 +44,8 @@ class SetOperationSuite extends PlanTest {
   val testUnion = Union(testRelation :: testRelation2 :: testRelation3 :: Nil)
 
   test("union: combine unions into one unions") {
-    val unionQuery1 = Union(Union(testRelation, testRelation2), testRelation)
-    val unionQuery2 = Union(testRelation, Union(testRelation2, testRelation))
+    val unionQuery1 = Union(Union(testRelation, testRelation2), testRelation3)
+    val unionQuery2 = Union(testRelation, Union(testRelation2, testRelation3))
     val unionOptimized1 = Optimize.execute(unionQuery1.analyze)
     val unionOptimized2 = Optimize.execute(unionQuery2.analyze)
 
@@ -93,7 +93,7 @@ class SetOperationSuite extends PlanTest {
     val unionQuery1 = Distinct(Union(Distinct(Union(query1, query2)), query3)).analyze
     val optimized1 = Optimize.execute(unionQuery1)
     val distinctUnionCorrectAnswer1 =
-      Distinct(Union(query1 :: query2 :: query3 :: Nil)).analyze
+      Distinct(Union(query1 :: query2 :: query3 :: Nil))
     comparePlans(distinctUnionCorrectAnswer1, optimized1)
 
     //         query1
@@ -107,7 +107,7 @@ class SetOperationSuite extends PlanTest {
       Distinct(Union(query2, query3)))).analyze
     val optimized2 = Optimize.execute(unionQuery2)
     val distinctUnionCorrectAnswer2 =
-      Distinct(Union(query1 :: query2 :: query2 :: query3 :: Nil)).analyze
+      Distinct(Union(query1 :: query2 :: query2 :: query3 :: Nil))
     comparePlans(distinctUnionCorrectAnswer2, optimized2)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org