You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/02/10 02:45:20 UTC
spark git commit: [SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter
Repository: spark
Updated Branches:
refs/heads/master 9267bc68f -> 6f710f9fd
[SPARK-12476][SQL] Implement JdbcRelation#unhandledFilters for removing unnecessary Spark Filter
Input: SELECT * FROM jdbcTable WHERE col0 = 'xxx'
Current plan:
```
== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
+- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})
== Physical Plan ==
+- Filter (col0#0 = xxx)
+- Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```
This patch enables the plan below:
```
== Optimized Logical Plan ==
Project [col0#0,col1#1]
+- Filter (col0#0 = xxx)
+- Relation[col0#0,col1#1] JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})
== Physical Plan ==
Scan JDBCRelation(jdbc:postgresql:postgres,testRel,[Lorg.apache.spark.Partition;2ac7c683,{user=maropu, password=, driver=org.postgresql.Driver})[col0#0,col1#1] PushedFilters: [EqualTo(col0,xxx)]
```
Author: Takeshi YAMAMURO <li...@gmail.com>
Closes #10427 from maropu/RemoveFilterInJdbcScan.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f710f9f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f710f9f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f710f9f
Branch: refs/heads/master
Commit: 6f710f9fd4f85370557b7705020ff16f2385e645
Parents: 9267bc6
Author: Takeshi YAMAMURO <li...@gmail.com>
Authored: Wed Feb 10 09:45:13 2016 +0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Feb 10 09:45:13 2016 +0800
----------------------------------------------------------------------
.../execution/datasources/jdbc/JDBCRDD.scala | 2 +-
.../datasources/jdbc/JDBCRelation.scala | 5 ++
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 70 ++++++++++++++------
3 files changed, 56 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6f710f9f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index d867e14..befba86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -189,7 +189,7 @@ private[sql] object JDBCRDD extends Logging {
* Turns a single Filter into a String representing a SQL expression.
* Returns None for an unhandled filter.
*/
- private def compileFilter(f: Filter): Option[String] = {
+ private[jdbc] def compileFilter(f: Filter): Option[String] = {
Option(f match {
case EqualTo(attr, value) => s"$attr = ${compileValue(value)}"
case EqualNullSafe(attr, value) =>
http://git-wip-us.apache.org/repos/asf/spark/blob/6f710f9f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 572be82..ee6373d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -90,6 +90,11 @@ private[sql] case class JDBCRelation(
override val schema: StructType = JDBCRDD.resolveTable(url, table, properties)
+ // Check if JDBCRDD.compileFilter can accept input filters
+ override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+ filters.filter(JDBCRDD.compileFilter(_).isEmpty)
+ }
+
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
// Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
JDBCRDD.scanTable(
http://git-wip-us.apache.org/repos/asf/spark/blob/6f710f9f/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 5186075..7a0f7ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -22,12 +22,12 @@ import java.sql.{Date, DriverManager, Timestamp}
import java.util.{Calendar, GregorianCalendar, Properties}
import org.h2.jdbc.JdbcSQLException
-import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.ExplainCommand
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.execution.PhysicalRDD
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
import org.apache.spark.sql.sources._
@@ -183,26 +183,34 @@ class JDBCSuite extends SparkFunSuite
}
test("SELECT * WHERE (simple predicates)") {
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
+ def checkPushdown(df: DataFrame): DataFrame = {
+ val parentPlan = df.queryExecution.executedPlan
+ // Check if SparkPlan Filter is removed in a physical plan and
+ // the plan only has PhysicalRDD to scan JDBCRelation.
+ assert(parentPlan.isInstanceOf[PhysicalRDD])
+ assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+ df
+ }
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME = 'fred'")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME <=> 'fred'")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME > 'fred'")).collect().size == 2)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME != 'fred'")).collect().size == 2)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME IN ('mary', 'fred')"))
.collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME NOT IN ('fred')"))
.collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary'"))
.collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1 OR NAME = 'mary' "
+ "AND THEID = 2")).collect().size == 2)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
- assert(stripSparkFilter(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE 'fr%'")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%ed'")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM foobar WHERE NAME LIKE '%re%'")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NULL")).collect().size == 1)
+ assert(checkPushdown(sql("SELECT * FROM nulltypes WHERE A IS NOT NULL")).collect().size == 0)
// This is a test to reflect discussion in SPARK-12218.
// The older versions of spark have this kind of bugs in parquet data source.
@@ -210,6 +218,28 @@ class JDBCSuite extends SparkFunSuite
val df2 = sql("SELECT * FROM foobar WHERE NOT (THEID != 2) OR NOT (NAME != 'mary')")
assert(df1.collect.toSet === Set(Row("mary", 2)))
assert(df2.collect.toSet === Set(Row("mary", 2)))
+
+ def checkNotPushdown(df: DataFrame): DataFrame = {
+ val parentPlan = df.queryExecution.executedPlan
+ // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
+ // cannot compile given predicates.
+ assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
+ val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
+ assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.Filter])
+ df
+ }
+ assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
+ assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
+ }
+
+ test("SELECT COUNT(1) WHERE (predicates)") {
+ // Check if an answer is correct when Filter is removed from operations such as count() which
+ // does not require any columns. In some data sources, e.g., Parquet, `requiredColumns` in
+ // org.apache.spark.sql.sources.interfaces is not given in logical plans, but some filters
+ // are applied for columns with Filter producing wrong results. On the other hand, JDBCRDD
+ // correctly handles this case by assigning `requiredColumns` properly. See PR 10427 for more
+ // discussions.
+ assert(sql("SELECT COUNT(1) FROM foobar WHERE NAME = 'mary'").collect.toSet === Set(Row(1)))
}
test("SELECT * WHERE (quoted strings)") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org