Posted to commits@spark.apache.org by do...@apache.org on 2020/01/13 07:12:36 UTC

[spark] branch branch-2.4 updated: [SPARK-28152][SQL][FOLLOWUP] Add a legacy conf for old MsSqlServerDialect numeric mapping

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 69de7f3  [SPARK-28152][SQL][FOLLOWUP] Add a legacy conf for old MsSqlServerDialect numeric mapping
69de7f3 is described below

commit 69de7f31c37a7e0298e66cc814afc1b0aa948bbb
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Sun Jan 12 23:03:34 2020 -0800

    [SPARK-28152][SQL][FOLLOWUP] Add a legacy conf for old MsSqlServerDialect numeric mapping
    
    This is a follow-up to https://github.com/apache/spark/pull/25248.
    
    With the new behavior, Spark cannot read existing tables that were created under the old behavior.
    This PR provides a way for existing users to opt out of the new behavior.
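    For example, an existing user could restore the old mapping like this (a
    minimal sketch; the conf is internal and defaults to `false`, and `jdbcUrl`
    is a placeholder for a real SQL Server JDBC URL):
    ```
    // Opt back into the pre-SPARK-28152 numeric mapping for this session.
    spark.conf.set("spark.sql.legacy.mssqlserver.numericMapping.enabled", "true")
    // SMALLINT columns now load as IntegerType and REAL as DoubleType, as before.
    val df = spark.read.jdbc(jdbcUrl, "numbers", new java.util.Properties)
    ```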
    
    This introduces a user-facing change: it fixes the broken behavior on existing tables.
    
    Tested by passing Jenkins and by manually running the JDBC docker integration tests:
    ```
    build/mvn install -DskipTests
    build/mvn -Pdocker-integration-tests -pl :spark-docker-integration-tests_2.12 test
    ```
    
    Closes #27184 from dongjoon-hyun/SPARK-28152-CONF.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 28fc0437ce6d2f6fbcd83be38aafb8a491c1a67d)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/jdbc/MsSqlServerIntegrationSuite.scala     | 83 ++++++++++++++--------
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 +++
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 16 +++--
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 32 +++++++--
 4 files changed, 100 insertions(+), 41 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index efd7ca7..5738307 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -21,6 +21,7 @@ import java.math.BigDecimal
 import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.tags.DockerTest
 
 @DockerTest
@@ -112,36 +113,58 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
   }
 
   test("Numeric types") {
-    val df = spark.read.jdbc(jdbcUrl, "numbers", new Properties)
-    val rows = df.collect()
-    assert(rows.length == 1)
-    val row = rows(0)
-    val types = row.toSeq.map(x => x.getClass.toString)
-    assert(types.length == 12)
-    assert(types(0).equals("class java.lang.Boolean"))
-    assert(types(1).equals("class java.lang.Integer"))
-    assert(types(2).equals("class java.lang.Short"))
-    assert(types(3).equals("class java.lang.Integer"))
-    assert(types(4).equals("class java.lang.Long"))
-    assert(types(5).equals("class java.lang.Double"))
-    assert(types(6).equals("class java.lang.Float"))
-    assert(types(7).equals("class java.lang.Float"))
-    assert(types(8).equals("class java.math.BigDecimal"))
-    assert(types(9).equals("class java.math.BigDecimal"))
-    assert(types(10).equals("class java.math.BigDecimal"))
-    assert(types(11).equals("class java.math.BigDecimal"))
-    assert(row.getBoolean(0) == false)
-    assert(row.getInt(1) == 255)
-    assert(row.getShort(2) == 32767)
-    assert(row.getInt(3) == 2147483647)
-    assert(row.getLong(4) == 9223372036854775807L)
-    assert(row.getDouble(5) == 1.2345678901234512E14) // float = float(53) has 15-digits precision
-    assert(row.getFloat(6) == 1.23456788103168E14)   // float(24) has 7-digits precision
-    assert(row.getFloat(7) == 1.23456788103168E14)   // real = float(24)
-    assert(row.getAs[BigDecimal](8).equals(new BigDecimal("123.00")))
-    assert(row.getAs[BigDecimal](9).equals(new BigDecimal("12345.12000")))
-    assert(row.getAs[BigDecimal](10).equals(new BigDecimal("922337203685477.5800")))
-    assert(row.getAs[BigDecimal](11).equals(new BigDecimal("214748.3647")))
+    Seq(true, false).foreach { flag =>
+      withSQLConf(SQLConf.LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED.key -> s"$flag") {
+        val df = spark.read.jdbc(jdbcUrl, "numbers", new Properties)
+        val rows = df.collect()
+        assert(rows.length == 1)
+        val row = rows(0)
+        val types = row.toSeq.map(x => x.getClass.toString)
+        assert(types.length == 12)
+        assert(types(0).equals("class java.lang.Boolean"))
+        assert(types(1).equals("class java.lang.Integer"))
+        if (flag) {
+          assert(types(2).equals("class java.lang.Integer"))
+        } else {
+          assert(types(2).equals("class java.lang.Short"))
+        }
+        assert(types(3).equals("class java.lang.Integer"))
+        assert(types(4).equals("class java.lang.Long"))
+        assert(types(5).equals("class java.lang.Double"))
+        if (flag) {
+          assert(types(6).equals("class java.lang.Double"))
+          assert(types(7).equals("class java.lang.Double"))
+        } else {
+          assert(types(6).equals("class java.lang.Float"))
+          assert(types(7).equals("class java.lang.Float"))
+        }
+        assert(types(8).equals("class java.math.BigDecimal"))
+        assert(types(9).equals("class java.math.BigDecimal"))
+        assert(types(10).equals("class java.math.BigDecimal"))
+        assert(types(11).equals("class java.math.BigDecimal"))
+        assert(row.getBoolean(0) == false)
+        assert(row.getInt(1) == 255)
+        if (flag) {
+          assert(row.getInt(2) == 32767)
+        } else {
+          assert(row.getShort(2) == 32767)
+        }
+        assert(row.getInt(3) == 2147483647)
+        assert(row.getLong(4) == 9223372036854775807L)
+        assert(row.getDouble(5) == 1.2345678901234512E14) // float(53) has 15-digits precision
+        if (flag) {
+          assert(row.getDouble(6) == 1.23456788103168E14) // float(24) has 7-digits precision
+          assert(row.getDouble(7) == 1.23456788103168E14) // real = float(24)
+        } else {
+          assert(row.getFloat(6) == 1.23456788103168E14)  // float(24) has 7-digits precision
+          assert(row.getFloat(7) == 1.23456788103168E14)  // real = float(24)
+        }
+        assert(row.getAs[BigDecimal](8).equals(new BigDecimal("123.00")))
+        assert(row.getAs[BigDecimal](9).equals(new BigDecimal("12345.12000")))
+        assert(row.getAs[BigDecimal](10).equals(new BigDecimal("922337203685477.5800")))
+        assert(row.getAs[BigDecimal](11).equals(new BigDecimal("214748.3647")))
+      }
+    }
   }
 
   test("Date types") {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7a83df9..c326b27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1584,6 +1584,13 @@ object SQLConf {
         "permission and ACLs when re-creating the table/partition paths.")
       .booleanConf
       .createWithDefault(false)
+
+  val LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED =
+     buildConf("spark.sql.legacy.mssqlserver.numericMapping.enabled")
+       .internal()
+       .doc("When true, use legacy MySqlServer SMALLINT and REAL type mapping.")
+       .booleanConf
+       .createWithDefault(false)
 }
 
 /**
@@ -1994,6 +2001,9 @@ class SQLConf extends Serializable with Logging {
   def truncateTableIgnorePermissionAcl: Boolean =
     getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)
 
+  def legacyMsSqlServerNumericMappingEnabled: Boolean =
+    getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 805f73d..b38d1f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 
@@ -30,10 +31,14 @@ private object MsSqlServerDialect extends JdbcDialect {
       // String is recommend by Microsoft SQL Server for datetimeoffset types in non-MS clients
       Option(StringType)
     } else {
-      sqlType match {
-        case java.sql.Types.SMALLINT => Some(ShortType)
-        case java.sql.Types.REAL => Some(FloatType)
-        case _ => None
+      if (SQLConf.get.legacyMsSqlServerNumericMappingEnabled) {
+        None
+      } else {
+        sqlType match {
+          case java.sql.Types.SMALLINT => Some(ShortType)
+          case java.sql.Types.REAL => Some(FloatType)
+          case _ => None
+        }
       }
     }
   }
@@ -43,7 +48,8 @@ private object MsSqlServerDialect extends JdbcDialect {
     case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
     case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
     case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY))
-    case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+    case ShortType if !SQLConf.get.legacyMsSqlServerNumericMappingEnabled =>
+      Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
     case _ => None
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 2dcedc3..0edd226 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -887,17 +887,37 @@ class JDBCSuite extends QueryTest
       "BIT")
     assert(msSqlServerDialect.getJDBCType(BinaryType).map(_.databaseTypeDefinition).get ==
       "VARBINARY(MAX)")
-    assert(msSqlServerDialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get ==
-      "SMALLINT")
+    Seq(true, false).foreach { flag =>
+      withSQLConf(SQLConf.LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED.key -> s"$flag") {
+        if (SQLConf.get.legacyMsSqlServerNumericMappingEnabled) {
+          assert(msSqlServerDialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).isEmpty)
+        } else {
+          assert(msSqlServerDialect.getJDBCType(ShortType).map(_.databaseTypeDefinition).get ==
+            "SMALLINT")
+        }
+      }
+    }
   }
 
   test("SPARK-28152 MsSqlServerDialect catalyst type mapping") {
     val msSqlServerDialect = JdbcDialects.get("jdbc:sqlserver")
     val metadata = new MetadataBuilder().putLong("scale", 1)
-    assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1,
-      metadata).get == ShortType)
-    assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL, "REAL", 1,
-      metadata).get == FloatType)
+
+    Seq(true, false).foreach { flag =>
+      withSQLConf(SQLConf.LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED.key -> s"$flag") {
+        if (SQLConf.get.legacyMsSqlServerNumericMappingEnabled) {
+          assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1,
+            metadata).isEmpty)
+          assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL, "REAL", 1,
+            metadata).isEmpty)
+        } else {
+          assert(msSqlServerDialect.getCatalystType(java.sql.Types.SMALLINT, "SMALLINT", 1,
+            metadata).get == ShortType)
+          assert(msSqlServerDialect.getCatalystType(java.sql.Types.REAL, "REAL", 1,
+            metadata).get == FloatType)
+        }
+      }
+    }
   }
 
   test("table exists query by jdbc dialect") {


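For reference, this is the mapping difference the new conf toggles, as exercised by the
updated tests (a sketch; `jdbcUrl` and the `numbers` table mirror the integration suite
setup above):
```
// Default (spark.sql.legacy.mssqlserver.numericMapping.enabled=false):
//   SQL Server SMALLINT -> ShortType, REAL -> FloatType
// Legacy (conf set to true): the dialect returns None, so Spark falls back to
//   the generic JDBC mapping: SMALLINT -> IntegerType, REAL -> DoubleType
spark.read.jdbc(jdbcUrl, "numbers", new java.util.Properties).printSchema()
```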
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org