Posted to commits@spark.apache.org by we...@apache.org on 2017/10/29 17:11:50 UTC

spark git commit: [SPARK-22291][SQL] Conversion error when transforming array types of uuid, inet and cidr to StringType in PostgreSQL

Repository: spark
Updated Branches:
  refs/heads/master 544a1ba67 -> bc7ca9786


[SPARK-22291][SQL] Conversion error when transforming array types of uuid, inet and cidr to StringType in PostgreSQL

## What changes were proposed in this pull request?

This PR fixes a conversion error when reading data from a PostgreSQL table that contains columns of the `uuid[]`, `inet[]` and `cidr[]` data types.

For example, create a table with a `uuid[]` column and insert some test data.
```SQL
CREATE TABLE users
(
    id smallint NOT NULL,
    name character varying(50),
    user_ids uuid[],
    PRIMARY KEY (id)
)

INSERT INTO users ("id", "name","user_ids")
VALUES (1, 'foo', ARRAY
    ['7be8aaf8-650e-4dbb-8186-0a749840ecf2'
    ,'205f9bfc-018c-4452-a605-609c0cfad228']::UUID[]
)
```
Trying to load the data then throws the following exception.
```
java.lang.ClassCastException: [Ljava.util.UUID; cannot be cast to [Ljava.lang.String;
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:459)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$14.apply(JdbcUtils.scala:458)
...
```
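
The root cause is that the JDBC array converter for `StringType` blindly cast the array returned by the driver to `Array[java.lang.String]`, while the PostgreSQL driver actually returns element types such as `java.util.UUID` for `uuid[]`. A minimal standalone sketch of the failure and of the `toString`-based conversion the patch uses (the object and variable names here are illustrative, not part of the patch):
```scala
import java.util.UUID

// Illustrative only: mimics what the PostgreSQL JDBC driver hands back
// for a uuid[] column, i.e. an Object that is really an Array[UUID].
object UuidArrayCastDemo {
  def main(args: Array[String]): Unit = {
    val array: Object = Array(
      UUID.fromString("7be8aaf8-650e-4dbb-8186-0a749840ecf2"),
      UUID.fromString("205f9bfc-018c-4452-a605-609c0cfad228"))

    // Old behavior: throws java.lang.ClassCastException, because
    // [Ljava.util.UUID; is not [Ljava.lang.String;
    // array.asInstanceOf[Array[java.lang.String]].foreach(println)

    // Fixed behavior: treat the elements as plain Objects and
    // stringify them one by one, keeping nulls as nulls.
    array.asInstanceOf[Array[java.lang.Object]]
      .map(obj => if (obj == null) null else obj.toString)
      .foreach(println)
  }
}
```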

## How was this patch tested?

Added test in `PostgresIntegrationSuite`.
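
For context, reading the `users` table from the example above now yields `array<string>` for the `uuid[]` column instead of failing. A rough usage sketch; the JDBC URL, database name and credentials are placeholders, and the PostgreSQL JDBC driver must be on the classpath:
```scala
import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("pg-uuid-array")
  .master("local[*]")
  .getOrCreate()

// Placeholder URL; point it at the PostgreSQL instance holding `users`.
val jdbcUrl = "jdbc:postgresql://localhost:5432/testdb?user=postgres"

val df = spark.read.jdbc(jdbcUrl, "users", new Properties)
df.printSchema()            // user_ids is now array<string>
df.select("user_ids").show(truncate = false)
```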

Author: Jen-Ming Chung <je...@gmail.com>

Closes #19567 from jmchung/SPARK-22291.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc7ca978
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc7ca978
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc7ca978

Branch: refs/heads/master
Commit: bc7ca9786e162e33f29d57c4aacb830761b97221
Parents: 544a1ba
Author: Jen-Ming Chung <je...@gmail.com>
Authored: Sun Oct 29 18:11:48 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Oct 29 18:11:48 2017 +0100

----------------------------------------------------------------------
 .../sql/jdbc/PostgresIntegrationSuite.scala     | 37 ++++++++++++++++++--
 .../execution/datasources/jdbc/JdbcUtils.scala  |  5 +--
 2 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc7ca978/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index eb3c458..48aba90 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -60,7 +60,22 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       "(id integer, tstz TIMESTAMP WITH TIME ZONE, ttz TIME WITH TIME ZONE)")
       .executeUpdate()
     conn.prepareStatement("INSERT INTO ts_with_timezone VALUES " +
-      "(1, TIMESTAMP WITH TIME ZONE '2016-08-12 10:22:31.949271-07', TIME WITH TIME ZONE '17:22:31.949271+00')")
+      "(1, TIMESTAMP WITH TIME ZONE '2016-08-12 10:22:31.949271-07', " +
+      "TIME WITH TIME ZONE '17:22:31.949271+00')")
+      .executeUpdate()
+
+    conn.prepareStatement("CREATE TABLE st_with_array (c0 uuid, c1 inet, c2 cidr," +
+      "c3 json, c4 jsonb, c5 uuid[], c6 inet[], c7 cidr[], c8 json[], c9 jsonb[])")
+      .executeUpdate()
+    conn.prepareStatement("INSERT INTO st_with_array VALUES ( " +
+      "'0a532531-cdf1-45e3-963d-5de90b6a30f1', '172.168.22.1', '192.168.100.128/25', " +
+      """'{"a": "foo", "b": "bar"}', '{"a": 1, "b": 2}', """ +
+      "ARRAY['7be8aaf8-650e-4dbb-8186-0a749840ecf2'," +
+      "'205f9bfc-018c-4452-a605-609c0cfad228']::uuid[], ARRAY['172.16.0.41', " +
+      "'172.16.0.42']::inet[], ARRAY['192.168.0.0/24', '10.1.0.0/16']::cidr[], " +
+      """ARRAY['{"a": "foo", "b": "bar"}', '{"a": 1, "b": 2}']::json[], """ +
+      """ARRAY['{"a": 1, "b": 2, "c": 3}']::jsonb[])"""
+    )
       .executeUpdate()
   }
 
@@ -134,11 +149,29 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
     assert(schema(1).dataType == ShortType)
   }
 
-  test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE should be recognized") {
+  test("SPARK-20557: column type TIMESTAMP with TIME ZONE and TIME with TIME ZONE " +
+    "should be recognized") {
     val dfRead = sqlContext.read.jdbc(jdbcUrl, "ts_with_timezone", new Properties)
     val rows = dfRead.collect()
     val types = rows(0).toSeq.map(x => x.getClass.toString)
     assert(types(1).equals("class java.sql.Timestamp"))
     assert(types(2).equals("class java.sql.Timestamp"))
   }
+
+  test("SPARK-22291: Conversion error when transforming array types of " +
+    "uuid, inet and cidr to StingType in PostgreSQL") {
+    val df = sqlContext.read.jdbc(jdbcUrl, "st_with_array", new Properties)
+    val rows = df.collect()
+    assert(rows(0).getString(0) == "0a532531-cdf1-45e3-963d-5de90b6a30f1")
+    assert(rows(0).getString(1) == "172.168.22.1")
+    assert(rows(0).getString(2) == "192.168.100.128/25")
+    assert(rows(0).getString(3) == "{\"a\": \"foo\", \"b\": \"bar\"}")
+    assert(rows(0).getString(4) == "{\"a\": 1, \"b\": 2}")
+    assert(rows(0).getSeq(5) == Seq("7be8aaf8-650e-4dbb-8186-0a749840ecf2",
+      "205f9bfc-018c-4452-a605-609c0cfad228"))
+    assert(rows(0).getSeq(6) == Seq("172.16.0.41", "172.16.0.42"))
+    assert(rows(0).getSeq(7) == Seq("192.168.0.0/24", "10.1.0.0/16"))
+    assert(rows(0).getSeq(8) == Seq("""{"a": "foo", "b": "bar"}""", """{"a": 1, "b": 2}"""))
+    assert(rows(0).getSeq(9) == Seq("""{"a": 1, "b": 2, "c": 3}"""))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bc7ca978/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 9debc4f..75c94fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -456,8 +456,9 @@ object JdbcUtils extends Logging {
 
         case StringType =>
           (array: Object) =>
-            array.asInstanceOf[Array[java.lang.String]]
-              .map(UTF8String.fromString)
+            // some underlying types are not String, such as uuid, inet and cidr
+            array.asInstanceOf[Array[java.lang.Object]]
+              .map(obj => if (obj == null) null else UTF8String.fromString(obj.toString))
 
         case DateType =>
           (array: Object) =>

