You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/24 02:20:03 UTC

[spark] branch master updated: [SPARK-42534][SQL] Fix DB2Dialect Limit clause

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f15414e9180 [SPARK-42534][SQL] Fix DB2Dialect Limit clause
f15414e9180 is described below

commit f15414e91805e050dac8e4624298a2047ab2c5e9
Author: Ivan Sadikov <iv...@databricks.com>
AuthorDate: Fri Feb 24 10:19:46 2023 +0800

    [SPARK-42534][SQL] Fix DB2Dialect Limit clause
    
    ### What changes were proposed in this pull request?
    
    The PR fixes DB2 Limit clause syntax. Although DB2 supports the LIMIT keyword, it seems that this support varies across databases and versions and the recommended way is to use `FETCH FIRST x ROWS ONLY`. In fact, some versions don't support LIMIT at all. Doc: https://www.ibm.com/docs/en/db2/11.5?topic=subselect-fetch-clause, usage example: https://www.mullinsconsulting.com/dbu_0502.htm.
    
    ### Why are the changes needed?
    
    Fixes the incorrect Limit clause, which could cause errors when running against DB2 versions that don't support the LIMIT keyword.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I added a unit test and an integration test to cover this functionality.
    
    Closes #40134 from sadikovi/db2-limit-fix.
    
    Authored-by: Ivan Sadikov <iv...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/jdbc/DB2IntegrationSuite.scala | 21 +++++++++++++++++++++
 .../org/apache/spark/sql/jdbc/DB2Dialect.scala      |  4 ++++
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala    |  4 ++--
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 13 +++++++++++++
 4 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
index 6cee6622e1c..e4251512e43 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala
@@ -217,4 +217,25 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(actual.length === 2)
     assert(actual.toSet === expectedResult)
   }
+
+  test("SPARK-42534: DB2 Limit pushdown test") {
+    val actual = sqlContext.read
+      .format("jdbc")
+      .option("url", jdbcUrl)
+      .option("dbtable", "tbl")
+      .load()
+      .limit(2)
+      .select("x", "y")
+      .orderBy("x")
+      .collect()
+
+    val expected = sqlContext.read
+      .format("jdbc")
+      .option("url", jdbcUrl)
+      .option("query", "SELECT x, y FROM tbl ORDER BY x FETCH FIRST 2 ROWS ONLY")
+      .load()
+      .collect()
+
+    assert(actual === expected)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
index 6c7c1bfe737..5889be880dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala
@@ -160,4 +160,8 @@ private object DB2Dialect extends JdbcDialect {
       s"DROP SCHEMA ${quoteIdentifier(schema)} RESTRICT"
     }
   }
+
+  override def getLimitClause(limit: Integer): String = {
+    if (limit > 0) s"FETCH FIRST $limit ROWS ONLY" else ""
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 2e9477356e6..855fa6857af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -541,14 +541,14 @@ abstract class JdbcDialect extends Serializable with Logging {
    * Returns the LIMIT clause for the SELECT statement
    */
   def getLimitClause(limit: Integer): String = {
-    if (limit > 0 ) s"LIMIT $limit" else ""
+    if (limit > 0) s"LIMIT $limit" else ""
   }
 
   /**
    * Returns the OFFSET clause for the SELECT statement
    */
   def getOffsetClause(offset: Integer): String = {
-    if (offset > 0 ) s"OFFSET $offset" else ""
+    if (offset > 0) s"OFFSET $offset" else ""
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 27609de5433..34c5fada19c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1028,6 +1028,19 @@ class JDBCSuite extends QueryTest with SharedSparkSession {
       "SELECT TOP (123) a,b FROM test")
   }
 
+  test("SPARK-42534: DB2Dialect Limit query test") {
+    // JDBC url is a required option but is not used in this test.
+    val options = new JDBCOptions(Map("url" -> "jdbc:db2://host:port", "dbtable" -> "test"))
+    assert(
+      DB2Dialect
+        .getJdbcSQLQueryBuilder(options)
+        .withColumns(Array("a", "b"))
+        .withLimit(123)
+        .build()
+        .trim() ==
+      "SELECT a,b FROM test     FETCH FIRST 123 ROWS ONLY")
+  }
+
   test("table exists query by jdbc dialect") {
     val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
     val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org