You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/10/08 08:12:40 UTC

spark git commit: [SPARK-7869][SQL] Adding Postgres JSON and JSONb data types support

Repository: spark
Updated Branches:
  refs/heads/master 3aff0866a -> b8f849b54


[SPARK-7869][SQL] Adding Postgres JSON and JSONb data types support

This PR addresses [SPARK-7869](https://issues.apache.org/jira/browse/SPARK-7869)

Before this patch, attempting to load a Postgres table containing a JSON or JSONB column failed with `java.sql.SQLException: Unsupported type 1111` (1111 is `java.sql.Types.OTHER`, which is how the Postgres JDBC driver reports these types).
The Postgres JSON and JSONB data types are now mapped to StringType on the Spark side, so such tables can be loaded into a DataFrame and the JSON text can be parsed and processed in Spark.

Example

Postgres:
```
create table test_json  (id int, value json);
create table test_jsonb (id int, value jsonb);

insert into test_json (id, value) values
(1, '{"field1":"value1","field2":"value2","field3":[1,2,3]}'::json),
(2, '{"field1":"value3","field2":"value4","field3":[4,5,6]}'::json),
(3, '{"field3":"value5","field4":"value6","field3":[7,8,9]}'::json);

insert into test_jsonb (id, value) values
(4, '{"field1":"value1","field2":"value2","field3":[1,2,3]}'::jsonb),
(5, '{"field1":"value3","field2":"value4","field3":[4,5,6]}'::jsonb),
(6, '{"field3":"value5","field4":"value6","field3":[7,8,9]}'::jsonb);
```

PySpark:
```
>>> import json
>>> df1 = sqlContext.read.jdbc("jdbc:postgresql://127.0.0.1:5432/test?user=testuser", "test_json")
>>> df1.map(lambda x: (x.id, json.loads(x.value))).map(lambda (id, value): (id, value.get('field3'))).collect()
[(1, [1, 2, 3]), (2, [4, 5, 6]), (3, [7, 8, 9])]
>>> df2 = sqlContext.read.jdbc("jdbc:postgresql://127.0.0.1:5432/test?user=testuser", "test_jsonb")
>>> df2.map(lambda x: (x.id, json.loads(x.value))).map(lambda (id, value): (id, value.get('field1'))).collect()
[(4, u'value1'), (5, u'value3'), (6, None)]
```

Author: 0x0FFF <pr...@gmail.com>

Closes #8948 from 0x0FFF/SPARK-7869.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8f849b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8f849b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8f849b5

Branch: refs/heads/master
Commit: b8f849b546739d3e4339563557509a51417fcb68
Parents: 3aff086
Author: 0x0FFF <pr...@gmail.com>
Authored: Wed Oct 7 23:12:35 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Oct 7 23:12:35 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala   | 4 ++++
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala  | 7 +++++++
 2 files changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b8f849b5/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 5abbfbf..0cd356f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -202,6 +202,10 @@ case object PostgresDialect extends JdbcDialect {
       Some(StringType)
     } else if (sqlType == Types.OTHER && typeName.equals("inet")) {
       Some(StringType)
+    } else if (sqlType == Types.OTHER && typeName.equals("json")) {
+      Some(StringType)
+    } else if (sqlType == Types.OTHER && typeName.equals("jsonb")) {
+      Some(StringType)
     } else None
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b8f849b5/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index c4b039a..bbf705c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -452,6 +452,13 @@ class JDBCSuite extends SparkFunSuite with BeforeAndAfter with SharedSQLContext
     assert(db2Dialect.getJDBCType(BooleanType).map(_.databaseTypeDefinition).get == "CHAR(1)")
   }
 
+  test("PostgresDialect type mapping") {
+    val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")
+    // SPARK-7869: Testing JSON types handling
+    assert(Postgres.getCatalystType(java.sql.Types.OTHER, "json", 1, null) === Some(StringType))
+    assert(Postgres.getCatalystType(java.sql.Types.OTHER, "jsonb", 1, null) === Some(StringType))
+  }
+
   test("table exists query by jdbc dialect") {
     val MySQL = JdbcDialects.get("jdbc:mysql://127.0.0.1/db")
     val Postgres = JdbcDialects.get("jdbc:postgresql://127.0.0.1/db")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org