Posted to commits@spark.apache.org by we...@apache.org on 2023/03/13 03:12:07 UTC

[spark] branch master updated: [SPARK-42740][SQL] Fix the bug that pushdown offset or paging is invalid for some built-in dialect

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aea76ce0050 [SPARK-42740][SQL] Fix the bug that pushdown offset or paging is invalid for some built-in dialect
aea76ce0050 is described below

commit aea76ce00505abe3319b39c662994d564d54f660
Author: Jiaan Geng <be...@163.com>
AuthorDate: Mon Mar 13 11:11:50 2023 +0800

    [SPARK-42740][SQL] Fix the bug that pushdown offset or paging is invalid for some built-in dialect
    
    ### What changes were proposed in this pull request?
    Currently, the DS V2 pushdown framework pushes OFFSET as `OFFSET n` by default, and pushes it together with LIMIT as `LIMIT m OFFSET n`. But some built-in dialects do not support this syntax, so when Spark pushes an offset down to these databases, they throw errors.
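
    For illustration, here is a minimal standalone sketch of the MySQL branch of the fix (the helper name `mySqlLimitOrOffsetClause` is made up for this sketch; in the patch the logic lives in `MySQLSQLQueryBuilder.build` below):

    ```scala
    // MySQL has no bare OFFSET clause, so paging is expressed as "LIMIT offset, limit".
    def mySqlLimitOrOffsetClause(limit: Int, offset: Int): String = {
      if (limit > 0) {
        if (offset > 0) s"LIMIT $offset, $limit"
        else s"LIMIT $limit"
      } else if (offset > 0) {
        // No LIMIT: per the MySQL manual, use a very large row count to read
        // from the offset to the end of the result set.
        s"LIMIT $offset, 18446744073709551615"
      } else {
        ""
      }
    }

    mySqlLimitOrOffsetClause(limit = 1, offset = 2)  // "LIMIT 2, 1"
    mySqlLimitOrOffsetClause(limit = 0, offset = 4)  // "LIMIT 4, 18446744073709551615"
    ```

    The other dialects get analogous treatment below: DB2 appends `OFFSET n ROWS`, Oracle pages through an aliased `rownum` subquery, and MS SQL Server opts out entirely via `supportsOffset = false`.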
    
    ### Why are the changes needed?
    Fix the bug that OFFSET or paging pushdown produces invalid SQL for some built-in dialects.
    
    ### Does this PR introduce _any_ user-facing change?
    'Yes'.
    The bug will be fixed.
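
    Offset pushdown is controlled per JDBC catalog by the `pushDownOffset` option, which the integration suites below now enable. A minimal usage sketch, assuming a MySQL catalog (catalog name, JDBC URL, and table are illustrative):

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog

    // Sketch only: mirrors the option wiring added to the integration suites in this patch.
    val spark = SparkSession.builder()
      .config("spark.sql.catalog.mysql", classOf[JDBCTableCatalog].getName)
      .config("spark.sql.catalog.mysql.url", "jdbc:mysql://host:3306/db")  // assumed URL
      .config("spark.sql.catalog.mysql.pushDownLimit", "true")
      .config("spark.sql.catalog.mysql.pushDownOffset", "true")            // exercised by this fix
      .getOrCreate()

    // A bare OFFSET previously rendered as "... OFFSET 4", which MySQL rejects;
    // with this fix MySQLSQLQueryBuilder emits "LIMIT 4, 18446744073709551615" instead.
    spark.sql("SELECT name, salary FROM mysql.db.employee WHERE dept > 0 OFFSET 4").show()
    ```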
    
    ### How was this patch tested?
    New test cases.
    
    Closes #40359 from beliefer/SPARK-42740.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/jdbc/v2/DB2IntegrationSuite.scala    |  5 ++
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  |  5 ++
 .../spark/sql/jdbc/v2/OracleIntegrationSuite.scala |  5 ++
 .../sql/jdbc/v2/PostgresIntegrationSuite.scala     |  5 ++
 .../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala  | 66 ++++++++++++++++++++++
 .../org/apache/spark/sql/jdbc/DB2Dialect.scala     | 21 +++++++
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala |  3 +-
 .../org/apache/spark/sql/jdbc/MySQLDialect.scala   | 29 ++++++++++
 .../org/apache/spark/sql/jdbc/OracleDialect.scala  | 31 ++++++++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      |  2 +-
 10 files changed, 165 insertions(+), 7 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
index b4f832e7902..6a42158f587 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
@@ -63,6 +63,7 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
     .set("spark.sql.catalog.db2.url", db.getJdbcUrl(dockerIp, externalPort))
     .set("spark.sql.catalog.db2.pushDownAggregate", "true")
     .set("spark.sql.catalog.db2.pushDownLimit", "true")
+    .set("spark.sql.catalog.db2.pushDownOffset", "true")
 
   override def tablePreparation(connection: Connection): Unit = {
     connection.prepareStatement(
@@ -97,6 +98,10 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
 
   override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT)
 
+  testOffset()
+  testLimitAndOffset()
+  testPaging()
+
   testVarPop()
   testVarPop(true)
   testVarSamp()
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 97f9843b9ce..41a42e21f44 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -56,6 +56,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
     .set("spark.sql.catalog.mysql.url", db.getJdbcUrl(dockerIp, externalPort))
     .set("spark.sql.catalog.mysql.pushDownAggregate", "true")
     .set("spark.sql.catalog.mysql.pushDownLimit", "true")
+    .set("spark.sql.catalog.mysql.pushDownOffset", "true")
 
   override val connectionTimeout = timeout(7.minutes)
 
@@ -124,6 +125,10 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
 
   override def indexOptions: String = "KEY_BLOCK_SIZE=10"
 
+  testOffset()
+  testLimitAndOffset()
+  testPaging()
+
   testVarPop()
   testVarSamp()
   testStddevPop()
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
index 1f8e55d04f1..a8106026527 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
@@ -77,6 +77,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
     .set("spark.sql.catalog.oracle.url", db.getJdbcUrl(dockerIp, externalPort))
     .set("spark.sql.catalog.oracle.pushDownAggregate", "true")
     .set("spark.sql.catalog.oracle.pushDownLimit", "true")
+    .set("spark.sql.catalog.oracle.pushDownOffset", "true")
 
   override val connectionTimeout = timeout(7.minutes)
 
@@ -105,6 +106,10 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
 
   override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT)
 
+  testOffset()
+  testLimitAndOffset()
+  testPaging()
+
   testVarPop()
   testVarSamp()
   testStddevPop()
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index db3a80ffeaa..4065dbcc036 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -52,6 +52,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
     .set("spark.sql.catalog.postgresql.pushDownTableSample", "true")
     .set("spark.sql.catalog.postgresql.pushDownLimit", "true")
     .set("spark.sql.catalog.postgresql.pushDownAggregate", "true")
+    .set("spark.sql.catalog.postgresql.pushDownOffset", "true")
 
   override def tablePreparation(connection: Connection): Unit = {
     connection.prepareStatement(
@@ -90,6 +91,10 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
 
   override def indexOptions: String = "FILLFACTOR=70"
 
+  testOffset()
+  testLimitAndOffset()
+  testPaging()
+
   testVarPop()
   testVarPop(true)
   testVarSamp()
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 323aeab477e..97ee3385090 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -410,6 +410,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     assert(sorts.isEmpty)
   }
 
+  private def checkOffsetPushed(df: DataFrame, offset: Option[Int]): Unit = {
+    df.queryExecution.optimizedPlan.collect {
+      case relation: DataSourceV2ScanRelation => relation.scan match {
+        case v1: V1ScanWrapper =>
+          assert(v1.pushedDownOperators.offset == offset)
+      }
+    }
+  }
+
   test("simple scan with LIMIT") {
     val df = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
       s"${caseConvert("employee")} WHERE dept > 0 LIMIT 1")
@@ -445,6 +454,63 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
     }
   }
 
+  protected def testOffset(): Unit = {
+    test("simple scan with OFFSET") {
+      val df = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
+        s"${caseConvert("employee")} WHERE dept > 0 OFFSET 4")
+      checkOffsetPushed(df, Some(4))
+      val rows = df.collect()
+      assert(rows.length === 1)
+      assert(rows(0).getString(0) === "jen")
+      assert(rows(0).getDecimal(1) === new java.math.BigDecimal("12000.00"))
+      assert(rows(0).getDouble(2) === 1200d)
+    }
+  }
+
+  protected def testLimitAndOffset(): Unit = {
+    test("simple scan with LIMIT and OFFSET") {
+      val df = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
+        s"${caseConvert("employee")} WHERE dept > 0 LIMIT 1 OFFSET 2")
+      assert(limitPushed(df, 3))
+      checkOffsetPushed(df, Some(2))
+      val rows = df.collect()
+      assert(rows.length === 1)
+      assert(rows(0).getString(0) === "cathy")
+      assert(rows(0).getDecimal(1) === new java.math.BigDecimal("9000.00"))
+      assert(rows(0).getDouble(2) === 1200d)
+    }
+  }
+
+  protected def testPaging(): Unit = {
+    test("simple scan with paging: top N and OFFSET") {
+      Seq(NullOrdering.values()).flatten.foreach { nullOrdering =>
+        val df1 = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
+          s"${caseConvert("employee")}" +
+          s" WHERE dept > 0 ORDER BY salary $nullOrdering, bonus LIMIT 1 OFFSET 2")
+        assert(limitPushed(df1, 3))
+        checkOffsetPushed(df1, Some(2))
+        checkSortRemoved(df1)
+        val rows1 = df1.collect()
+        assert(rows1.length === 1)
+        assert(rows1(0).getString(0) === "david")
+        assert(rows1(0).getDecimal(1) === new java.math.BigDecimal("10000.00"))
+        assert(rows1(0).getDouble(2) === 1300d)
+
+        val df2 = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
+          s"${caseConvert("employee")}" +
+          s" WHERE dept > 0 ORDER BY salary DESC $nullOrdering, bonus LIMIT 1 OFFSET 2")
+        assert(limitPushed(df2, 3))
+        checkOffsetPushed(df2, Some(2))
+        checkSortRemoved(df2)
+        val rows2 = df2.collect()
+        assert(rows2.length === 1)
+        assert(rows2(0).getString(0) === "amy")
+        assert(rows2(0).getDecimal(1) === new java.math.BigDecimal("10000.00"))
+        assert(rows2(0).getDouble(2) === 1000d)
+      }
+    }
+  }
+
   private def checkAggregateRemoved(df: DataFrame): Unit = {
     val aggregates = df.queryExecution.optimizedPlan.collect {
       case agg: Aggregate => agg
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 5889be880dd..a6ae5a8abf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
 import org.apache.spark.sql.connector.expressions.Expression
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
 import org.apache.spark.sql.types._
 
 private object DB2Dialect extends JdbcDialect {
@@ -164,4 +165,24 @@ private object DB2Dialect extends JdbcDialect {
   override def getLimitClause(limit: Integer): String = {
     if (limit > 0) s"FETCH FIRST $limit ROWS ONLY" else ""
   }
+
+  override def getOffsetClause(offset: Integer): String = {
+    if (offset > 0) s"OFFSET $offset ROWS" else ""
+  }
+
+  class DB2SQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions)
+    extends JdbcSQLQueryBuilder(dialect, options) {
+
+    override def build(): String = {
+      val limitClause = dialect.getLimitClause(limit)
+      val offsetClause = dialect.getOffsetClause(offset)
+
+      options.prepareQuery +
+        s"SELECT $columnList FROM ${options.tableOrQuery} $tableSampleClause" +
+        s" $whereClause $groupByClause $orderByClause $offsetClause $limitClause"
+    }
+  }
+
+  override def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder =
+    new DB2SQLQueryBuilder(this, options)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 156f495943e..fc0d2d2470a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -200,7 +200,6 @@ private object MsSqlServerDialect extends JdbcDialect {
   class MsSqlServerSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions)
     extends JdbcSQLQueryBuilder(dialect, options) {
 
-    // TODO[SPARK-42289]: DS V2 pushdown could let JDBC dialect decide to push down offset
     override def build(): String = {
       val limitClause = dialect.getLimitClause(limit)
 
@@ -212,4 +211,6 @@ private object MsSqlServerDialect extends JdbcDialect {
 
   override def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder =
     new MsSqlServerSQLQueryBuilder(this, options)
+
+  override def supportsOffset: Boolean = false
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 1f615ed76c5..7db2237c474 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -291,4 +291,33 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
       throw QueryExecutionErrors.unsupportedDropNamespaceRestrictError()
     }
   }
+
+  class MySQLSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions)
+    extends JdbcSQLQueryBuilder(dialect, options) {
+
+    override def build(): String = {
+      val limitOrOffsetStmt = if (limit > 0) {
+        if (offset > 0) {
+          s"LIMIT $offset, $limit"
+        } else {
+          dialect.getLimitClause(limit)
+        }
+      } else if (offset > 0) {
+        // MySQL doesn't support OFFSET without LIMIT. As the MySQL documentation suggests, to
+        // retrieve all rows from a certain offset up to the end of the result set, you can use
+        // some large number for the second parameter. Please refer to
+        // https://dev.mysql.com/doc/refman/8.0/en/select.html
+        s"LIMIT $offset, 18446744073709551615"
+      } else {
+        ""
+      }
+
+      options.prepareQuery +
+        s"SELECT $columnList FROM ${options.tableOrQuery} $tableSampleClause" +
+        s" $whereClause $groupByClause $orderByClause $limitOrOffsetStmt"
+    }
+  }
+
+  override def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder =
+    new MySQLSQLQueryBuilder(this, options)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index d0e925bad38..55b4f1eb004 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -181,19 +181,40 @@ private case object OracleDialect extends JdbcDialect {
     if (limit > 0) s"WHERE rownum <= $limit" else ""
   }
 
+  override def getOffsetClause(offset: Integer): String = {
+    // Oracle doesn't support OFFSET clause.
+    // We can use rownum > n to skip some rows in the result set.
+    // Note: rn is an alias of rownum.
+    if (offset > 0) s"WHERE rn > $offset" else ""
+  }
+
   class OracleSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions)
     extends JdbcSQLQueryBuilder(dialect, options) {
 
-    // TODO[SPARK-42289]: DS V2 pushdown could let JDBC dialect decide to push down offset
     override def build(): String = {
       val selectStmt = s"SELECT $columnList FROM ${options.tableOrQuery} $tableSampleClause" +
         s" $whereClause $groupByClause $orderByClause"
-      if (limit > 0) {
-        val limitClause = dialect.getLimitClause(limit)
-        options.prepareQuery + s"SELECT tab.* FROM ($selectStmt) tab $limitClause"
+      val finalSelectStmt = if (limit > 0) {
+        if (offset > 0) {
+      // Because rownum is evaluated as rows are returned, referencing rownum
+      // directly without an alias, e.g.
+      // SELECT $columnList FROM ($selectStmt) tab WHERE rownum > $offset AND
+      // rownum <= ${limit + offset}, produces an incorrect result.
+          s"SELECT $columnList FROM (SELECT tab.*, rownum rn FROM ($selectStmt) tab)" +
+            s" WHERE rn > $offset AND rn <= ${limit + offset}"
+        } else {
+          val limitClause = dialect.getLimitClause(limit)
+          s"SELECT tab.* FROM ($selectStmt) tab $limitClause"
+        }
+      } else if (offset > 0) {
+        val offsetClause = dialect.getOffsetClause(offset)
+        // Using rownum directly will lead to incorrect result too.
+        s"SELECT $columnList FROM (SELECT tab.*, rownum rn FROM ($selectStmt) tab) $offsetClause"
       } else {
-        options.prepareQuery + selectStmt
+        selectStmt
       }
+
+      options.prepareQuery + finalSelectStmt
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 34c5fada19c..0d102e1632a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1038,7 +1038,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
         .withLimit(123)
         .build()
         .trim() ==
-      "SELECT a,b FROM test     FETCH FIRST 123 ROWS ONLY")
+      "SELECT a,b FROM test      FETCH FIRST 123 ROWS ONLY")
   }
 
   test("table exists query by jdbc dialect") {

