Posted to commits@spark.apache.org by we...@apache.org on 2021/08/30 11:11:06 UTC

[spark] branch branch-3.2 updated: [SPARK-36574][SQL] pushDownPredicate=false should prevent push down filters to JDBC data source

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d42536a  [SPARK-36574][SQL] pushDownPredicate=false should prevent push down filters to JDBC data source
d42536a is described below

commit d42536a6eedceba5d6572968a6dd0946cfaaffca
Author: gengjiaan <ge...@360.cn>
AuthorDate: Mon Aug 30 19:09:28 2021 +0800

    [SPARK-36574][SQL] pushDownPredicate=false should prevent push down filters to JDBC data source
    
    ### What changes were proposed in this pull request?
    Spark SQL includes a data source that can read data from other databases using JDBC.
    Spark also supports the case-insensitive option `pushDownPredicate`.
    According to http://spark.apache.org/docs/latest/sql-data-sources-jdbc.html, if `pushDownPredicate` is set to false, no filter should be pushed down to the JDBC data source, and all filters should be handled by Spark.
    However, filters are still pushed down to the JDBC data source.
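    
    For illustration, a minimal sketch of the affected read path (not part of this patch; the H2 URL, credentials, and table name are placeholders):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Minimal sketch: placeholders stand in for whatever JDBC source is read.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jdbc-pushdown-demo")
      .getOrCreate()
    
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:h2:mem:testdb;user=testUser;password=testPass") // placeholder URL
      .option("dbtable", "TEST.PEOPLE")                                    // placeholder table
      .option("pushDownPredicate", "false") // keep all filters in Spark
      .load()
      .filter("theid = 1")
    
    // With pushDownPredicate=false, the JDBC scan in the physical plan should
    // report no pushed filters; Spark evaluates `theid = 1` after the scan.
    df.explain()
    ```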
    
    ### Why are the changes needed?
    Fix a bug where `pushDownPredicate=false` failed to prevent filters from being pushed down to the JDBC data source.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    The output of the query does not change.
    
    ### How was this patch tested?
    Jenkins test.
    
    Closes #33822 from beliefer/SPARK-36574.
    
    Authored-by: gengjiaan <ge...@360.cn>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit fcc91cfec4d939eeebfa8cd88f2791aca48645c6)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   | 14 ++++-----
 .../execution/datasources/jdbc/JDBCRelation.scala  |  8 ++++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 34 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index f26897d..e024e4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -172,12 +172,12 @@ object JDBCRDD extends Logging {
    *
    * @param sc - Your SparkContext.
    * @param schema - The Catalyst schema of the underlying database table.
-   * @param requiredColumns - The names of the columns to SELECT.
+   * @param requiredColumns - The names of the columns or aggregate columns to SELECT.
    * @param filters - The filters to include in all WHERE clauses.
    * @param parts - An array of JDBCPartitions specifying partition ids and
    *    per-partition WHERE clauses.
    * @param options - JDBC options that contains url, table and other information.
-   * @param outputSchema - The schema of the columns to SELECT.
+   * @param outputSchema - The schema of the columns or aggregate columns to SELECT.
    * @param groupByColumns - The pushed down group by columns.
    *
    * @return An RDD representing "SELECT requiredColumns FROM fqTable".
@@ -213,8 +213,8 @@ object JDBCRDD extends Logging {
 }
 
 /**
- * An RDD representing a table in a database accessed via JDBC.  Both the
- * driver code and the workers must be able to access the database; the driver
+ * An RDD representing a query is related to a table in a database accessed via JDBC.
+ * Both the driver code and the workers must be able to access the database; the driver
  * needs to fetch the schema while the workers need to fetch the data.
  */
 private[jdbc] class JDBCRDD(
@@ -237,11 +237,7 @@ private[jdbc] class JDBCRDD(
   /**
    * `columns`, but as a String suitable for injection into a SQL query.
    */
-  private val columnList: String = {
-    val sb = new StringBuilder()
-    columns.foreach(x => sb.append(",").append(x))
-    if (sb.isEmpty) "1" else sb.substring(1)
-  }
+  private val columnList: String = if (columns.isEmpty) "1" else columns.mkString(",")
 
   /**
    * `filters`, but as a WHERE clause suitable for injection into a SQL query.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 60d88b6..8098fa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -278,12 +278,18 @@ private[sql] case class JDBCRelation(
   }
 
   override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    // When pushDownPredicate is false, all Filters that need to be pushed down should be ignored
+    val pushedFilters = if (jdbcOptions.pushDownPredicate) {
+      filters
+    } else {
+      Array.empty[Filter]
+    }
     // Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
     JDBCRDD.scanTable(
       sparkSession.sparkContext,
       schema,
       requiredColumns,
-      filters,
+      pushedFilters,
       parts,
       jdbcOptions).asInstanceOf[RDD[Row]]
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 95a9161..8842db2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1723,6 +1723,40 @@ class JDBCSuite extends QueryTest
       Row("fred", 1) :: Nil)
   }
 
+  test(
+    "SPARK-36574: pushDownPredicate=false should prevent push down filters to JDBC data source") {
+    val df = spark.read.format("jdbc")
+      .option("Url", urlWithUserAndPass)
+      .option("dbTable", "test.people")
+    val df1 = df
+      .option("pushDownPredicate", false)
+      .load()
+      .filter("theid = 1")
+      .select("name", "theid")
+    val df2 = df
+      .option("pushDownPredicate", true)
+      .load()
+      .filter("theid = 1")
+      .select("name", "theid")
+    val df3 = df
+      .load()
+      .select("name", "theid")
+
+    def getRowCount(df: DataFrame): Long = {
+      val queryExecution = df.queryExecution
+      val rawPlan = queryExecution.executedPlan.collect {
+        case p: DataSourceScanExec => p
+      } match {
+        case Seq(p) => p
+        case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
+      }
+      rawPlan.execute().count()
+    }
+
+    assert(getRowCount(df1) == df3.count)
+    assert(getRowCount(df2) < df3.count)
+  }
+
   test("SPARK-26383 throw IllegalArgumentException if wrong kind of driver to the given url") {
     val e = intercept[IllegalArgumentException] {
       val opts = Map(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org