You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/11/17 13:07:18 UTC
[spark] branch branch-2.4 updated: [SPARK-29644][SQL][2.4]
Corrected ShortType and ByteType mapping to SmallInt and TinyInt in
JDBCUtils
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new eda6360 [SPARK-29644][SQL][2.4] Corrected ShortType and ByteType mapping to SmallInt and TinyInt in JDBCUtils
eda6360 is described below
commit eda6360354b3d32453672ccdb368053516c8c285
Author: shivsood <sh...@microsoft.com>
AuthorDate: Sun Nov 17 07:06:50 2019 -0600
[SPARK-29644][SQL][2.4] Corrected ShortType and ByteType mapping to SmallInt and TinyInt in JDBCUtils
This is a port of SPARK-29644 to 2.4.
### What changes were proposed in this pull request?
Corrected ShortType and ByteType mapping to SmallInt and TinyInt, corrected setter methods to set ShortType and ByteType as setShort() and setByte(). Changes in JDBCUtils.scala
Fixed unit test cases where applicable and added new E2E test cases to test table read/write using ShortType and ByteType.
Problems
- In master in JDBCUtils.scala line number 547 and 551 have a problem where ShortType and ByteType are set as Integers rather than set as Short and Byte respectively.
```
case ShortType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setInt(pos + 1, row.getShort(pos))
The issue was pointed out by maropu
case ByteType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
stmt.setInt(pos + 1, row.getByte(pos))
```
- Also, at JDBCUtils.scala line 247, TINYINT is wrongly interpreted as IntegerType in getCatalystType()
``` case java.sql.Types.TINYINT => IntegerType ```
- At line 172, ShortType was wrongly mapped to the INTEGER SQL type
``` case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT)) ```
- Throughout the tests, ShortType and ByteType were being interpreted as IntegerType.
### Why are the changes needed?
Given type should be set using the right type.
### Does this PR introduce any user-facing change?
Yes.
- User will now be able to create tables where dataframe contains ByteType when using JDBC connector in overwrite mode.
- Users will see SQL-side tables created with the right data types: ShortType in Spark will translate to SMALLINT and ByteType to TINYINT on the SQL side. This will result in smaller database tables where applicable.
### How was this patch tested?
Corrected Unit test cases where applicable. Validated in CI/CD
Added/fixed test cases in MsSqlServerIntegrationSuite.scala, PostgresIntegrationSuite.scala, and MySQLIntegrationSuite.scala to
write/read tables from a DataFrame with columns of ShortType and ByteType. Validated manually as follows.
```
./build/mvn install -DskipTests
./build/mvn test -Pdocker-integration-tests -pl :spark-docker-integration-tests_2.12
```
Closes #26549 from shivsood/port_29644_2.4.
Authored-by: shivsood <sh...@microsoft.com>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../sql/jdbc/MsSqlServerIntegrationSuite.scala | 48 ++++++++++++++++++++--
.../spark/sql/jdbc/MySQLIntegrationSuite.scala | 4 +-
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 12 +++---
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 4 +-
.../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 42 +++++++++++++++++++
5 files changed, 97 insertions(+), 13 deletions(-)
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index efd7ca7..f1cd334 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -59,7 +59,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
"""
|INSERT INTO numbers VALUES (
|0,
- |255, 32767, 2147483647, 9223372036854775807,
+ |127, 32767, 2147483647, 9223372036854775807,
|123456789012345.123456789012345, 123456789012345.123456789012345,
|123456789012345.123456789012345,
|123, 12345.12,
@@ -119,7 +119,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
val types = row.toSeq.map(x => x.getClass.toString)
assert(types.length == 12)
assert(types(0).equals("class java.lang.Boolean"))
- assert(types(1).equals("class java.lang.Integer"))
+ assert(types(1).equals("class java.lang.Byte"))
assert(types(2).equals("class java.lang.Short"))
assert(types(3).equals("class java.lang.Integer"))
assert(types(4).equals("class java.lang.Long"))
@@ -131,7 +131,7 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(types(10).equals("class java.math.BigDecimal"))
assert(types(11).equals("class java.math.BigDecimal"))
assert(row.getBoolean(0) == false)
- assert(row.getInt(1) == 255)
+ assert(row.getByte(1) == 127)
assert(row.getShort(2) == 32767)
assert(row.getInt(3) == 2147483647)
assert(row.getLong(4) == 9223372036854775807L)
@@ -202,4 +202,46 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {
df2.write.jdbc(jdbcUrl, "datescopy", new Properties)
df3.write.jdbc(jdbcUrl, "stringscopy", new Properties)
}
+
+ test("SPARK-29644: Write tables with ShortType") {
+ import testImplicits._
+ val df = Seq(-32768.toShort, 0.toShort, 1.toShort, 38.toShort, 32768.toShort).toDF("a")
+ val tablename = "shorttable"
+ df.write
+ .format("jdbc")
+ .mode("overwrite")
+ .option("url", jdbcUrl)
+ .option("dbtable", tablename)
+ .save()
+ val df2 = spark.read
+ .format("jdbc")
+ .option("url", jdbcUrl)
+ .option("dbtable", tablename)
+ .load()
+ assert(df.count == df2.count)
+ val rows = df2.collect()
+ val colType = rows(0).toSeq.map(x => x.getClass.toString)
+ assert(colType(0) == "class java.lang.Short")
+ }
+
+ test("SPARK-29644: Write tables with ByteType") {
+ import testImplicits._
+ val df = Seq(-127.toByte, 0.toByte, 1.toByte, 38.toByte, 128.toByte).toDF("a")
+ val tablename = "bytetable"
+ df.write
+ .format("jdbc")
+ .mode("overwrite")
+ .option("url", jdbcUrl)
+ .option("dbtable", tablename)
+ .save()
+ val df2 = spark.read
+ .format("jdbc")
+ .option("url", jdbcUrl)
+ .option("dbtable", tablename)
+ .load()
+ assert(df.count == df2.count)
+ val rows = df2.collect()
+ val colType = rows(0).toSeq.map(x => x.getClass.toString)
+ assert(colType(0) == "class java.lang.Byte")
+ }
}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
index 9cd5c4e..5b08093 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala
@@ -82,7 +82,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(types.length == 9)
assert(types(0).equals("class java.lang.Boolean"))
assert(types(1).equals("class java.lang.Long"))
- assert(types(2).equals("class java.lang.Integer"))
+ assert(types(2).equals("class java.lang.Short"))
assert(types(3).equals("class java.lang.Integer"))
assert(types(4).equals("class java.lang.Integer"))
assert(types(5).equals("class java.lang.Long"))
@@ -91,7 +91,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
assert(types(8).equals("class java.lang.Double"))
assert(rows(0).getBoolean(0) == false)
assert(rows(0).getLong(1) == 0x225)
- assert(rows(0).getInt(2) == 17)
+ assert(rows(0).getShort(2) == 17)
assert(rows(0).getInt(3) == 77777)
assert(rows(0).getInt(4) == 123456789)
assert(rows(0).getLong(5) == 123456789012345L)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index c0f628f..f19778f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -165,8 +165,8 @@ object JdbcUtils extends Logging {
case LongType => Option(JdbcType("BIGINT", java.sql.Types.BIGINT))
case DoubleType => Option(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
case FloatType => Option(JdbcType("REAL", java.sql.Types.FLOAT))
- case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
- case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))
+ case ShortType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
+ case ByteType => Option(JdbcType("TINYINT", java.sql.Types.TINYINT))
case BooleanType => Option(JdbcType("BIT(1)", java.sql.Types.BIT))
case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
@@ -230,7 +230,7 @@ object JdbcUtils extends Logging {
case java.sql.Types.REF => StringType
case java.sql.Types.REF_CURSOR => null
case java.sql.Types.ROWID => LongType
- case java.sql.Types.SMALLINT => IntegerType
+ case java.sql.Types.SMALLINT => ShortType
case java.sql.Types.SQLXML => StringType
case java.sql.Types.STRUCT => StringType
case java.sql.Types.TIME => TimestampType
@@ -239,7 +239,7 @@ object JdbcUtils extends Logging {
case java.sql.Types.TIMESTAMP => TimestampType
case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
=> null
- case java.sql.Types.TINYINT => IntegerType
+ case java.sql.Types.TINYINT => ByteType
case java.sql.Types.VARBINARY => BinaryType
case java.sql.Types.VARCHAR => StringType
case _ =>
@@ -541,11 +541,11 @@ object JdbcUtils extends Logging {
case ShortType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
- stmt.setInt(pos + 1, row.getShort(pos))
+ stmt.setShort(pos + 1, row.getShort(pos))
case ByteType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
- stmt.setInt(pos + 1, row.getByte(pos))
+ stmt.setByte(pos + 1, row.getByte(pos))
case BooleanType =>
(stmt: PreparedStatement, row: Row, pos: Int) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 2dcedc3..348c1a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -578,8 +578,8 @@ class JDBCSuite extends QueryTest
assert(rows.length === 1)
assert(rows(0).getInt(0) === 1)
assert(rows(0).getBoolean(1) === false)
- assert(rows(0).getInt(2) === 3)
- assert(rows(0).getInt(3) === 4)
+ assert(rows(0).getByte(2) === 3.toByte)
+ assert(rows(0).getShort(3) === 4.toShort)
assert(rows(0).getLong(4) === 1234567890123L)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index b751ec2..e8155f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -543,4 +543,46 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
}.getMessage
assert(errMsg.contains("Statement was canceled or the session timed out"))
}
+
+ test("SPARK-29644: Write tables with ShortType") {
+ import testImplicits._
+ val df = Seq(-32768.toShort, 0.toShort, 1.toShort, 38.toShort, 32768.toShort).toDF("a")
+ val tablename = "shorttable"
+ df.write
+ .format("jdbc")
+ .mode("overwrite")
+ .option("url", url)
+ .option("dbtable", tablename)
+ .save()
+ val df2 = spark.read
+ .format("jdbc")
+ .option("url", url)
+ .option("dbtable", tablename)
+ .load()
+ assert(df.count == df2.count)
+ val rows = df2.collect()
+ val colType = rows(0).toSeq.map(x => x.getClass.toString)
+ assert(colType(0) == "class java.lang.Short")
+ }
+
+ test("SPARK-29644: Write tables with ByteType") {
+ import testImplicits._
+ val df = Seq(-127.toByte, 0.toByte, 1.toByte, 38.toByte, 128.toByte).toDF("a")
+ val tablename = "bytetable"
+ df.write
+ .format("jdbc")
+ .mode("overwrite")
+ .option("url", url)
+ .option("dbtable", tablename)
+ .save()
+ val df2 = spark.read
+ .format("jdbc")
+ .option("url", url)
+ .option("dbtable", tablename)
+ .load()
+ assert(df.count == df2.count)
+ val rows = df2.collect()
+ val colType = rows(0).toSeq.map(x => x.getClass.toString)
+ assert(colType(0) == "class java.lang.Byte")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org