Posted to commits@spark.apache.org by we...@apache.org on 2021/08/30 11:11:06 UTC
[spark] branch branch-3.2 updated: [SPARK-36574][SQL] pushDownPredicate=false should prevent push down filters to JDBC data source
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new d42536a [SPARK-36574][SQL] pushDownPredicate=false should prevent push down filters to JDBC data source
d42536a is described below
commit d42536a6eedceba5d6572968a6dd0946cfaaffca
Author: gengjiaan <ge...@360.cn>
AuthorDate: Mon Aug 30 19:09:28 2021 +0800
[SPARK-36574][SQL] pushDownPredicate=false should prevent push down filters to JDBC data source
### What changes were proposed in this pull request?
Spark SQL includes a data source that can read data from other databases using JDBC.
Spark also supports the case-insensitive option `pushDownPredicate`.
According to http://spark.apache.org/docs/latest/sql-data-sources-jdbc.html, if `pushDownPredicate` is set to false, no filter will be pushed down to the JDBC data source and thus all filters will be handled by Spark.
But filters are still pushed down to the JDBC data source.
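For illustration, a minimal usage sketch of the intended behavior (the connection URL is hypothetical; the table and columns mirror the test added below). With pushDownPredicate=false, the filter should run in Spark rather than in the database:

    // Hypothetical JDBC url; option names are case-insensitive.
    val df1 = spark.read.format("jdbc")
      .option("url", "jdbc:h2:mem:testdb;user=testUser;password=testPass")
      .option("dbtable", "test.people")
      .option("pushDownPredicate", "false")  // disable filter pushdown
      .load()
      .filter("theid = 1")                   // should be evaluated by Spark
      .select("name", "theid")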
### Why are the changes needed?
Fix the bug where `pushDownPredicate`=false failed to prevent filters from being pushed down to the JDBC data source.
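Concretely (an illustrative sketch, not exact generated SQL), the bug means the filter's WHERE clause still reaches the database even when pushdown is disabled:

    // Observed (buggy) query sent over JDBC with pushDownPredicate=false, roughly:
    //   SELECT "NAME","THEID" FROM test.people WHERE ("THEID" = 1)
    // Expected query after the fix; Spark applies theid = 1 on the fetched rows:
    //   SELECT "NAME","THEID" FROM test.people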
### Does this PR introduce _any_ user-facing change?
'No'.
The output of the query will not change.
### How was this patch tested?
Jenkins test.
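Beyond Jenkins, a quick manual check (a sketch, reusing the hypothetical `df1` from above): the JDBC scan node of the physical plan should report no pushed filters, leaving the predicate as a separate Filter operator in the Spark plan.

    // Expect the scan node to show "PushedFilters: []" when pushDownPredicate=false,
    // with `theid = 1` appearing in a Filter operator above it.
    df1.explain()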
Closes #33822 from beliefer/SPARK-36574.
Authored-by: gengjiaan <ge...@360.cn>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit fcc91cfec4d939eeebfa8cd88f2791aca48645c6)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/execution/datasources/jdbc/JDBCRDD.scala | 14 ++++-----
.../execution/datasources/jdbc/JDBCRelation.scala | 8 ++++-
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 34 ++++++++++++++++++++++
3 files changed, 46 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index f26897d..e024e4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -172,12 +172,12 @@ object JDBCRDD extends Logging {
*
* @param sc - Your SparkContext.
* @param schema - The Catalyst schema of the underlying database table.
- * @param requiredColumns - The names of the columns to SELECT.
+ * @param requiredColumns - The names of the columns or aggregate columns to SELECT.
* @param filters - The filters to include in all WHERE clauses.
* @param parts - An array of JDBCPartitions specifying partition ids and
* per-partition WHERE clauses.
* @param options - JDBC options that contains url, table and other information.
- * @param outputSchema - The schema of the columns to SELECT.
+ * @param outputSchema - The schema of the columns or aggregate columns to SELECT.
* @param groupByColumns - The pushed down group by columns.
*
* @return An RDD representing "SELECT requiredColumns FROM fqTable".
@@ -213,8 +213,8 @@ object JDBCRDD extends Logging {
}
/**
- * An RDD representing a table in a database accessed via JDBC. Both the
- * driver code and the workers must be able to access the database; the driver
+ * An RDD representing a query over a table in a database accessed via JDBC.
+ * Both the driver code and the workers must be able to access the database; the driver
* needs to fetch the schema while the workers need to fetch the data.
*/
private[jdbc] class JDBCRDD(
@@ -237,11 +237,7 @@ private[jdbc] class JDBCRDD(
/**
* `columns`, but as a String suitable for injection into a SQL query.
*/
- private val columnList: String = {
- val sb = new StringBuilder()
- columns.foreach(x => sb.append(",").append(x))
- if (sb.isEmpty) "1" else sb.substring(1)
- }
+ private val columnList: String = if (columns.isEmpty) "1" else columns.mkString(",")
/**
* `filters`, but as a WHERE clause suitable for injection into a SQL query.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 60d88b6..8098fa0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -278,12 +278,18 @@ private[sql] case class JDBCRelation(
}
override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+ // When pushDownPredicate is false, all Filters that need to be pushed down should be ignored
+ val pushedFilters = if (jdbcOptions.pushDownPredicate) {
+ filters
+ } else {
+ Array.empty[Filter]
+ }
// Rely on a type erasure hack to pass RDD[InternalRow] back as RDD[Row]
JDBCRDD.scanTable(
sparkSession.sparkContext,
schema,
requiredColumns,
- filters,
+ pushedFilters,
parts,
jdbcOptions).asInstanceOf[RDD[Row]]
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 95a9161..8842db2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1723,6 +1723,40 @@ class JDBCSuite extends QueryTest
Row("fred", 1) :: Nil)
}
+ test(
+ "SPARK-36574: pushDownPredicate=false should prevent push down filters to JDBC data source") {
+ val df = spark.read.format("jdbc")
+ .option("Url", urlWithUserAndPass)
+ .option("dbTable", "test.people")
+ val df1 = df
+ .option("pushDownPredicate", false)
+ .load()
+ .filter("theid = 1")
+ .select("name", "theid")
+ val df2 = df
+ .option("pushDownPredicate", true)
+ .load()
+ .filter("theid = 1")
+ .select("name", "theid")
+ val df3 = df
+ .load()
+ .select("name", "theid")
+
+ def getRowCount(df: DataFrame): Long = {
+ val queryExecution = df.queryExecution
+ val rawPlan = queryExecution.executedPlan.collect {
+ case p: DataSourceScanExec => p
+ } match {
+ case Seq(p) => p
+ case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
+ }
+ rawPlan.execute().count()
+ }
+
+ assert(getRowCount(df1) == df3.count)
+ assert(getRowCount(df2) < df3.count)
+ }
+
test("SPARK-26383 throw IllegalArgumentException if wrong kind of driver to the given url") {
val e = intercept[IllegalArgumentException] {
val opts = Map(
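As an aside on the columnList simplification in JDBCRDD.scala above, a standalone sketch (helper names are hypothetical) showing that the old StringBuilder formulation and the new mkString one build the same SELECT list:

    // Old: accumulate ",col" pairs, then drop the leading comma.
    def columnListOld(columns: Array[String]): String = {
      val sb = new StringBuilder()
      columns.foreach(x => sb.append(",").append(x))
      if (sb.isEmpty) "1" else sb.substring(1)
    }
    // New: mkString, falling back to "1" (i.e. SELECT 1) when no columns are required.
    def columnListNew(columns: Array[String]): String =
      if (columns.isEmpty) "1" else columns.mkString(",")

    assert(columnListOld(Array("NAME", "THEID")) == columnListNew(Array("NAME", "THEID")))  // "NAME,THEID"
    assert(columnListOld(Array.empty[String]) == columnListNew(Array.empty[String]))        // "1"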