Posted to commits@spark.apache.org by we...@apache.org on 2023/03/13 03:12:07 UTC
[spark] branch master updated: [SPARK-42740][SQL] Fix the bug that pushdown offset or paging is invalid for some built-in dialect
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new aea76ce0050 [SPARK-42740][SQL] Fix the bug that pushdown offset or paging is invalid for some built-in dialect
aea76ce0050 is described below
commit aea76ce00505abe3319b39c662994d564d54f660
Author: Jiaan Geng <be...@163.com>
AuthorDate: Mon Mar 13 11:11:50 2023 +0800
[SPARK-42740][SQL] Fix the bug that pushdown offset or paging is invalid for some built-in dialect
### What changes were proposed in this pull request?
Currently, the DS V2 pushdown framework pushes OFFSET as `OFFSET n` by default, and pushes it together with LIMIT as `LIMIT m OFFSET n`. But some built-in dialects don't support this syntax, so when Spark pushes an offset down to these databases, they throw errors.
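For illustration, here is a minimal sketch (assumed query shapes derived from the dialect changes in this PR, not captured framework output) of how one logical paging query renders by default versus per dialect:
```scala
// One logical query, "skip 2 rows, take 1", in the default rendering and in
// the dialect-specific renderings this PR introduces. Table and columns are
// illustrative.
object OffsetPushdownSketch extends App {
  val base = "SELECT name, salary, bonus FROM employee"
  // Default renderings used for every dialect before this PR:
  val offsetOnly = s"$base OFFSET 4"         // MySQL rejects OFFSET without LIMIT
  val paging     = s"$base LIMIT 1 OFFSET 2" // DB2 and Oracle reject LIMIT/OFFSET
  // Dialect-specific renderings after this PR:
  val mysql  = s"$base LIMIT 4, 18446744073709551615"         // offset-only workaround
  val db2    = s"$base OFFSET 2 ROWS FETCH FIRST 1 ROWS ONLY" // paging
  val oracle = "SELECT name, salary, bonus FROM (SELECT tab.*, rownum rn" +
    s" FROM ($base) tab) WHERE rn > 2 AND rn <= 3"            // paging
  Seq(offsetOnly, paging, mysql, db2, oracle).foreach(println)
}
```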
### Why are the changes needed?
Fix the bug that pushed-down OFFSET or paging is invalid for some built-in dialects.
### Does this PR introduce _any_ user-facing change?
'Yes'.
The bug will be fixed.
### How was this patch tested?
New test cases.
Closes #40359 from beliefer/SPARK-42740.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/jdbc/v2/DB2IntegrationSuite.scala | 5 ++
.../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 5 ++
.../spark/sql/jdbc/v2/OracleIntegrationSuite.scala | 5 ++
.../sql/jdbc/v2/PostgresIntegrationSuite.scala | 5 ++
.../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 66 ++++++++++++++++++++++
.../org/apache/spark/sql/jdbc/DB2Dialect.scala | 21 +++++++
.../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 3 +-
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 29 ++++++++++
.../org/apache/spark/sql/jdbc/OracleDialect.scala | 31 ++++++++--
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +-
10 files changed, 165 insertions(+), 7 deletions(-)
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
index b4f832e7902..6a42158f587 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
@@ -63,6 +63,7 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
.set("spark.sql.catalog.db2.url", db.getJdbcUrl(dockerIp, externalPort))
.set("spark.sql.catalog.db2.pushDownAggregate", "true")
.set("spark.sql.catalog.db2.pushDownLimit", "true")
+ .set("spark.sql.catalog.db2.pushDownOffset", "true")
override def tablePreparation(connection: Connection): Unit = {
connection.prepareStatement(
@@ -97,6 +98,10 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT)
+ testOffset()
+ testLimitAndOffset()
+ testPaging()
+
testVarPop()
testVarPop(true)
testVarSamp()
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 97f9843b9ce..41a42e21f44 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -56,6 +56,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
.set("spark.sql.catalog.mysql.url", db.getJdbcUrl(dockerIp, externalPort))
.set("spark.sql.catalog.mysql.pushDownAggregate", "true")
.set("spark.sql.catalog.mysql.pushDownLimit", "true")
+ .set("spark.sql.catalog.mysql.pushDownOffset", "true")
override val connectionTimeout = timeout(7.minutes)
@@ -124,6 +125,10 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
override def indexOptions: String = "KEY_BLOCK_SIZE=10"
+ testOffset()
+ testLimitAndOffset()
+ testPaging()
+
testVarPop()
testVarSamp()
testStddevPop()
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
index 1f8e55d04f1..a8106026527 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
@@ -77,6 +77,7 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
.set("spark.sql.catalog.oracle.url", db.getJdbcUrl(dockerIp, externalPort))
.set("spark.sql.catalog.oracle.pushDownAggregate", "true")
.set("spark.sql.catalog.oracle.pushDownLimit", "true")
+ .set("spark.sql.catalog.oracle.pushDownOffset", "true")
override val connectionTimeout = timeout(7.minutes)
@@ -105,6 +106,10 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT)
+ testOffset()
+ testLimitAndOffset()
+ testPaging()
+
testVarPop()
testVarSamp()
testStddevPop()
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index db3a80ffeaa..4065dbcc036 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -52,6 +52,7 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
.set("spark.sql.catalog.postgresql.pushDownTableSample", "true")
.set("spark.sql.catalog.postgresql.pushDownLimit", "true")
.set("spark.sql.catalog.postgresql.pushDownAggregate", "true")
+ .set("spark.sql.catalog.postgresql.pushDownOffset", "true")
override def tablePreparation(connection: Connection): Unit = {
connection.prepareStatement(
@@ -90,6 +91,10 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
override def indexOptions: String = "FILLFACTOR=70"
+ testOffset()
+ testLimitAndOffset()
+ testPaging()
+
testVarPop()
testVarPop(true)
testVarSamp()
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 323aeab477e..97ee3385090 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -410,6 +410,15 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
assert(sorts.isEmpty)
}
+ private def checkOffsetPushed(df: DataFrame, offset: Option[Int]): Unit = {
+ df.queryExecution.optimizedPlan.collect {
+ case relation: DataSourceV2ScanRelation => relation.scan match {
+ case v1: V1ScanWrapper =>
+ assert(v1.pushedDownOperators.offset == offset)
+ }
+ }
+ }
+
test("simple scan with LIMIT") {
val df = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
s"${caseConvert("employee")} WHERE dept > 0 LIMIT 1")
@@ -445,6 +454,63 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
}
}
+ protected def testOffset(): Unit = {
+ test("simple scan with OFFSET") {
+ val df = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
+ s"${caseConvert("employee")} WHERE dept > 0 OFFSET 4")
+ checkOffsetPushed(df, Some(4))
+ val rows = df.collect()
+ assert(rows.length === 1)
+ assert(rows(0).getString(0) === "jen")
+ assert(rows(0).getDecimal(1) === new java.math.BigDecimal("12000.00"))
+ assert(rows(0).getDouble(2) === 1200d)
+ }
+ }
+
+ protected def testLimitAndOffset(): Unit = {
+ test("simple scan with LIMIT and OFFSET") {
+ val df = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
+ s"${caseConvert("employee")} WHERE dept > 0 LIMIT 1 OFFSET 2")
+ assert(limitPushed(df, 3))
+ checkOffsetPushed(df, Some(2))
+ val rows = df.collect()
+ assert(rows.length === 1)
+ assert(rows(0).getString(0) === "cathy")
+ assert(rows(0).getDecimal(1) === new java.math.BigDecimal("9000.00"))
+ assert(rows(0).getDouble(2) === 1200d)
+ }
+ }
+
+ protected def testPaging(): Unit = {
+ test("simple scan with paging: top N and OFFSET") {
+ Seq(NullOrdering.values()).flatten.foreach { nullOrdering =>
+ val df1 = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
+ s"${caseConvert("employee")}" +
+ s" WHERE dept > 0 ORDER BY salary $nullOrdering, bonus LIMIT 1 OFFSET 2")
+ assert(limitPushed(df1, 3))
+ checkOffsetPushed(df1, Some(2))
+ checkSortRemoved(df1)
+ val rows1 = df1.collect()
+ assert(rows1.length === 1)
+ assert(rows1(0).getString(0) === "david")
+ assert(rows1(0).getDecimal(1) === new java.math.BigDecimal("10000.00"))
+ assert(rows1(0).getDouble(2) === 1300d)
+
+ val df2 = sql(s"SELECT name, salary, bonus FROM $catalogAndNamespace." +
+ s"${caseConvert("employee")}" +
+ s" WHERE dept > 0 ORDER BY salary DESC $nullOrdering, bonus LIMIT 1 OFFSET 2")
+ assert(limitPushed(df2, 3))
+ checkOffsetPushed(df2, Some(2))
+ checkSortRemoved(df2)
+ val rows2 = df2.collect()
+ assert(rows2.length === 1)
+ assert(rows2(0).getString(0) === "amy")
+ assert(rows2(0).getDecimal(1) === new java.math.BigDecimal("10000.00"))
+ assert(rows2(0).getDouble(2) === 1000d)
+ }
+ }
+ }
+
private def checkAggregateRemoved(df: DataFrame): Unit = {
val aggregates = df.queryExecution.optimizedPlan.collect {
case agg: Aggregate => agg
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 5889be880dd..a6ae5a8abf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -25,6 +25,7 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.connector.expressions.Expression
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.types._
private object DB2Dialect extends JdbcDialect {
@@ -164,4 +165,24 @@ private object DB2Dialect extends JdbcDialect {
override def getLimitClause(limit: Integer): String = {
if (limit > 0) s"FETCH FIRST $limit ROWS ONLY" else ""
}
+
+ override def getOffsetClause(offset: Integer): String = {
+ if (offset > 0) s"OFFSET $offset ROWS" else ""
+ }
+
+ class DB2SQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions)
+ extends JdbcSQLQueryBuilder(dialect, options) {
+
+ override def build(): String = {
+ val limitClause = dialect.getLimitClause(limit)
+ val offsetClause = dialect.getOffsetClause(offset)
+
+ options.prepareQuery +
+ s"SELECT $columnList FROM ${options.tableOrQuery} $tableSampleClause" +
+ s" $whereClause $groupByClause $orderByClause $offsetClause $limitClause"
+ }
+ }
+
+ override def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder =
+ new DB2SQLQueryBuilder(this, options)
}
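For readers unfamiliar with DB2 paging: `OFFSET n ROWS` must precede `FETCH FIRST m ROWS ONLY`, which is why `build()` above appends `$offsetClause $limitClause` in that order. A hedged sketch (illustrative table and columns) of the resulting shape:
```scala
// Mirrors the clause ordering in DB2SQLQueryBuilder.build(); not the actual
// builder, just a sketch of the SQL shape it produces.
object Db2PagingSketch extends App {
  def db2Paging(limit: Int, offset: Int): String = {
    val offsetClause = if (offset > 0) s"OFFSET $offset ROWS" else ""         // getOffsetClause
    val limitClause  = if (limit > 0) s"FETCH FIRST $limit ROWS ONLY" else "" // getLimitClause
    s"SELECT name, salary, bonus FROM employee $offsetClause $limitClause"
  }
  println(db2Paging(1, 2)) // SELECT ... FROM employee OFFSET 2 ROWS FETCH FIRST 1 ROWS ONLY
}
```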
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 156f495943e..fc0d2d2470a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -200,7 +200,6 @@ private object MsSqlServerDialect extends JdbcDialect {
class MsSqlServerSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions)
extends JdbcSQLQueryBuilder(dialect, options) {
- // TODO[SPARK-42289]: DS V2 pushdown could let JDBC dialect decide to push down offset
override def build(): String = {
val limitClause = dialect.getLimitClause(limit)
@@ -212,4 +211,6 @@ private object MsSqlServerDialect extends JdbcDialect {
override def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder =
new MsSqlServerSQLQueryBuilder(this, options)
+
+ override def supportsOffset: Boolean = false
}
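SQL Server takes the opposite approach: instead of rendering a dialect-specific offset clause, it opts out of offset pushdown via `supportsOffset`, leaving Spark to evaluate the OFFSET itself. A minimal sketch of the same opt-out in a hypothetical custom dialect:
```scala
import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical dialect mirroring MsSqlServerDialect's opt-out above: with
// supportsOffset = false, OFFSET stays in Spark rather than being embedded
// in the generated JDBC query.
object NoOffsetDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:nooffset")
  override def supportsOffset: Boolean = false
}
```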
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 1f615ed76c5..7db2237c474 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -291,4 +291,33 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
throw QueryExecutionErrors.unsupportedDropNamespaceRestrictError()
}
}
+
+ class MySQLSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions)
+ extends JdbcSQLQueryBuilder(dialect, options) {
+
+ override def build(): String = {
+ val limitOrOffsetStmt = if (limit > 0) {
+ if (offset > 0) {
+ s"LIMIT $offset, $limit"
+ } else {
+ dialect.getLimitClause(limit)
+ }
+ } else if (offset > 0) {
+ // MySQL doesn't support OFFSET without LIMIT. As the MySQL documentation
+ // suggests, to retrieve all rows from a certain offset up to the end of the
+ // result set, use some large number for the second parameter. Please refer to
+ // https://dev.mysql.com/doc/refman/8.0/en/select.html
+ s"LIMIT $offset, 18446744073709551615"
+ } else {
+ ""
+ }
+
+ options.prepareQuery +
+ s"SELECT $columnList FROM ${options.tableOrQuery} $tableSampleClause" +
+ s" $whereClause $groupByClause $orderByClause $limitOrOffsetStmt"
+ }
+ }
+
+ override def getJdbcSQLQueryBuilder(options: JDBCOptions): JdbcSQLQueryBuilder =
+ new MySQLSQLQueryBuilder(this, options)
}
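Note on the MySQL builder above: the two-argument form is `LIMIT offset, count`, and 18446744073709551615 (2^64 - 1) is the large row count the MySQL manual recommends for "offset to end of result set". A sketch of the three cases:
```scala
// Sketch of the three cases MySQLSQLQueryBuilder.build() distinguishes;
// values are illustrative.
object MySqlPagingSketch extends App {
  def limitOrOffsetStmt(limit: Int, offset: Int): String =
    if (limit > 0 && offset > 0) s"LIMIT $offset, $limit"       // paging: skip offset, take limit
    else if (limit > 0) s"LIMIT $limit"                         // limit only
    else if (offset > 0) s"LIMIT $offset, 18446744073709551615" // offset only: take the rest
    else ""                                                     // neither pushed
  println(limitOrOffsetStmt(1, 2)) // LIMIT 2, 1
  println(limitOrOffsetStmt(0, 4)) // LIMIT 4, 18446744073709551615
}
```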
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
index d0e925bad38..55b4f1eb004 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala
@@ -181,19 +181,40 @@ private case object OracleDialect extends JdbcDialect {
if (limit > 0) s"WHERE rownum <= $limit" else ""
}
+ override def getOffsetClause(offset: Integer): String = {
+ // Oracle doesn't support the OFFSET clause.
+ // We use rownum > n instead to skip rows in the result set.
+ // Note: rn is an alias for rownum, assigned in build() below.
+ if (offset > 0) s"WHERE rn > $offset" else ""
+ }
+
class OracleSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions)
extends JdbcSQLQueryBuilder(dialect, options) {
- // TODO[SPARK-42289]: DS V2 pushdown could let JDBC dialect decide to push down offset
override def build(): String = {
val selectStmt = s"SELECT $columnList FROM ${options.tableOrQuery} $tableSampleClause" +
s" $whereClause $groupByClause $orderByClause"
- if (limit > 0) {
- val limitClause = dialect.getLimitClause(limit)
- options.prepareQuery + s"SELECT tab.* FROM ($selectStmt) tab $limitClause"
+ val finalSelectStmt = if (limit > 0) {
+ if (offset > 0) {
+ // Because rownum is assigned as each row is returned, filtering on rownum
+ // directly without first aliasing it, e.g.
+ // SELECT $columnList FROM ($selectStmt) tab WHERE rownum > $offset AND
+ // rownum <= ${limit + offset}, produces an incorrect result.
+ s"SELECT $columnList FROM (SELECT tab.*, rownum rn FROM ($selectStmt) tab)" +
+ s" WHERE rn > $offset AND rn <= ${limit + offset}"
+ } else {
+ val limitClause = dialect.getLimitClause(limit)
+ s"SELECT tab.* FROM ($selectStmt) tab $limitClause"
+ }
+ } else if (offset > 0) {
+ val offsetClause = dialect.getOffsetClause(offset)
+ // Using rownum directly would lead to an incorrect result here too.
+ s"SELECT $columnList FROM (SELECT tab.*, rownum rn FROM ($selectStmt) tab) $offsetClause"
} else {
- options.prepareQuery + selectStmt
+ selectStmt
}
+
+ options.prepareQuery + finalSelectStmt
}
}
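Why the nesting in the Oracle builder matters: rownum is assigned only as a row is returned, so a predicate like `WHERE rownum > 2` is never satisfied (each candidate row is offered rownum 1, fails the predicate, and the next candidate is offered rownum 1 again). Freezing rownum into the alias `rn` in an inner query turns it into an ordinary filterable column. A sketch of the paging shape for LIMIT 1 OFFSET 2 (illustrative table and columns):
```scala
// rownum must be materialized as a column (rn) in an inner query before the
// outer WHERE can filter on it.
object OracleRownumSketch extends App {
  val inner = "SELECT tab.*, rownum rn FROM" +
    " (SELECT name, salary, bonus FROM employee WHERE dept > 0) tab"
  // rn > 2 skips the offset; rn <= 3 caps at offset + limit.
  val paged = s"SELECT name, salary, bonus FROM ($inner) WHERE rn > 2 AND rn <= 3"
  println(paged)
}
```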
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 34c5fada19c..0d102e1632a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1038,7 +1038,7 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
.withLimit(123)
.build()
.trim() ==
- "SELECT a,b FROM test FETCH FIRST 123 ROWS ONLY")
+ "SELECT a,b FROM test FETCH FIRST 123 ROWS ONLY")
}
test("table exists query by jdbc dialect") {