Posted to commits@spark.apache.org by we...@apache.org on 2023/02/24 03:08:46 UTC

[spark] branch master updated: [SPARK-42289][SQL] DS V2 pushdown could let JDBC dialect decide to push down offset and limit

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 810218bcb3d [SPARK-42289][SQL] DS V2 pushdown could let JDBC dialect decide to push down offset and limit
810218bcb3d is described below

commit 810218bcb3de1bffcff6bc447b09ad58cf63c4c9
Author: Jiaan Geng <be...@163.com>
AuthorDate: Fri Feb 24 11:08:06 2023 +0800

    [SPARK-42289][SQL] DS V2 pushdown could let JDBC dialect decide to push down offset and limit
    
    ### What changes were proposed in this pull request?
    Currently, DS V2 pushdown forces the `LIMIT` and `OFFSET` clauses to be pushed down to the underlying databases. This behavior is not correct; for example, Oracle and MsSQLServer do not support the `LIMIT` clause. DS V2 pushdown should let the JDBC dialect decide whether to push down `OFFSET` and `LIMIT`.
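    
    A minimal sketch of the new extension point (the dialect object, its name, and the JDBC URL prefix are made up for illustration; `supportsLimit`, `supportsOffset`, `canHandle` and `JdbcDialects.registerDialect` are the real Spark APIs added or used by this change):
    
    ```scala
    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
    
    // Hypothetical dialect for a database that has no native LIMIT/OFFSET clause.
    // After this change, JDBCScanBuilder consults these flags before pushing down.
    object NoLimitDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:nolimitdb")
    
      // Decline LIMIT pushdown; Spark applies the limit itself.
      override def supportsLimit: Boolean = false
    
      // Decline OFFSET pushdown; Spark applies the offset itself.
      override def supportsOffset: Boolean = false
    }
    
    // Register the dialect so it takes effect for matching JDBC URLs.
    JdbcDialects.registerDialect(NoLimitDialect)
    ```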
    
    ### Why are the changes needed?
    
    1. If the underlying database does not support the `LIMIT` or `OFFSET` clause, the corresponding dialect can reject pushing them down.
    2. Even when the underlying database lacks a native `LIMIT` or `OFFSET` clause, the corresponding dialect can often emulate the same behavior with other SQL constructs (see the sketch after this list).
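    
    As a hand-written illustration of such an emulation (not Spark's actual `JdbcSQLQueryBuilder` code; the helper names and SQL shapes below are only indicative of what the Oracle and MsSQLServer dialects do internally):
    
    ```scala
    // Emulating "LIMIT n" on databases without a native LIMIT clause.
    
    // MsSQLServer style: TOP (n) plays the role of LIMIT n.
    def sqlServerStyleLimit(columns: String, table: String, limit: Int): String =
      s"SELECT TOP ($limit) $columns FROM $table"
    
    // Oracle style: wrap the query and filter on ROWNUM.
    def oracleStyleLimit(innerQuery: String, limit: Int): String =
      s"SELECT tab.* FROM ($innerQuery) tab WHERE ROWNUM <= $limit"
    ```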
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    New feature.
    
    ### How was this patch tested?
    New test cases.
    
    Closes #39954 from beliefer/SPARK-42289.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/v2/jdbc/JDBCScanBuilder.scala      | 12 ++++----
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 16 ++++++++++-
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 33 ++++++++++++++++++++++
 3 files changed, 53 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
index 4c62c4c1c4a..dc834893db2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScanBuilder.scala
@@ -44,6 +44,8 @@ case class JDBCScanBuilder(
     with SupportsPushDownTopN
     with Logging {
 
+  private val dialect = JdbcDialects.get(jdbcOptions.url)
+
   private val isCaseSensitive = session.sessionState.conf.caseSensitiveAnalysis
 
   private var pushedPredicate = Array.empty[Predicate]
@@ -60,7 +62,6 @@ case class JDBCScanBuilder(
 
   override def pushPredicates(predicates: Array[Predicate]): Array[Predicate] = {
     if (jdbcOptions.pushDownPredicate) {
-      val dialect = JdbcDialects.get(jdbcOptions.url)
       val (pushed, unSupported) = predicates.partition(dialect.compileExpression(_).isDefined)
       this.pushedPredicate = pushed
       unSupported
@@ -88,7 +89,6 @@ case class JDBCScanBuilder(
   override def pushAggregation(aggregation: Aggregation): Boolean = {
     if (!jdbcOptions.pushDownAggregate) return false
 
-    val dialect = JdbcDialects.get(jdbcOptions.url)
     val compiledAggs = aggregation.aggregateExpressions.flatMap(dialect.compileAggregate)
     if (compiledAggs.length != aggregation.aggregateExpressions.length) return false
 
@@ -126,8 +126,7 @@ case class JDBCScanBuilder(
       upperBound: Double,
       withReplacement: Boolean,
       seed: Long): Boolean = {
-    if (jdbcOptions.pushDownTableSample &&
-      JdbcDialects.get(jdbcOptions.url).supportsTableSample) {
+    if (jdbcOptions.pushDownTableSample && dialect.supportsTableSample) {
       this.tableSample = Some(TableSampleInfo(lowerBound, upperBound, withReplacement, seed))
       return true
     }
@@ -135,7 +134,7 @@ case class JDBCScanBuilder(
   }
 
   override def pushLimit(limit: Int): Boolean = {
-    if (jdbcOptions.pushDownLimit) {
+    if (jdbcOptions.pushDownLimit && dialect.supportsLimit) {
       pushedLimit = limit
       return true
     }
@@ -143,7 +142,7 @@ case class JDBCScanBuilder(
   }
 
   override def pushOffset(offset: Int): Boolean = {
-    if (jdbcOptions.pushDownOffset && !isPartiallyPushed) {
+    if (jdbcOptions.pushDownOffset && !isPartiallyPushed && dialect.supportsOffset) {
       // Spark pushes down LIMIT first, then OFFSET. In SQL statements, OFFSET is applied before
       // LIMIT. Here we need to adjust the LIMIT value to match SQL statements.
       if (pushedLimit > 0) {
@@ -157,7 +156,6 @@ case class JDBCScanBuilder(
 
   override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
     if (jdbcOptions.pushDownLimit) {
-      val dialect = JdbcDialects.get(jdbcOptions.url)
       val compiledOrders = orders.flatMap(dialect.compileExpression(_))
       if (orders.length != compiledOrders.length) return false
       pushedLimit = limit
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 855fa6857af..280cffd1339 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -511,7 +511,7 @@ abstract class JdbcDialect extends Serializable with Logging {
    *
    * @param indexName the name of the index to be dropped.
    * @param tableIdent the table on which index to be dropped.
-  * @return the SQL statement to use for dropping the index.
+   * @return the SQL statement to use for dropping the index.
    */
   def dropIndex(indexName: String, tableIdent: Identifier): String = {
     throw new UnsupportedOperationException("dropIndex is not supported")
@@ -557,6 +557,20 @@ abstract class JdbcDialect extends Serializable with Logging {
   def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder =
     new JdbcSQLQueryBuilder(this, options)
 
+  /**
+   * Returns true if the dialect supports the LIMIT clause.
+   *
+   * Note: Some built-in dialects support the LIMIT clause through a workaround, please see:
+   * {@link OracleDialect.OracleSQLQueryBuilder} and
+   * {@link MsSqlServerDialect.MsSqlServerSQLQueryBuilder}.
+   */
+  def supportsLimit: Boolean = true
+
+  /**
+   * Returns true if the dialect supports the OFFSET clause.
+   */
+  def supportsOffset: Boolean = true
+
   def supportsTableSample: Boolean = false
 
   def getTableSample(sample: TableSampleInfo): String =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 2093fe66dd7..ae0cfe17b11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -51,6 +51,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
   val testH2Dialect = new JdbcDialect {
     override def canHandle(url: String): Boolean = H2Dialect.canHandle(url)
 
+    override def supportsLimit: Boolean = false
+
+    override def supportsOffset: Boolean = false
+
     class H2SQLBuilder extends JDBCSQLBuilder {
       override def visitUserDefinedScalarFunction(
           funcName: String, canonicalName: String, inputs: Array[String]): String = {
@@ -299,6 +303,19 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     // LIMIT is pushed down only if all the filters are pushed down
     checkPushedInfo(df5, "PushedFilters: []")
     checkAnswer(df5, Seq(Row(10000.00, 1000.0, "amy")))
+
+    JdbcDialects.unregisterDialect(H2Dialect)
+    try {
+      JdbcDialects.registerDialect(testH2Dialect)
+      val df6 = spark.read.table("h2.test.employee")
+        .where($"dept" === 1).limit(1)
+      checkLimitRemoved(df6, false)
+      checkPushedInfo(df6, "PushedFilters: [DEPT IS NOT NULL, DEPT = 1]")
+      checkAnswer(df6, Seq(Row(1, "amy", 10000.00, 1000.0, true)))
+    } finally {
+      JdbcDialects.unregisterDialect(testH2Dialect)
+      JdbcDialects.registerDialect(H2Dialect)
+    }
   }
 
   private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = {
@@ -383,6 +400,22 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     // OFFSET is pushed down only if all the filters are pushed down
     checkPushedInfo(df6, "PushedFilters: []")
     checkAnswer(df6, Seq(Row(10000.00, 1300.0, "dav"), Row(9000.00, 1200.0, "cat")))
+
+    JdbcDialects.unregisterDialect(H2Dialect)
+    try {
+      JdbcDialects.registerDialect(testH2Dialect)
+      val df7 = spark.read
+        .table("h2.test.employee")
+        .where($"dept" === 1)
+        .offset(1)
+      checkOffsetRemoved(df7, false)
+      checkPushedInfo(df7,
+        "PushedFilters: [DEPT IS NOT NULL, DEPT = 1]")
+      checkAnswer(df7, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+    } finally {
+      JdbcDialects.unregisterDialect(testH2Dialect)
+      JdbcDialects.registerDialect(H2Dialect)
+    }
   }
 
   test("simple scan with LIMIT and OFFSET") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org