You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/04/14 04:11:13 UTC

[spark] branch master updated: [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 320f88d5444 [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source
320f88d5444 is described below

commit 320f88d54440e05228a90ef5663991e28ae07c95
Author: allisonwang-db <al...@databricks.com>
AuthorDate: Thu Apr 14 13:11:00 2022 +0900

    [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source
    
    ### What changes were proposed in this pull request?
    This PR compiles the boolean data type to the bit data type for pushed column filters while querying the MSSQL data source. Microsoft SQL Server does not support the boolean type, so the JDBC dialect should use the bit data type instead.
    
    ### Why are the changes needed?
    
    To fix a bug that was exposed by the boolean column filter pushdown to the SQL Server data source.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a new integration test.
    
    Closes #36182 from allisonwang-db/spark-38889-mssql-predicate-pushdown.
    
    Authored-by: allisonwang-db <al...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../spark/sql/jdbc/MsSqlServerIntegrationSuite.scala    | 17 +++++++++++++++++
 .../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala  | 10 ++++++++++
 2 files changed, 27 insertions(+)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index 5992253a958..e293f9a8f7b 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -22,6 +22,7 @@ import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.tags.DockerTest
 
@@ -140,6 +141,14 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
         |'MULTIPOLYGON(((2 2, 2 -2, -2 -2, -2 2, 2 2)),((1 1, 3 1, 3 3, 1 3, 1 1)))',
         |'GEOMETRYCOLLECTION(LINESTRING(1 1, 3 5),POLYGON((-1 -1, -1 -5, -5 -5, -5 -1, -1 -1)))')
       """.stripMargin).executeUpdate()
+    conn.prepareStatement(
+      """
+        |CREATE TABLE bits(a INT, b INT, c BIT)
+        |""".stripMargin).executeUpdate()
+    conn.prepareStatement(
+      """
+        |INSERT INTO bits VALUES (1, 2, 1)
+      """.stripMargin).executeUpdate()
   }
 
   test("Basic test") {
@@ -357,4 +366,12 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
         0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0,
         0, 0, 0, 1, 0, 0, 0, 3))
   }
+
+  test("SPARK-38889: MsSqlServerDialect should handle boolean filter push down") {
+    val df = spark.read.jdbc(jdbcUrl, "bits", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 1)
+    val filtered = df.where(col("c") === 0).collect()
+    assert(filtered.length == 0)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 8d2fbec55f9..a42129dbe8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -40,6 +40,16 @@ private object MsSqlServerDialect extends JdbcDialect {
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
 
+  // Microsoft SQL Server does not have the boolean type.
+  // Compile the boolean value to the bit data type instead.
+  // scalastyle:off line.size.limit
+  // See https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver15
+  // scalastyle:on line.size.limit
+  override def compileValue(value: Any): Any = value match {
+    case booleanValue: Boolean => if (booleanValue) 1 else 0
+    case other => super.compileValue(other)
+  }
+
   // scalastyle:off line.size.limit
   // See https://docs.microsoft.com/en-us/sql/t-sql/functions/aggregate-functions-transact-sql?view=sql-server-ver15
   // scalastyle:on line.size.limit


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org