Posted to commits@spark.apache.org by we...@apache.org on 2016/08/19 13:11:43 UTC

spark git commit: [SPARK-16994][SQL] Whitelist operators for predicate pushdown

Repository: spark
Updated Branches:
  refs/heads/master 072acf5e1 -> 67e59d464


[SPARK-16994][SQL] Whitelist operators for predicate pushdown

## What changes were proposed in this pull request?
This patch changes the predicate pushdown optimization rule (PushDownPredicate) from using a blacklist to a whitelist. That is to say, operators must now be explicitly allowed. This approach is more future-proof: previously, introducing a new operator could silently render the optimization rule incorrect, because the rule would push predicates through any operator it did not explicitly exclude.
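
As a rough illustration of the future-proofing argument (the operator names below are hypothetical stand-ins, not Spark's actual plan nodes):

```
// Hypothetical operators, for illustration only.
sealed trait Op
case object SortOp extends Op
case object LimitOp extends Op
case object NewOp extends Op  // an operator added after the rule was written

// Blacklist: any operator not explicitly excluded falls through to `true`,
// so forgetting to list NewOp silently allows an unsafe pushdown.
def canPushBlacklist(op: Op): Boolean = op match {
  case LimitOp => false
  case _       => true
}

// Whitelist: the default is `false`, so NewOp is rejected until someone
// verifies pushdown is safe for it and adds a case.
def canPushWhitelist(op: Op): Boolean = op match {
  case SortOp => true
  case _      => false
}
```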

This also fixes a bug: previously, the rule allowed a filter to be pushed beneath a limit, which can change query results. That is to say, before this patch, the optimizer would rewrite
```
select * from (select * from range(10) limit 5) where id > 3
```
to
```
select * from range(10) where id > 3 limit 5
```
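
To make the behavior difference concrete, here is a minimal sketch using the DataFrame API (the SparkSession setup is illustrative, not part of this patch):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("limit-pushdown-demo").getOrCreate()
import spark.implicits._

// Original plan: limit first, then filter.
// range(10) yields ids 0..9; limit 5 keeps 0..4; id > 3 leaves only 4.
val original = spark.range(10).limit(5).where($"id" > 3)
original.show()  // 4

// Rewritten (buggy) plan: filter first, then limit.
// id > 3 leaves 4..9; limit 5 keeps 4, 5, 6, 7, 8 -- a different result.
val rewritten = spark.range(10).where($"id" > 3).limit(5)
rewritten.show()  // 4, 5, 6, 7, 8
```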

## How was this patch tested?
- a unit test case in FilterPushdownSuite
- an end-to-end test in limit.sql

Author: Reynold Xin <rx...@databricks.com>

Closes #14713 from rxin/SPARK-16994.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67e59d46
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67e59d46
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67e59d46

Branch: refs/heads/master
Commit: 67e59d464f782ff5f509234212aa072a7653d7bf
Parents: 072acf5
Author: Reynold Xin <rx...@databricks.com>
Authored: Fri Aug 19 21:11:35 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Aug 19 21:11:35 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      | 23 +++++++++++++++-----
 .../optimizer/FilterPushdownSuite.scala         |  6 +++++
 .../test/resources/sql-tests/inputs/limit.sql   |  3 +++
 .../resources/sql-tests/results/limit.sql.out   | 10 ++++++++-
 4 files changed, 35 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/67e59d46/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f7aa6da..ce57f05 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1208,17 +1208,28 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
         filter
       }
 
-    // two filters should be combine together by other rules
-    case filter @ Filter(_, _: Filter) => filter
-    // should not push predicates through sample, or will generate different results.
-    case filter @ Filter(_, _: Sample) => filter
-
-    case filter @ Filter(condition, u: UnaryNode) if u.expressions.forall(_.deterministic) =>
+    case filter @ Filter(condition, u: UnaryNode)
+        if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
       pushDownPredicate(filter, u.child) { predicate =>
         u.withNewChildren(Seq(Filter(predicate, u.child)))
       }
   }
 
+  private def canPushThrough(p: UnaryNode): Boolean = p match {
+    // Note that some operators (e.g. project, aggregate, union) are being handled separately
+    // (earlier in this rule).
+    case _: AppendColumns => true
+    case _: BroadcastHint => true
+    case _: Distinct => true
+    case _: Generate => true
+    case _: Pivot => true
+    case _: RedistributeData => true
+    case _: Repartition => true
+    case _: ScriptTransformation => true
+    case _: Sort => true
+    case _ => false
+  }
+
   private def pushDownPredicate(
       filter: Filter,
       grandchild: LogicalPlan)(insertFilter: Expression => LogicalPlan): LogicalPlan = {

http://git-wip-us.apache.org/repos/asf/spark/blob/67e59d46/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 596b8fc..9f25e9d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -111,6 +111,12 @@ class FilterPushdownSuite extends PlanTest {
     assert(optimized == correctAnswer)
   }
 
+  test("SPARK-16994: filter should not be pushed through limit") {
+    val originalQuery = testRelation.limit(10).where('a === 1).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, originalQuery)
+  }
+
   test("can't push without rewrite") {
     val originalQuery =
       testRelation

http://git-wip-us.apache.org/repos/asf/spark/blob/67e59d46/sql/core/src/test/resources/sql-tests/inputs/limit.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/limit.sql b/sql/core/src/test/resources/sql-tests/inputs/limit.sql
index 892a1bb..2ea35f7 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/limit.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/limit.sql
@@ -18,3 +18,6 @@ select * from testdata limit key > 3;
 -- limit must be integer
 select * from testdata limit true;
 select * from testdata limit 'a';
+
+-- limit within a subquery
+select * from (select * from range(10) limit 5) where id > 3;

http://git-wip-us.apache.org/repos/asf/spark/blob/67e59d46/sql/core/src/test/resources/sql-tests/results/limit.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/limit.sql.out b/sql/core/src/test/resources/sql-tests/results/limit.sql.out
index b71b058..cb4e4d0 100644
--- a/sql/core/src/test/resources/sql-tests/results/limit.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/limit.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 9
+-- Number of queries: 10
 
 
 -- !query 0
@@ -81,3 +81,11 @@ struct<>
 -- !query 8 output
 org.apache.spark.sql.AnalysisException
 The limit expression must be integer type, but got string;
+
+
+-- !query 9
+select * from (select * from range(10) limit 5) where id > 3
+-- !query 9 schema
+struct<id:bigint>
+-- !query 9 output
+4

