You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/07/17 22:10:39 UTC

[spark] branch master updated: [SPARK-28097][SQL] Map ByteType to SMALLINT for PostgresDialect

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new eb5dc746 [SPARK-28097][SQL] Map ByteType to SMALLINT for PostgresDialect
eb5dc746 is described below

commit eb5dc746c250d614d7ebf9b310fe531f154c0fee
Author: Seth Fitzsimmons <se...@mojodna.net>
AuthorDate: Wed Jul 17 15:10:01 2019 -0700

    [SPARK-28097][SQL] Map ByteType to SMALLINT for PostgresDialect
    
    ## What changes were proposed in this pull request?
    
    PostgreSQL doesn't have `TINYINT`, which would map directly, but `SMALLINT`s are sufficient for uni-directional translation.
    
    A side-effect of this fix is that `AggregatedDialect` is now usable with multiple dialects targeting `jdbc:postgresql`, as `PostgresDialect.getJDBCType` no longer throws (for which reason backporting this fix would be lovely):
    
    https://github.com/apache/spark/blob/1217996f1574f758d8cccc1c4e3846452d24b35b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala#L42
    
    `dialects.flatMap` currently throws on the first attempt to get a JDBC type, preventing subsequent dialects in the chain from providing an alternative.
    
    ## How was this patch tested?
    
    Unit tests.
    
    Closes #24845 from mojodna/postgres-byte-type-mapping.
    
    Authored-by: Seth Fitzsimmons <se...@mojodna.net>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/jdbc/PostgresIntegrationSuite.scala    | 13 +++++++++++++
 .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala   |  3 +--
 .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala    |  5 +----
 3 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index 462f88f..89da9a1 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -206,4 +206,17 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
        """.stripMargin.replaceAll("\n", " "))
     assert(sql("select c1, c3 from queryOption").collect.toSet == expectedResult)
   }
+
+  test("write byte as smallint") {
+    sqlContext.createDataFrame(Seq((1.toByte, 2.toShort)))
+      .write.jdbc(jdbcUrl, "byte_to_smallint_test", new Properties)
+    val df = sqlContext.read.jdbc(jdbcUrl, "byte_to_smallint_test", new Properties)
+    val schema = df.schema
+    assert(schema.head.dataType == ShortType)
+    assert(schema(1).dataType == ShortType)
+    val rows = df.collect()
+    assert(rows.length === 1)
+    assert(rows(0).getShort(0) === 1)
+    assert(rows(0).getShort(1) === 2)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 5be45c9..2645e4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -73,14 +73,13 @@ private object PostgresDialect extends JdbcDialect {
     case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
     case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
     case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
-    case ShortType => Some(JdbcType("SMALLINT", Types.SMALLINT))
+    case ShortType | ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
     case t: DecimalType => Some(
       JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
     case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
       getJDBCType(et).map(_.databaseTypeDefinition)
         .orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
         .map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
-    case ByteType => throw new IllegalArgumentException(s"Unsupported type in postgresql: $dt");
     case _ => None
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a47fc18..89eaac8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -857,10 +857,7 @@ class JDBCSuite extends QueryTest
       Some(ArrayType(DecimalType.SYSTEM_DEFAULT)))
     assert(Postgres.getJDBCType(FloatType).map(_.databaseTypeDefinition).get == "FLOAT4")
     assert(Postgres.getJDBCType(DoubleType).map(_.databaseTypeDefinition).get == "FLOAT8")
-    val errMsg = intercept[IllegalArgumentException] {
-      Postgres.getJDBCType(ByteType)
-    }
-    assert(errMsg.getMessage contains "Unsupported type in postgresql: ByteType")
+    assert(Postgres.getJDBCType(ByteType).map(_.databaseTypeDefinition).get == "SMALLINT")
   }
 
   test("DerbyDialect jdbc type mapping") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org