You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/24 04:02:37 UTC

[spark] branch branch-3.0 updated: [SPARK-33813][SQL][3.0] Fix the issue that JDBC source can't treat MS SQL Server's spatial types

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ff05e32  [SPARK-33813][SQL][3.0] Fix the issue that JDBC source can't treat MS SQL Server's spatial types
ff05e32 is described below

commit ff05e32a11a6ca5f21dc0b892156aa85003c6c42
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Sun Jan 24 13:01:36 2021 +0900

    [SPARK-33813][SQL][3.0] Fix the issue that JDBC source can't treat MS SQL Server's spatial types
    
    ### What changes were proposed in this pull request?
    
    This PR backports SPARK-33813 (#31283).
    This PR fixes the issue that reading tables which contain spatial datatypes from MS SQL Server fails.
    MS SQL Server supports two non-standard spatial JDBC types, `geometry` and `geography`, but Spark SQL can't handle them:
    
    ```
    java.sql.SQLException: Unrecognized SQL type -157
     at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getCatalystType(JdbcUtils.scala:251)
     at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$getSchema$1(JdbcUtils.scala:321)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321)
     at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
     at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
     at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:366)
     at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:355)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:355)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
     at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:381)
    ```
    
    According to the [data type mapping](https://docs.microsoft.com/ja-jp/sql/connect/jdbc/using-basic-data-types?view=sql-server-ver15), I think those spatial types can be mapped to Catalyst's `BinaryType`.
    
    ### Why are the changes needed?
    
    To provide better support.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. MS SQL Server users can use `geometry` and `geography` types in datasource tables.
    
    ### How was this patch tested?
    
    New test case added to `MsSqlServerIntegrationSuite`.
    
    Closes #31290 from sarutak/SPARK-33813-branch-3.0.
    
    Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../sql/jdbc/MsSqlServerIntegrationSuite.scala     | 121 +++++++++++++++++++++
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala |   8 ++
 2 files changed, 129 insertions(+)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index 5738307..7076e18 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -100,6 +100,37 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
         |'the', 'lazy',
         |'dog')
       """.stripMargin).executeUpdate()
+    conn.prepareStatement(
+      """
+        |CREATE TABLE spatials (
+        |point geometry,
+        |line geometry,
+        |circle geometry,
+        |curve geography,
+        |polygon geometry,
+        |curve_polygon geography,
+        |multi_point geometry,
+        |multi_line geometry,
+        |multi_polygon geometry,
+        |geometry_collection geometry)
+      """.stripMargin).executeUpdate()
+    conn.prepareStatement(
+      """
+        |INSERT INTO spatials VALUES (
+        |'POINT(3 4 7 2.5)',
+        |'LINESTRING(1 0, 0 1, -1 0)',
+        |'CIRCULARSTRING(
+        |  -122.358 47.653, -122.348 47.649, -122.348 47.658, -122.358 47.658, -122.358 47.653)',
+        |'COMPOUNDCURVE(
+        |  CIRCULARSTRING(-122.358 47.653, -122.348 47.649,
+        |    -122.348 47.658, -122.358 47.658, -122.358 47.653))',
+        |'POLYGON((-20 -20, -20 20, 20 20, 20 -20, -20 -20), (10 0, 0 10, 0 -10, 10 0))',
+        |'CURVEPOLYGON((-122.3 47, 122.3 47, 125.7 49, 121 38, -122.3 47))',
+        |'MULTIPOINT((2 3), (7 8 9.5))',
+        |'MULTILINESTRING((0 2, 1 1), (1 0, 1 1))',
+        |'MULTIPOLYGON(((2 2, 2 -2, -2 -2, -2 2, 2 2)),((1 1, 3 1, 3 3, 1 3, 1 1)))',
+        |'GEOMETRYCOLLECTION(LINESTRING(1 1, 3 5),POLYGON((-1 -1, -1 -5, -5 -5, -5 -1, -1 -1)))')
+      """.stripMargin).executeUpdate()
   }
 
   test("Basic test") {
@@ -225,4 +256,94 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
     df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
     df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
   }
+
+  test("SPARK-33813: MsSqlServerDialect should support spatial types") {
+    val df = spark.read.jdbc(jdbcUrl, "spatials", new Properties)
+    val rows = df.collect()
+    assert(rows.length == 1)
+    val row = rows(0)
+    val types = row.toSeq.map(x => x.getClass.toString)  // "class [B" is Array[Byte]
+    assert(types.length == 10)
+    assert(types(0) == "class [B")  // column 0: point
+    assert(row.getAs[Array[Byte]](0) ===
+      Array[Byte](0, 0, 0, 0, 1, 15, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0,
+        16, 64, 0, 0, 0, 0, 0, 0, 28, 64, 0, 0, 0, 0, 0, 0, 4, 64))
+    assert(types(1) == "class [B")  // column 1: line
+    assert(row.getAs[Array[Byte]](1) ===
+      Array[Byte](0, 0, 0, 0, 1, 4, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0,
+        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+        -16, 63, 0, 0, 0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0,
+        0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 2))
+    assert(types(2) == "class [B")  // column 2: circle
+    assert(row.getAs[Array[Byte]](2) ===
+      Array[Byte](0, 0, 0, 0, 2, 4, 5, 0, 0, 0, -12, -3, -44, 120, -23, -106,
+        94, -64, -35, 36, 6, -127, -107, -45, 71, 64, -125, -64, -54, -95, 69,
+        -106, 94, -64, 80, -115, -105, 110, 18, -45, 71, 64, -125, -64, -54,
+        -95, 69, -106, 94, -64, 78, 98, 16, 88, 57, -44, 71, 64, -12, -3, -44,
+        120, -23, -106, 94, -64, 78, 98, 16, 88, 57, -44, 71, 64, -12, -3, -44,
+        120, -23, -106, 94, -64, -35, 36, 6, -127, -107, -45, 71, 64, 1, 0, 0,
+        0, 2, 0, 0, 0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 8))
+    assert(types(3) == "class [B")  // column 3: curve
+    assert(row.getAs[Array[Byte]](3) ===
+      Array[Byte](-26, 16, 0, 0, 2, 4, 5, 0, 0, 0, -35, 36, 6, -127, -107, -45,
+        71, 64, -12, -3, -44, 120, -23, -106, 94, -64, 80, -115, -105, 110, 18,
+        -45, 71, 64, -125, -64, -54, -95, 69, -106, 94, -64, 78, 98, 16, 88, 57,
+        -44, 71, 64, -125, -64, -54, -95, 69, -106, 94, -64, 78, 98, 16, 88, 57,
+        -44, 71, 64, -12, -3, -44, 120, -23, -106, 94, -64, -35, 36, 6, -127, -107,
+        -45, 71, 64, -12, -3, -44, 120, -23, -106, 94, -64, 1, 0, 0, 0, 3, 0, 0,
+        0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 9, 2, 0, 0, 0, 3, 1))
+    assert(types(4) == "class [B")  // column 4: polygon
+    assert(row.getAs[Array[Byte]](4) ===
+      Array[Byte](0, 0, 0, 0, 1, 4, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0,
+        0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0, 0, 52, 64,
+        0, 0, 0, 0, 0, 0, 52, 64, 0, 0, 0, 0, 0, 0, 52, 64, 0, 0, 0, 0, 0, 0, 52,
+        64, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0,
+        0, 52, -64, 0, 0, 0, 0, 0, 0, 36, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+        0, 0, 36, -64, 0, 0, 0, 0, 0, 0, 36, 64, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0,
+        0, 2, 0, 0, 0, 0, 0, 5, 0, 0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 3))
+    assert(types(5) === "class [B")  // column 5: curve_polygon
+    assert(row.getAs[Array[Byte]](5) ===
+      Array[Byte](-26, 16, 0, 0, 2, 4, 5, 0, 0, 0, 0, 0, 0, 0, 0, -128, 71, 64, 51,
+        51, 51, 51, 51, -109, 94, -64, 0, 0, 0, 0, 0, -128, 71, 64, 51, 51, 51, 51,
+        51, -109, 94, 64, 0, 0, 0, 0, 0, -128, 72, 64, -51, -52, -52, -52, -52, 108,
+        95, 64, 0, 0, 0, 0, 0, 0, 67, 64, 0, 0, 0, 0, 0, 64, 94, 64, 0, 0, 0, 0, 0,
+        -128, 71, 64, 51, 51, 51, 51, 51, -109, 94, -64, 1, 0, 0, 0, 1, 0, 0, 0, 0,
+        1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 10))
+    assert(types(6) === "class [B")  // column 6: multi_point
+    assert(row.getAs[Array[Byte]](6) ===
+      Array[Byte](0, 0, 0, 0, 1, 5, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0,
+        0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 28, 64, 0, 0, 0, 0, 0, 0, 32, 64, 0, 0, 0, 0,
+        0, 0, -8, -1, 0, 0, 0, 0, 0, 0, 35, 64, 2, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0,
+        0, 0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 1,
+        0, 0, 0, 0, 1, 0, 0, 0, 1))
+    assert(types(7) === "class [B")  // column 7: multi_line
+    assert(row.getAs[Array[Byte]](7) ===
+      Array[Byte](0, 0, 0, 0, 1, 4, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+        0, 0, 0, 64, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0,
+        0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0,
+        0, 0, 0, 0, -16, 63, 2, 0, 0, 0, 1, 0, 0, 0, 0, 1, 2, 0, 0, 0, 3, 0, 0, 0,
+        -1, -1, -1, -1, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, 0, 2))
+    assert(types(8) === "class [B")  // column 8: multi_polygon
+    assert(row.getAs[Array[Byte]](8) ===
+      Array[Byte](0, 0, 0, 0, 1, 0, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0,
+        0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, -64, 0, 0, 0,
+        0, 0, 0, 0, -64, 0, 0, 0, 0, 0, 0, 0, -64, 0, 0, 0, 0, 0, 0, 0, -64, 0, 0,
+        0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0,
+        0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0,
+        0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 8, 64, 0,
+        0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, -16, 63,
+        0, 0, 0, 0, 0, 0, -16, 63, 2, 0, 0, 0, 2, 0, 0, 0, 0, 2, 5, 0, 0, 0, 3, 0,
+        0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 1, 0, 0, 0, 3))
+    assert(types(9) === "class [B")  // column 9: geometry_collection
+    assert(row.getAs[Array[Byte]](9) ===
+      Array[Byte](0, 0, 0, 0, 1, 4, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0,
+        0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0,
+        0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0, -16, -65,
+        0, 0, 0, 0, 0, 0, 20, -64, 0, 0, 0, 0, 0, 0, 20, -64, 0, 0, 0, 0, 0, 0, 20,
+        -64, 0, 0, 0, 0, 0, 0, 20, -64, 0, 0, 0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0,
+        -16, -65, 0, 0, 0, 0, 0, 0, -16, -65, 2, 0, 0, 0, 1, 0, 0, 0, 0, 2, 2, 0, 0,
+        0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0,
+        0, 0, 0, 1, 0, 0, 0, 3))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 72284b5..72cdc68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -25,6 +25,13 @@ import org.apache.spark.sql.types._
 
 private object MsSqlServerDialect extends JdbcDialect {
 
+  // Non-standard JDBC type codes used by Microsoft SQL Server for its spatial types.
+  // https://github.com/microsoft/mssql-jdbc/blob/v7.2.1/src/main/java/microsoft/sql/Types.java
+  private object SpecificTypes {
+    val GEOMETRY = -157
+    val GEOGRAPHY = -158
+  }
+
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
 
@@ -40,6 +47,7 @@ private object MsSqlServerDialect extends JdbcDialect {
         sqlType match {
           case java.sql.Types.SMALLINT => Some(ShortType)
           case java.sql.Types.REAL => Some(FloatType)
+          case SpecificTypes.GEOMETRY | SpecificTypes.GEOGRAPHY => Some(BinaryType)
           case _ => None
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org