You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/24 04:02:37 UTC
[spark] branch branch-3.0 updated: [SPARK-33813][SQL][3.0] Fix the
issue that JDBC source can't treat MS SQL Server's spatial types
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ff05e32 [SPARK-33813][SQL][3.0] Fix the issue that JDBC source can't treat MS SQL Server's spatial types
ff05e32 is described below
commit ff05e32a11a6ca5f21dc0b892156aa85003c6c42
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Sun Jan 24 13:01:36 2021 +0900
[SPARK-33813][SQL][3.0] Fix the issue that JDBC source can't treat MS SQL Server's spatial types
### What changes were proposed in this pull request?
This PR backports SPARK-33813 (#31283).
This PR fixes the issue that reading tables which contain spatial datatypes from MS SQL Server fails.
MS SQL Server supports two non-standard spatial JDBC types, `geometry` and `geography`, but Spark SQL can't handle them and fails with the following exception:
```
java.sql.SQLException: Unrecognized SQL type -157
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getCatalystType(JdbcUtils.scala:251)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$getSchema$1(JdbcUtils.scala:321)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:366)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:355)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:355)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:381)
```
Considering what the [data type mapping](https://docs.microsoft.com/ja-jp/sql/connect/jdbc/using-basic-data-types?view=sql-server-ver15) says, I think those spatial types can be mapped to Catalyst's `BinaryType`.
### Why are the changes needed?
To provide better support.
### Does this PR introduce _any_ user-facing change?
Yes. MS SQL Server users can use `geometry` and `geography` types in datasource tables.
### How was this patch tested?
New test case added to `MsSqlServerIntegrationSuite`.
Closes #31290 from sarutak/SPARK-33813-branch-3.0.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../sql/jdbc/MsSqlServerIntegrationSuite.scala | 121 +++++++++++++++++++++
.../apache/spark/sql/jdbc/MsSqlServerDialect.scala | 8 ++
2 files changed, 129 insertions(+)
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index 5738307..7076e18 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -100,6 +100,37 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
|'the', 'lazy',
|'dog')
""".stripMargin).executeUpdate()
+ conn.prepareStatement(
+ """
+ |CREATE TABLE spatials (
+ |point geometry,
+ |line geometry,
+ |circle geometry,
+ |curve geography,
+ |polygon geometry,
+ |curve_polygon geography,
+ |multi_point geometry,
+ |multi_line geometry,
+ |multi_polygon geometry,
+ |geometry_collection geometry)
+ """.stripMargin).executeUpdate()
+ conn.prepareStatement(
+ """
+ |INSERT INTO spatials VALUES (
+ |'POINT(3 4 7 2.5)',
+ |'LINESTRING(1 0, 0 1, -1 0)',
+ |'CIRCULARSTRING(
+ | -122.358 47.653, -122.348 47.649, -122.348 47.658, -122.358 47.658, -122.358 47.653)',
+ |'COMPOUNDCURVE(
+ | CIRCULARSTRING(-122.358 47.653, -122.348 47.649,
+ | -122.348 47.658, -122.358 47.658, -122.358 47.653))',
+ |'POLYGON((-20 -20, -20 20, 20 20, 20 -20, -20 -20), (10 0, 0 10, 0 -10, 10 0))',
+ |'CURVEPOLYGON((-122.3 47, 122.3 47, 125.7 49, 121 38, -122.3 47))',
+ |'MULTIPOINT((2 3), (7 8 9.5))',
+ |'MULTILINESTRING((0 2, 1 1), (1 0, 1 1))',
+ |'MULTIPOLYGON(((2 2, 2 -2, -2 -2, -2 2, 2 2)),((1 1, 3 1, 3 3, 1 3, 1 1)))',
+ |'GEOMETRYCOLLECTION(LINESTRING(1 1, 3 5),POLYGON((-1 -1, -1 -5, -5 -5, -5 -1, -1 -1)))')
+ """.stripMargin).executeUpdate()
}
test("Basic test") {
@@ -225,4 +256,94 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
}
+
+ test("SPARK-33813: MsSqlServerDialect should support spatial types") {
+ val df = spark.read.jdbc(jdbcUrl, "spatials", new Properties)
+ val rows = df.collect()
+ assert(rows.length == 1)
+ val row = rows(0)
+ val types = row.toSeq.map(x => x.getClass.toString)
+ assert(types.length == 10)
+ assert(types(0) == "class [B")
+ assert(row.getAs[Array[Byte]](0) ===
+ Array(0, 0, 0, 0, 1, 15, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0,
+ 16, 64, 0, 0, 0, 0, 0, 0, 28, 64, 0, 0, 0, 0, 0, 0, 4, 64))
+ assert(types(1) == "class [B")
+ assert(row.getAs[Array[Byte]](1) ===
+ Array[Byte](0, 0, 0, 0, 1, 4, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ -16, 63, 0, 0, 0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0,
+ 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 2))
+ assert(types(2) == "class [B")
+ assert(row.getAs[Array[Byte]](2) ===
+ Array[Byte](0, 0, 0, 0, 2, 4, 5, 0, 0, 0, -12, -3, -44, 120, -23, -106,
+ 94, -64, -35, 36, 6, -127, -107, -45, 71, 64, -125, -64, -54, -95, 69,
+ -106, 94, -64, 80, -115, -105, 110, 18, -45, 71, 64, -125, -64, -54,
+ -95, 69, -106, 94, -64, 78, 98, 16, 88, 57, -44, 71, 64, -12, -3, -44,
+ 120, -23, -106, 94, -64, 78, 98, 16, 88, 57, -44, 71, 64, -12, -3, -44,
+ 120, -23, -106, 94, -64, -35, 36, 6, -127, -107, -45, 71, 64, 1, 0, 0,
+ 0, 2, 0, 0, 0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 8))
+ assert(types(3) == "class [B")
+ assert(row.getAs[Array[Byte]](3) ===
+ Array[Byte](-26, 16, 0, 0, 2, 4, 5, 0, 0, 0, -35, 36, 6, -127, -107, -45,
+ 71, 64, -12, -3, -44, 120, -23, -106, 94, -64, 80, -115, -105, 110, 18,
+ -45, 71, 64, -125, -64, -54, -95, 69, -106, 94, -64, 78, 98, 16, 88, 57,
+ -44, 71, 64, -125, -64, -54, -95, 69, -106, 94, -64, 78, 98, 16, 88, 57,
+ -44, 71, 64, -12, -3, -44, 120, -23, -106, 94, -64, -35, 36, 6, -127, -107,
+ -45, 71, 64, -12, -3, -44, 120, -23, -106, 94, -64, 1, 0, 0, 0, 3, 0, 0,
+ 0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 9, 2, 0, 0, 0, 3, 1))
+ assert(types(5) == "class [B")
+ assert(row.getAs[Array[Byte]](4) ===
+ Array[Byte](0, 0, 0, 0, 1, 4, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0,
+ 0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0, 0, 52, 64,
+ 0, 0, 0, 0, 0, 0, 52, 64, 0, 0, 0, 0, 0, 0, 52, 64, 0, 0, 0, 0, 0, 0, 52,
+ 64, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0, 0, 52, -64, 0, 0, 0, 0, 0,
+ 0, 52, -64, 0, 0, 0, 0, 0, 0, 36, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 36, -64, 0, 0, 0, 0, 0, 0, 36, 64, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0,
+ 0, 2, 0, 0, 0, 0, 0, 5, 0, 0, 0, 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 3))
+ assert(types(6) === "class [B")
+ assert(row.getAs[Array[Byte]](5) ===
+ Array[Byte](-26, 16, 0, 0, 2, 4, 5, 0, 0, 0, 0, 0, 0, 0, 0, -128, 71, 64, 51,
+ 51, 51, 51, 51, -109, 94, -64, 0, 0, 0, 0, 0, -128, 71, 64, 51, 51, 51, 51,
+ 51, -109, 94, 64, 0, 0, 0, 0, 0, -128, 72, 64, -51, -52, -52, -52, -52, 108,
+ 95, 64, 0, 0, 0, 0, 0, 0, 67, 64, 0, 0, 0, 0, 0, 64, 94, 64, 0, 0, 0, 0, 0,
+ -128, 71, 64, 51, 51, 51, 51, 51, -109, 94, -64, 1, 0, 0, 0, 1, 0, 0, 0, 0,
+ 1, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 10))
+ assert(types(6) === "class [B")
+ assert(row.getAs[Array[Byte]](6) ===
+ Array[Byte](0, 0, 0, 0, 1, 5, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0,
+ 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 28, 64, 0, 0, 0, 0, 0, 0, 32, 64, 0, 0, 0, 0,
+ 0, 0, -8, -1, 0, 0, 0, 0, 0, 0, 35, 64, 2, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0,
+ 0, 0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 1,
+ 0, 0, 0, 0, 1, 0, 0, 0, 1))
+ assert(types(6) === "class [B")
+ assert(row.getAs[Array[Byte]](7) ===
+ Array[Byte](0, 0, 0, 0, 1, 4, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0,
+ 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0,
+ 0, 0, 0, 0, -16, 63, 2, 0, 0, 0, 1, 0, 0, 0, 0, 1, 2, 0, 0, 0, 3, 0, 0, 0,
+ -1, -1, -1, -1, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, 0, 2))
+ assert(types(6) === "class [B")
+ assert(row.getAs[Array[Byte]](8) ===
+ Array[Byte](0, 0, 0, 0, 1, 0, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0,
+ 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, -64, 0, 0, 0,
+ 0, 0, 0, 0, -64, 0, 0, 0, 0, 0, 0, 0, -64, 0, 0, 0, 0, 0, 0, 0, -64, 0, 0,
+ 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 64, 0, 0,
+ 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0,
+ 0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 8, 64, 0,
+ 0, 0, 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, -16, 63,
+ 0, 0, 0, 0, 0, 0, -16, 63, 2, 0, 0, 0, 2, 0, 0, 0, 0, 2, 5, 0, 0, 0, 3, 0,
+ 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 1, 0, 0, 0, 3))
+ assert(types(6) === "class [B")
+ assert(row.getAs[Array[Byte]](9) ===
+ Array[Byte](0, 0, 0, 0, 1, 4, 7, 0, 0, 0, 0, 0, 0, 0, 0, 0, -16, 63, 0, 0, 0,
+ 0, 0, 0, -16, 63, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0,
+ 0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0, -16, -65,
+ 0, 0, 0, 0, 0, 0, 20, -64, 0, 0, 0, 0, 0, 0, 20, -64, 0, 0, 0, 0, 0, 0, 20,
+ -64, 0, 0, 0, 0, 0, 0, 20, -64, 0, 0, 0, 0, 0, 0, -16, -65, 0, 0, 0, 0, 0, 0,
+ -16, -65, 0, 0, 0, 0, 0, 0, -16, -65, 2, 0, 0, 0, 1, 0, 0, 0, 0, 2, 2, 0, 0,
+ 0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0,
+ 0, 0, 0, 1, 0, 0, 0, 3))
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 72284b5..72cdc68 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -25,6 +25,13 @@ import org.apache.spark.sql.types._
private object MsSqlServerDialect extends JdbcDialect {
+ // Special JDBC types in Microsoft SQL Server.
+ // https://github.com/microsoft/mssql-jdbc/blob/v7.2.1/src/main/java/microsoft/sql/Types.java
+ private object SpecificTypes {
+ val GEOMETRY = -157
+ val GEOGRAPHY = -158
+ }
+
override def canHandle(url: String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
@@ -40,6 +47,7 @@ private object MsSqlServerDialect extends JdbcDialect {
sqlType match {
case java.sql.Types.SMALLINT => Some(ShortType)
case java.sql.Types.REAL => Some(FloatType)
+ case SpecificTypes.GEOMETRY | SpecificTypes.GEOGRAPHY => Some(BinaryType)
case _ => None
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org