Posted to commits@spark.apache.org by li...@apache.org on 2018/07/27 06:47:36 UTC

spark git commit: [SPARK-24288][SQL] Add a JDBC Option to enable preventing predicate pushdown

Repository: spark
Updated Branches:
  refs/heads/master e6e9031d7 -> 21fcac164


[SPARK-24288][SQL] Add a JDBC Option to enable preventing predicate pushdown

## What changes were proposed in this pull request?

Add a JDBC option "pushDownPredicate" (default `true`) to allow or disallow predicate push-down in the JDBC data source.
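
For context, a minimal sketch of the new option in the DataFrame API (the connection URL and table name below are placeholders, not part of this patch):

```scala
val df = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://host/db")  // placeholder connection URL
  .option("dbtable", "people")                 // placeholder table name
  .option("pushDownPredicate", "false")        // the new option; defaults to "true"
  .load()
  .filter("id = 1")  // with push-down disabled, this filter is evaluated by Spark
```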

## How was this patch tested?

Add a test in `JDBCSuite`.

Author: maryannxue <ma...@apache.org>

Closes #21875 from maryannxue/spark-24288.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21fcac16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21fcac16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21fcac16

Branch: refs/heads/master
Commit: 21fcac1645bf01c453ddd4cb64c566895e66ea4f
Parents: e6e9031
Author: maryannxue <ma...@apache.org>
Authored: Thu Jul 26 23:47:32 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Thu Jul 26 23:47:32 2018 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  7 +++
 .../datasources/jdbc/JDBCOptions.scala          |  4 ++
 .../datasources/jdbc/JDBCRelation.scala         |  6 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 66 ++++++++++++++------
 4 files changed, 63 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/21fcac16/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index e815e5b..4b013c6 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1435,6 +1435,13 @@ the following case-insensitive options:
      The custom schema to use for reading data from JDBC connectors. For example, <code>"id DECIMAL(38, 0), name STRING"</code>. You can also specify partial fields, and the others use the default type mapping. For example, <code>"id DECIMAL(38, 0)"</code>. The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
     </td>
   </tr>
+
+  <tr>
+    <td><code>pushDownPredicate</code></td>
+    <td>
+     The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible. Otherwise, if set to false, no filter will be pushed down to the JDBC data source and thus all filters will be handled by Spark. Predicate push-down is usually turned off when the predicate filtering is performed faster by Spark than by the JDBC data source.
+    </td>
+  </tr>
 </table>
 
 <div class="codetabs">
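
As the new `JDBCSuite` test below also exercises, the option can be set in SQL DDL when creating a temporary view over a JDBC table. A sketch (the url and table name here are placeholders mirroring the test fixtures):

```scala
spark.sql(
  """CREATE OR REPLACE TEMPORARY VIEW people_no_pushdown
    |USING org.apache.spark.sql.jdbc
    |OPTIONS (url 'jdbc:h2:mem:testdb', dbTable 'TEST.PEOPLE', pushDownPredicate 'false')
  """.stripMargin)
```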

http://git-wip-us.apache.org/repos/asf/spark/blob/21fcac16/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 574aed4..d80efce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -183,6 +183,9 @@ class JDBCOptions(
     }
   // An option to execute custom SQL before fetching data from the remote DB
   val sessionInitStatement = parameters.get(JDBC_SESSION_INIT_STATEMENT)
+
+  // An option to allow/disallow pushing down predicate into JDBC data source
+  val pushDownPredicate = parameters.getOrElse(JDBC_PUSHDOWN_PREDICATE, "true").toBoolean
 }
 
 class JdbcOptionsInWrite(
@@ -234,4 +237,5 @@ object JDBCOptions {
   val JDBC_BATCH_INSERT_SIZE = newOption("batchsize")
   val JDBC_TXN_ISOLATION_LEVEL = newOption("isolationLevel")
   val JDBC_SESSION_INIT_STATEMENT = newOption("sessionInitStatement")
+  val JDBC_PUSHDOWN_PREDICATE = newOption("pushDownPredicate")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/21fcac16/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 97e2d25..4f78f59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -172,7 +172,11 @@ private[sql] case class JDBCRelation(
 
   // Check if JDBCRDD.compileFilter can accept input filters
   override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
-    filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
+    if (jdbcOptions.pushDownPredicate) {
+      filters.filter(JDBCRDD.compileFilter(_, JdbcDialects.get(jdbcOptions.url)).isEmpty)
+    } else {
+      filters
+    }
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
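
The behavioral change is confined to `unhandledFilters`: returning every filter tells Spark's planner that the JDBC source handles none of them, so a `FilterExec` is kept above the JDBC scan, which is exactly what the new `checkNotPushdown` helper in the test suite asserts. One way to observe the effect (a sketch; the H2 URL and table are assumptions modeled on the test fixtures):

```scala
val df = spark.read.format("jdbc")
  .option("url", "jdbc:h2:mem:testdb;user=testUser")  // placeholder H2 URL
  .option("dbtable", "TEST.PEOPLE")                   // placeholder table
  .option("pushDownPredicate", "false")
  .load()
  .filter("theid = 1")

// With pushDownPredicate=false the physical plan keeps a Filter node
// above the JDBC scan instead of marking the predicate as handled.
df.explain()
```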

http://git-wip-us.apache.org/repos/asf/spark/blob/21fcac16/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 09facb9..0edbd3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -261,21 +261,32 @@ class JDBCSuite extends QueryTest
       s"Expecting a JDBCRelation with $expectedNumPartitions partitions, but got:`$jdbcRelations`")
   }
 
+  private def checkPushdown(df: DataFrame): DataFrame = {
+    val parentPlan = df.queryExecution.executedPlan
+    // Check if SparkPlan Filter is removed in a physical plan and
+    // the plan only has PhysicalRDD to scan JDBCRelation.
+    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+    assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
+    assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
+    df
+  }
+
+  private def checkNotPushdown(df: DataFrame): DataFrame = {
+    val parentPlan = df.queryExecution.executedPlan
+    // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
+    // cannot compile given predicates.
+    assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+    val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+    assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
+    df
+  }
+
   test("SELECT *") {
     assert(sql("SELECT * FROM foobar").collect().size === 3)
   }
 
   test("SELECT * WHERE (simple predicates)") {
-    def checkPushdown(df: DataFrame): DataFrame = {
-      val parentPlan = df.queryExecution.executedPlan
-      // Check if SparkPlan Filter is removed in a physical plan and
-      // the plan only has PhysicalRDD to scan JDBCRelation.
-      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-      val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
-      assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
-      df
-    }
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0)
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID != 2")).collect().size == 2)
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID = 1")).collect().size == 1)
@@ -308,15 +319,6 @@ class JDBCSuite extends QueryTest
       "WHERE (THEID > 0 AND TRIM(NAME) = 'mary') OR (NAME = 'fred')")
     assert(df2.collect.toSet === Set(Row("fred", 1), Row("mary", 2)))
 
-    def checkNotPushdown(df: DataFrame): DataFrame = {
-      val parentPlan = df.queryExecution.executedPlan
-      // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD
-      // cannot compile given predicates.
-      assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
-      val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
-      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
-      df
-    }
     assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0)
     assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 2) != 4")).collect().size == 2)
   }
@@ -1375,4 +1377,30 @@ class JDBCSuite extends QueryTest
       Row("fred", 1) :: Nil)
 
   }
+
+  test("SPARK-24288: Enable preventing predicate pushdown") {
+    val table = "test.people"
+
+    val df = spark.read.format("jdbc")
+      .option("Url", urlWithUserAndPass)
+      .option("dbTable", table)
+      .option("pushDownPredicate", false)
+      .load()
+      .filter("theid = 1")
+      .select("name", "theid")
+    checkAnswer(
+      checkNotPushdown(df),
+      Row("fred", 1) :: Nil)
+
+    // pushDownPredicate option in the create table path.
+    sql(
+      s"""
+         |CREATE OR REPLACE TEMPORARY VIEW predicateOption
+         |USING org.apache.spark.sql.jdbc
+         |OPTIONS (url '$urlWithUserAndPass', dbTable '$table', pushDownPredicate 'false')
+       """.stripMargin.replaceAll("\n", " "))
+    checkAnswer(
+      checkNotPushdown(sql("SELECT name, theid FROM predicateOption WHERE theid = 1")),
+      Row("fred", 1) :: Nil)
+  }
 }

