Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/01 03:19:00 UTC

spark git commit: [SPARK-7109] [SQL] Push down left side filter for left semi join

Repository: spark
Updated Branches:
  refs/heads/master 079733817 -> a0d8a61ab


[SPARK-7109] [SQL] Push down left side filter for left semi join

Currently the Spark SQL optimizer only pushes down the right-side filter for a left semi join. We can push down the left-side filter as well, because a left semi join is essentially a filter on the left table: it returns the left-table rows that have a match on the right, and never emits columns from the right side.
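
As a concrete illustration (not part of the patch), here is a hypothetical query of the shape this change targets. Everything below is an invented example, assuming a Spark 1.3+ spark-shell session where sc is predefined:

// Hypothetical tables x(a, b) and y(d); names and data invented for this sketch.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

sc.parallelize(Seq((1, 0), (2, 5))).toDF("a", "b").registerTempTable("x")
sc.parallelize(Seq(Tuple1(2))).toDF("d").registerTempTable("y")

// "x.b >= 1" references only the left table. With this patch the optimizer
// evaluates it as a Filter on x below the join, instead of checking it
// against every candidate match; explain(true) prints the optimized plan.
sqlContext.sql(
  "SELECT * FROM x LEFT SEMI JOIN y ON x.a = y.d AND x.b >= 1").explain(true)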

Author: wangfei <wa...@huawei.com>
Author: scwf <wa...@huawei.com>

Closes #5677 from scwf/leftsemi and squashes the following commits:

483d205 [wangfei] update with master to fix compile issue
82df0e1 [wangfei] Merge branch 'master' of https://github.com/apache/spark into leftsemi
d68a053 [wangfei] added apply
8f48a3d [scwf] added test
ebadaa9 [wangfei] left filter push down for left semi join


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0d8a61a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0d8a61a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0d8a61a

Branch: refs/heads/master
Commit: a0d8a61ab198b8c0ddbb3072bbe1d0e1dabc3e45
Parents: 0797338
Author: wangfei <wa...@huawei.com>
Authored: Thu Apr 30 18:18:54 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Apr 30 18:18:54 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  8 ++++----
 .../optimizer/FilterPushdownSuite.scala         | 21 +++++++++++++++++++-
 2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a0d8a61a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 63669e9..709f7d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -571,7 +571,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
         split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
       joinType match {
-        case Inner =>
+        case _ @ (Inner | LeftSemi) =>
           // push down the single side only join filter for both sides sub queries
           val newLeft = leftJoinConditions.
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -579,7 +579,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
           val newJoinCond = commonJoinCondition.reduceLeftOption(And)
 
-          Join(newLeft, newRight, Inner, newJoinCond)
+          Join(newLeft, newRight, joinType, newJoinCond)
         case RightOuter =>
           // push down the left side only join filter for left side sub query
           val newLeft = leftJoinConditions.
@@ -588,14 +588,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
 
           Join(newLeft, newRight, RightOuter, newJoinCond)
-        case _ @ (LeftOuter | LeftSemi) =>
+        case LeftOuter =>
           // push down the right side only join filter for right sub query
           val newLeft = left
           val newRight = rightJoinConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
           val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
 
-          Join(newLeft, newRight, joinType, newJoinCond)
+          Join(newLeft, newRight, LeftOuter, newJoinCond)
         case FullOuter => f
       }
   }
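
Both arms of the match use the same idiom to turn an optional list of conjuncts into a Filter node: reduceLeftOption(And).map(Filter(_, child)).getOrElse(child). A self-contained sketch of that idiom with simplified stand-in types (Pred, Relation, and filterIfNonEmpty are inventions for this illustration, not the real Catalyst classes):

// Simplified stand-ins for Catalyst's Expression and LogicalPlan hierarchies.
sealed trait Expr
case class Pred(name: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr

sealed trait Plan
case class Relation(name: String) extends Plan
case class Filter(condition: Expr, child: Plan) extends Plan

object PushdownIdiom extends App {
  // AND together zero or more predicates; wrap `child` in a Filter only
  // when at least one predicate is present.
  def filterIfNonEmpty(conditions: Seq[Expr], child: Plan): Plan =
    conditions.reduceLeftOption(And).map(Filter(_, child)).getOrElse(child)

  // No side-specific predicates: the relation is returned unchanged.
  assert(filterIfNonEmpty(Nil, Relation("x")) == Relation("x"))
  // Two predicates are combined into a single conjunctive Filter.
  assert(filterIfNonEmpty(Seq(Pred("a = 1"), Pred("b >= 2")), Relation("x"))
    == Filter(And(Pred("a = 1"), Pred("b >= 2")), Relation("x")))
  println("idiom behaves as expected")
}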

http://git-wip-us.apache.org/repos/asf/spark/blob/a0d8a61a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 2ad7394..58d415d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.expressions.{Count, Explode}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -43,6 +43,8 @@ class FilterPushdownSuite extends PlanTest {
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
 
+  val testRelation1 = LocalRelation('d.int)
+
   // This test already passes.
   test("eliminate subqueries") {
     val originalQuery =
@@ -213,6 +215,23 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("joins: push down left semi join") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery = {
+      x.join(y, LeftSemi, Option("x.a".attr === "y.d".attr && "x.b".attr >= 1 && "y.d".attr >= 2))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val left = testRelation.where('b >= 1)
+    val right = testRelation1.where('d >= 2)
+    val correctAnswer =
+      left.join(right, LeftSemi, Option("a".attr === "d".attr)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("joins: push down left outer join #1") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)

