You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/24 03:08:46 UTC
[spark] branch master updated: [SPARK-42289][SQL] DS V2 pushdown could let JDBC dialect decide to push down offset and limit
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 810218bcb3d [SPARK-42289][SQL] DS V2 pushdown could let JDBC dialect decide to push down offset and limit
810218bcb3d is described below
commit 810218bcb3de1bffcff6bc447b09ad58cf63c4c9
Author: Jiaan Geng <be...@163.com>
AuthorDate: Fri Feb 24 11:08:06 2023 +0800
[SPARK-42289][SQL] DS V2 pushdown could let JDBC dialect decide to push down offset and limit
### What changes were proposed in this pull request?
Currently, DS V2 pushdown forcibly pushes down the `LIMIT` clause and `OFFSET` clause to underlying databases. This behavior is not correct. For example, Oracle and MsSQLServer don't support the `LIMIT` clause. DS V2 pushdown could let the JDBC dialect decide whether to push down `OFFSET` and `LIMIT`.
### Why are the changes needed?
1. If the underlying database doesn't support the `LIMIT` clause or `OFFSET` clause, the corresponding dialect can reject pushing them down.
2. Although some underlying databases don't support the `LIMIT` clause or `OFFSET` clause, the corresponding dialect may have tricks to achieve the same functionality.
### Does this PR introduce _any_ user-facing change?
'No'.
New feature.
### How was this patch tested?
New test cases.
Closes #39954 from beliefer/SPARK-42289.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../datasources/v2/jdbc/JDBCScanBuilder.scala | 12 ++++----
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 16 ++++++++++-
.../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 33 ++++++++++++++++++++++
3 files changed, 53 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
index 4c62c4c1c4a..dc834893db2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
@@ -44,6 +44,8 @@ case class JDBCScanBuilder(
with SupportsPushDownTopN
with Logging {
+ private val dialect = JdbcDialects.get(jdbcOptions.url)
+
private val isCaseSensitive = session.sessionState.conf.caseSensitiveAnalysis
private var pushedPredicate = Array.empty[Predicate]
@@ -60,7 +62,6 @@ case class JDBCScanBuilder(
override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
if (jdbcOptions.pushDownPredicate) {
- val dialect = JdbcDialects.get(jdbcOptions.url)
val (pushed, unSupported) = predicates.partition(dialect.compileExpression(_).isDefined)
this.pushedPredicate = pushed
unSupported
@@ -88,7 +89,6 @@ case class JDBCScanBuilder(
override def pushAggregation(aggregation: Aggregation): Boolean = {
if (!jdbcOptions.pushDownAggregate) return false
- val dialect = JdbcDialects.get(jdbcOptions.url)
val compiledAggs = aggregation.aggregateExpressions.flatMap(dialect.compileAggregate)
if (compiledAggs.length != aggregation.aggregateExpressions.length) return false
@@ -126,8 +126,7 @@ case class JDBCScanBuilder(
upperBound: Double,
withReplacement: Boolean,
seed: Long): Boolean = {
- if (jdbcOptions.pushDownTableSample &&
- JdbcDialects.get(jdbcOptions.url).supportsTableSample) {
+ if (jdbcOptions.pushDownTableSample && dialect.supportsTableSample) {
this.tableSample = Some(TableSampleInfo(lowerBound, upperBound, withReplacement, seed))
return true
}
@@ -135,7 +134,7 @@ case class JDBCScanBuilder(
}
override def pushLimit(limit: Int): Boolean = {
- if (jdbcOptions.pushDownLimit) {
+ if (jdbcOptions.pushDownLimit && dialect.supportsLimit) {
pushedLimit = limit
return true
}
@@ -143,7 +142,7 @@ case class JDBCScanBuilder(
}
override def pushOffset(offset: Int): Boolean = {
- if (jdbcOptions.pushDownOffset && !isPartiallyPushed) {
+ if (jdbcOptions.pushDownOffset && !isPartiallyPushed && dialect.supportsOffset) {
// Spark pushes down LIMIT first, then OFFSET. In SQL statements, OFFSET is applied before
// LIMIT. Here we need to adjust the LIMIT value to match SQL statements.
if (pushedLimit > 0) {
@@ -157,7 +156,6 @@ case class JDBCScanBuilder(
override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
if (jdbcOptions.pushDownLimit) {
- val dialect = JdbcDialects.get(jdbcOptions.url)
val compiledOrders = orders.flatMap(dialect.compileExpression(_))
if (orders.length != compiledOrders.length) return false
pushedLimit = limit
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 855fa6857af..280cffd1339 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -511,7 +511,7 @@ abstract class JdbcDialect extends Serializable with Logging {
*
* @param indexName the name of the index to be dropped.
* @param tableIdent the table on which index to be dropped.
- * @return the SQL statement to use for dropping the index.
+ * @return the SQL statement to use for dropping the index.
*/
def dropIndex(indexName: String, tableIdent: Identifier): String = {
throw new UnsupportedOperationException("dropIndex is not supported")
@@ -557,6 +557,20 @@ abstract class JdbcDialect extends Serializable with Logging {
def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder =
new JdbcSQLQueryBuilder(this, options)
+ /**
+ * Returns true if the dialect supports the LIMIT clause.
+ *
+ * Note: Some built-in dialects support the LIMIT clause with some tricks; please see:
+ * {@link OracleDialect.OracleSQLQueryBuilder} and
+ * {@link MsSqlServerDialect.MsSqlServerSQLQueryBuilder}.
+ */
+ def supportsLimit: Boolean = true
+
+ /**
+ * Returns true if the dialect supports the OFFSET clause.
+ */
+ def supportsOffset: Boolean = true
+
def supportsTableSample: Boolean = false
def getTableSample(sample: TableSampleInfo): String =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 2093fe66dd7..ae0cfe17b11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -51,6 +51,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
val testH2Dialect = new JdbcDialect {
override def canHandle(url: String): Boolean = H2Dialect.canHandle(url)
+ override def supportsLimit: Boolean = false
+
+ override def supportsOffset: Boolean = false
+
class H2SQLBuilder extends JDBCSQLBuilder {
override def visitUserDefinedScalarFunction(
funcName: String, canonicalName: String, inputs: Array[String]): String = {
@@ -299,6 +303,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
// LIMIT is pushed down only if all the filters are pushed down
checkPushedInfo(df5, "PushedFilters: []")
checkAnswer(df5, Seq(Row(10000.00, 1000.0, "amy")))
+
+ JdbcDialects.unregisterDialect(H2Dialect)
+ try {
+ JdbcDialects.registerDialect(testH2Dialect)
+ val df6 = spark.read.table("h2.test.employee")
+ .where($"dept" === 1).limit(1)
+ checkLimitRemoved(df6, false)
+ checkPushedInfo(df6, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1]")
+ checkAnswer(df6, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+ } finally {
+ JdbcDialects.unregisterDialect(testH2Dialect)
+ JdbcDialects.registerDialect(H2Dialect)
+ }
}
private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = {
@@ -383,6 +400,22 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
// OFFSET is pushed down only if all the filters are pushed down
checkPushedInfo(df6, "PushedFilters: []")
checkAnswer(df6, Seq(Row(10000.00, 1300.0, "dav"), Row(9000.00, 1200.0, "cat")))
+
+ JdbcDialects.unregisterDialect(H2Dialect)
+ try {
+ JdbcDialects.registerDialect(testH2Dialect)
+ val df7 = spark.read
+ .table("h2.test.employee")
+ .where($"dept" === 1)
+ .offset(1)
+ checkOffsetRemoved(df7, false)
+ checkPushedInfo(df7,
+ "PushedFilters: [DEPT IS NOT NULL, DEPT = 1]")
+ checkAnswer(df7, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+ } finally {
+ JdbcDialects.unregisterDialect(testH2Dialect)
+ JdbcDialects.registerDialect(H2Dialect)
+ }
}
test("simple scan with LIMIT and OFFSET") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org