You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/07/17 22:10:39 UTC
[spark] branch master updated: [SPARK-28097][SQL] Map ByteType to SMALLINT for PostgresDialect
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new eb5dc746 [SPARK-28097][SQL] Map ByteType to SMALLINT for PostgresDialect
eb5dc746 is described below
commit eb5dc746c250d614d7ebf9b310fe531f154c0fee
Author: Seth Fitzsimmons <se...@mojodna.net>
AuthorDate: Wed Jul 17 15:10:01 2019 -0700
[SPARK-28097][SQL] Map ByteType to SMALLINT for PostgresDialect
## What changes were proposed in this pull request?
PostgreSQL doesn't have a `TINYINT` type, which would be the direct mapping for `ByteType`, but `SMALLINT` is sufficient for unidirectional translation (bytes written as `SMALLINT` are read back as shorts).
A side effect of this fix is that `AggregatedDialect` is now usable with multiple dialects targeting `jdbc:postgresql`, because `PostgresDialect.getJDBCType` no longer throws (which is why backporting this fix would be lovely):
https://github.com/apache/spark/blob/1217996f1574f758d8cccc1c4e3846452d24b35b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala#L42
`dialects.flatMap` currently throws on the first attempt to get a JDBC type, preventing subsequent dialects in the chain from providing an alternative.
## How was this patch tested?
Unit tests.
Closes #24845 from mojodna/postgres-byte-type-mapping.
Authored-by: Seth Fitzsimmons <se...@mojodna.net>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../apache/spark/sql/jdbc/PostgresIntegrationSuite.scala | 13 +++++++++++++
.../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala | 3 +--
.../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 5 +----
3 files changed, 15 insertions(+), 6 deletions(-)
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 462f88f..89da9a1 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -206,4 +206,17 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
""".stripMargin.replaceAll("\n", " "))
assert(sql("select c1, c3 from queryOption").collect.toSet == expectedResult)
}
+
+ test("write byte as smallint") {
+ sqlContext.createDataFrame(Seq((1.toByte, 2.toShort)))
+ .write.jdbc(jdbcUrl, "byte_to_smallint_test", new Properties)
+ val df = sqlContext.read.jdbc(jdbcUrl, "byte_to_smallint_test", new Properties)
+ val schema = df.schema
+ assert(schema.head.dataType == ShortType)
+ assert(schema(1).dataType == ShortType)
+ val rows = df.collect()
+ assert(rows.length === 1)
+ assert(rows(0).getShort(0) === 1)
+ assert(rows(0).getShort(1) === 2)
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 5be45c9..2645e4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -73,14 +73,13 @@ private object PostgresDialect extends JdbcDialect {
case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
- case ShortType => Some(JdbcType("SMALLINT", Types.SMALLINT))
+ case ShortType | ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
case t: DecimalType => Some(
JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
getJDBCType(et).map(_.databaseTypeDefinition)
.orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
.map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
- case ByteType => throw new IllegalArgumentException(s"Unsupported type in postgresql: $dt");
case _ => None
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a47fc18..89eaac8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -857,10 +857,7 @@ class JDBCSuite extends QueryTest
Some(ArrayType(DecimalType.SYSTEM_DEFAULT)))
assert(Postgres.getJDBCType(FloatType).map(_.databaseTypeDefinition).get == "FLOAT4")
assert(Postgres.getJDBCType(DoubleType).map(_.databaseTypeDefinition).get == "FLOAT8")
- val errMsg = intercept[IllegalArgumentException] {
- Postgres.getJDBCType(ByteType)
- }
- assert(errMsg.getMessage contains "Unsupported type in postgresql: ByteType")
+ assert(Postgres.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT")
}
test("DerbyDialect jdbc type mapping") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org