You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/11/29 17:17:46 UTC

spark git commit: [SPARK-22615][SQL] Handle more cases in PropagateEmptyRelation

Repository: spark
Updated Branches:
  refs/heads/master 20b239845 -> 57687280d


[SPARK-22615][SQL] Handle more cases in PropagateEmptyRelation

## What changes were proposed in this pull request?

Currently, the optimizer rule `PropagateEmptyRelation` does not handle the following cases:
1. Empty relation as the right child of a left outer join
2. Empty relation as the left child of a right outer join
3. Empty relation as the right child of a left semi join
4. Empty relation as the right child of a left anti join
5. Only one empty relation in a full outer join

Cases 1, 2, and 5 can be treated as a **Cartesian product** and cause an exception. See the new test cases.

## How was this patch tested?
Unit tests (updated `PropagateEmptyRelationSuite`) and a new SQL query test (`join-empty-relation.sql`).

Author: Wang Gengliang <lt...@gmail.com>

Closes #19825 from gengliangwang/SPARK-22615.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57687280
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57687280
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57687280

Branch: refs/heads/master
Commit: 57687280d4171db98d4d9404c7bd3374f51deac0
Parents: 20b2398
Author: Wang Gengliang <lt...@gmail.com>
Authored: Wed Nov 29 09:17:39 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Nov 29 09:17:39 2017 -0800

----------------------------------------------------------------------
 .../optimizer/PropagateEmptyRelation.scala      |  36 +++-
 .../optimizer/PropagateEmptyRelationSuite.scala |  16 +-
 .../sql-tests/inputs/join-empty-relation.sql    |  28 +++
 .../results/join-empty-relation.sql.out         | 194 +++++++++++++++++++
 4 files changed, 257 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/57687280/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 52fbb4d..a6e5aa6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -41,6 +41,10 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
   private def empty(plan: LogicalPlan) =
     LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming)
 
+  // Construct a project list from plan's output, while the value is always NULL.
+  private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
+    plan.output.map{ a => Alias(Literal(null), a.name)(a.exprId) }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case p: Union if p.children.forall(isEmptyLocalRelation) =>
       empty(p)
@@ -49,16 +53,28 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
     // as stateful streaming joins need to perform other state management operations other than
     // just processing the input data.
     case p @ Join(_, _, joinType, _)
-        if !p.children.exists(_.isStreaming) && p.children.exists(isEmptyLocalRelation) =>
-      joinType match {
-        case _: InnerLike => empty(p)
-        // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
-        // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
-        case LeftOuter | LeftSemi | LeftAnti if isEmptyLocalRelation(p.left) => empty(p)
-        case RightOuter if isEmptyLocalRelation(p.right) => empty(p)
-        case FullOuter if p.children.forall(isEmptyLocalRelation) => empty(p)
-        case _ => p
-    }
+        if !p.children.exists(_.isStreaming) =>
+      val isLeftEmpty = isEmptyLocalRelation(p.left)
+      val isRightEmpty = isEmptyLocalRelation(p.right)
+      if (isLeftEmpty || isRightEmpty) {
+        joinType match {
+          case _: InnerLike => empty(p)
+          // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
+          // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
+          case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
+          case LeftSemi if isRightEmpty => empty(p)
+          case LeftAnti if isRightEmpty => p.left
+          case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
+          case LeftOuter | FullOuter if isRightEmpty =>
+            Project(p.left.output ++ nullValueProjectList(p.right), p.left)
+          case RightOuter if isRightEmpty => empty(p)
+          case RightOuter | FullOuter if isLeftEmpty =>
+            Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
+          case _ => p
+        }
+      } else {
+        p
+      }
 
     case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match {
       case _: Project => empty(p)

http://git-wip-us.apache.org/repos/asf/spark/blob/57687280/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index bc1c48b..3964508 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -21,8 +21,9 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.types.StructType
 
@@ -78,17 +79,18 @@ class PropagateEmptyRelationSuite extends PlanTest {
 
       (true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
       (true, false, Cross, Some(LocalRelation('a.int, 'b.int))),
-      (true, false, LeftOuter, None),
+      (true, false, LeftOuter, Some(Project(Seq('a, Literal(null).as('b)), testRelation1).analyze)),
       (true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
-      (true, false, FullOuter, None),
-      (true, false, LeftAnti, None),
-      (true, false, LeftSemi, None),
+      (true, false, FullOuter, Some(Project(Seq('a, Literal(null).as('b)), testRelation1).analyze)),
+      (true, false, LeftAnti, Some(testRelation1)),
+      (true, false, LeftSemi, Some(LocalRelation('a.int))),
 
       (false, true, Inner, Some(LocalRelation('a.int, 'b.int))),
       (false, true, Cross, Some(LocalRelation('a.int, 'b.int))),
       (false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
-      (false, true, RightOuter, None),
-      (false, true, FullOuter, None),
+      (false, true, RightOuter,
+        Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)),
+      (false, true, FullOuter, Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)),
       (false, true, LeftAnti, Some(LocalRelation('a.int))),
       (false, true, LeftSemi, Some(LocalRelation('a.int))),
 

http://git-wip-us.apache.org/repos/asf/spark/blob/57687280/sql/core/src/test/resources/sql-tests/inputs/join-empty-relation.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/join-empty-relation.sql b/sql/core/src/test/resources/sql-tests/inputs/join-empty-relation.sql
new file mode 100644
index 0000000..8afa327
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/join-empty-relation.sql
@@ -0,0 +1,28 @@
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a);
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a);
+
+CREATE TEMPORARY VIEW empty_table as SELECT a FROM t2 WHERE false;
+
+SELECT * FROM t1 INNER JOIN empty_table;
+SELECT * FROM t1 CROSS JOIN empty_table;
+SELECT * FROM t1 LEFT OUTER JOIN empty_table;
+SELECT * FROM t1 RIGHT OUTER JOIN empty_table;
+SELECT * FROM t1 FULL OUTER JOIN empty_table;
+SELECT * FROM t1 LEFT SEMI JOIN empty_table;
+SELECT * FROM t1 LEFT ANTI JOIN empty_table;
+
+SELECT * FROM empty_table INNER JOIN t1;
+SELECT * FROM empty_table CROSS JOIN t1;
+SELECT * FROM empty_table LEFT OUTER JOIN t1;
+SELECT * FROM empty_table RIGHT OUTER JOIN t1;
+SELECT * FROM empty_table FULL OUTER JOIN t1;
+SELECT * FROM empty_table LEFT SEMI JOIN t1;
+SELECT * FROM empty_table LEFT ANTI JOIN t1;
+
+SELECT * FROM empty_table INNER JOIN empty_table;
+SELECT * FROM empty_table CROSS JOIN empty_table;
+SELECT * FROM empty_table LEFT OUTER JOIN empty_table;
+SELECT * FROM empty_table RIGHT OUTER JOIN empty_table;
+SELECT * FROM empty_table FULL OUTER JOIN empty_table;
+SELECT * FROM empty_table LEFT SEMI JOIN empty_table;
+SELECT * FROM empty_table LEFT ANTI JOIN empty_table;

http://git-wip-us.apache.org/repos/asf/spark/blob/57687280/sql/core/src/test/resources/sql-tests/results/join-empty-relation.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/join-empty-relation.sql.out b/sql/core/src/test/resources/sql-tests/results/join-empty-relation.sql.out
new file mode 100644
index 0000000..857073a
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/join-empty-relation.sql.out
@@ -0,0 +1,194 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 24
+
+
+-- !query 0
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+CREATE TEMPORARY VIEW empty_table as SELECT a FROM t2 WHERE false
+-- !query 2 schema
+struct<>
+-- !query 2 output
+
+
+
+-- !query 3
+SELECT * FROM t1 INNER JOIN empty_table
+-- !query 3 schema
+struct<a:int,a:int>
+-- !query 3 output
+
+
+
+-- !query 4
+SELECT * FROM t1 CROSS JOIN empty_table
+-- !query 4 schema
+struct<a:int,a:int>
+-- !query 4 output
+
+
+
+-- !query 5
+SELECT * FROM t1 LEFT OUTER JOIN empty_table
+-- !query 5 schema
+struct<a:int,a:int>
+-- !query 5 output
+1	NULL
+
+
+-- !query 6
+SELECT * FROM t1 RIGHT OUTER JOIN empty_table
+-- !query 6 schema
+struct<a:int,a:int>
+-- !query 6 output
+
+
+
+-- !query 7
+SELECT * FROM t1 FULL OUTER JOIN empty_table
+-- !query 7 schema
+struct<a:int,a:int>
+-- !query 7 output
+1	NULL
+
+
+-- !query 8
+SELECT * FROM t1 LEFT SEMI JOIN empty_table
+-- !query 8 schema
+struct<a:int>
+-- !query 8 output
+
+
+
+-- !query 9
+SELECT * FROM t1 LEFT ANTI JOIN empty_table
+-- !query 9 schema
+struct<a:int>
+-- !query 9 output
+1
+
+
+-- !query 10
+SELECT * FROM empty_table INNER JOIN t1
+-- !query 10 schema
+struct<a:int,a:int>
+-- !query 10 output
+
+
+
+-- !query 11
+SELECT * FROM empty_table CROSS JOIN t1
+-- !query 11 schema
+struct<a:int,a:int>
+-- !query 11 output
+
+
+
+-- !query 12
+SELECT * FROM empty_table LEFT OUTER JOIN t1
+-- !query 12 schema
+struct<a:int,a:int>
+-- !query 12 output
+
+
+
+-- !query 13
+SELECT * FROM empty_table RIGHT OUTER JOIN t1
+-- !query 13 schema
+struct<a:int,a:int>
+-- !query 13 output
+NULL	1
+
+
+-- !query 14
+SELECT * FROM empty_table FULL OUTER JOIN t1
+-- !query 14 schema
+struct<a:int,a:int>
+-- !query 14 output
+NULL	1
+
+
+-- !query 15
+SELECT * FROM empty_table LEFT SEMI JOIN t1
+-- !query 15 schema
+struct<a:int>
+-- !query 15 output
+
+
+
+-- !query 16
+SELECT * FROM empty_table LEFT ANTI JOIN t1
+-- !query 16 schema
+struct<a:int>
+-- !query 16 output
+
+
+
+-- !query 17
+SELECT * FROM empty_table INNER JOIN empty_table
+-- !query 17 schema
+struct<a:int,a:int>
+-- !query 17 output
+
+
+
+-- !query 18
+SELECT * FROM empty_table CROSS JOIN empty_table
+-- !query 18 schema
+struct<a:int,a:int>
+-- !query 18 output
+
+
+
+-- !query 19
+SELECT * FROM empty_table LEFT OUTER JOIN empty_table
+-- !query 19 schema
+struct<a:int,a:int>
+-- !query 19 output
+
+
+
+-- !query 20
+SELECT * FROM empty_table RIGHT OUTER JOIN empty_table
+-- !query 20 schema
+struct<a:int,a:int>
+-- !query 20 output
+
+
+
+-- !query 21
+SELECT * FROM empty_table FULL OUTER JOIN empty_table
+-- !query 21 schema
+struct<a:int,a:int>
+-- !query 21 output
+
+
+
+-- !query 22
+SELECT * FROM empty_table LEFT SEMI JOIN empty_table
+-- !query 22 schema
+struct<a:int>
+-- !query 22 output
+
+
+
+-- !query 23
+SELECT * FROM empty_table LEFT ANTI JOIN empty_table
+-- !query 23 schema
+struct<a:int>
+-- !query 23 output
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org