Posted to commits@spark.apache.org by gu...@apache.org on 2017/09/15 02:53:15 UTC

spark git commit: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support converting MapType to json for PySpark and SparkR

Repository: spark
Updated Branches:
  refs/heads/master 054ddb2f5 -> a28728a9a


[SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support converting MapType to json for PySpark and SparkR

## What changes were proposed in this pull request?
In the previous work, SPARK-21513, we allowed `MapType` and `ArrayType` of `MapType`s to be converted to a JSON string, but only for the Scala API. In this follow-up PR, we make Spark SQL support it for PySpark and SparkR, too. We also fix some minor bugs and comments from the previous work in this follow-up PR.

### For PySpark
```
>>> data = [(1, {"name": "Alice"})]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"name":"Alice")']
>>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
```
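For reference, a self-contained sketch of the PySpark usage above (the `SparkSession` setup is illustrative only and not part of the patch; outputs shown as in the Python 2 doctests):
```
# Runnable sketch of the doctest above; assumes a Spark build that includes this patch.
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json

spark = SparkSession.builder.master("local[1]").appName("to_json_map_demo").getOrCreate()

# A MapType column is converted to a JSON object.
df = spark.createDataFrame([(1, {"name": "Alice"})], ("key", "value"))
print(df.select(to_json(df.value).alias("json")).collect())
# [Row(json=u'{"name":"Alice"}')]

# An ArrayType of MapType column is converted to a JSON array.
df = spark.createDataFrame([(1, [{"name": "Alice"}, {"name": "Bob"}])], ("key", "value"))
print(df.select(to_json(df.value).alias("json")).collect())
# [Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]

spark.stop()
```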
### For SparkR
```
# Converts a map into a JSON object
df2 <- sql("SELECT map('name', 'Bob')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
# Converts an array of maps into a JSON array
df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
```
## How was this patch tested?
Add unit test cases.

cc viirya HyukjinKwon

Author: goldmedal <li...@gmail.com>

Closes #19223 from goldmedal/SPARK-21513-fp-PySaprkAndSparkR.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a28728a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a28728a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a28728a9

Branch: refs/heads/master
Commit: a28728a9afcff94194147573e07f6f4d0463687e
Parents: 054ddb2
Author: goldmedal <li...@gmail.com>
Authored: Fri Sep 15 11:53:10 2017 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Fri Sep 15 11:53:10 2017 +0900

----------------------------------------------------------------------
 R/pkg/R/functions.R                             | 16 +++++++++++---
 R/pkg/tests/fulltests/test_sparkSQL.R           |  8 +++++++
 python/pyspark/sql/functions.py                 | 22 ++++++++++++++------
 .../catalyst/expressions/jsonExpressions.scala  |  8 +++----
 .../sql/catalyst/json/JacksonGenerator.scala    |  2 +-
 .../sql-tests/results/json-functions.sql.out    |  8 +++----
 6 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 5a46d73..e92e1fd 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -176,7 +176,8 @@ NULL
 #'
 #' @param x Column to compute on. Note the difference in the following methods:
 #'          \itemize{
-#'          \item \code{to_json}: it is the column containing the struct or array of the structs.
+#'          \item \code{to_json}: it is the column containing the struct, array of the structs,
+#'              the map or array of maps.
 #'          \item \code{from_json}: it is the column containing the JSON string.
 #'          }
 #' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
@@ -1700,8 +1701,9 @@ setMethod("to_date",
           })
 
 #' @details
-#' \code{to_json}: Converts a column containing a \code{structType} or array of \code{structType}
-#' into a Column of JSON string. Resolving the Column can fail if an unsupported type is encountered.
+#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType},
+#' a \code{mapType} or array of \code{mapType} into a Column of JSON string.
+#' Resolving the Column can fail if an unsupported type is encountered.
 #'
 #' @rdname column_collection_functions
 #' @aliases to_json to_json,Column-method
@@ -1715,6 +1717,14 @@ setMethod("to_date",
 #'
 #' # Converts an array of structs into a JSON array
 #' df2 <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts a map into a JSON object
+#' df2 <- sql("SELECT map('name', 'Bob')) as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts an array of maps into a JSON array
+#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
 #' df2 <- mutate(df2, people_json = to_json(df2$people))}
 #' @note to_json since 2.2.0
 setMethod("to_json", signature(x = "Column"),

http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 7abc872..85a7e08 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1491,6 +1491,14 @@ test_that("column functions", {
   j <- collect(select(df, alias(to_json(df$people), "json")))
   expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
 
+  df <- sql("SELECT map('name', 'Bob') as people")
+  j <- collect(select(df, alias(to_json(df$people), "json")))
+  expect_equal(j[order(j$json), ][1], "{\"name\":\"Bob\"}")
+
+  df <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
+  j <- collect(select(df, alias(to_json(df$people), "json")))
+  expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
+
   df <- read.json(mapTypeJsonPath)
   j <- collect(select(df, alias(to_json(df$info), "json")))
   expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")

http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 0e76182..399bef0 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1884,9 +1884,9 @@ def json_tuple(col, *fields):
 @since(2.1)
 def from_json(col, schema, options={}):
     """
-    Parses a column containing a JSON string into a [[StructType]] or [[ArrayType]]
-    of [[StructType]]s with the specified schema. Returns `null`, in the case of an unparseable
-    string.
+    Parses a column containing a JSON string into a :class:`StructType` or :class:`ArrayType`
+    of :class:`StructType`\\s with the specified schema. Returns `null`, in the case of an
+    unparseable string.
 
     :param col: string column in json format
     :param schema: a StructType or ArrayType of StructType to use when parsing the json column.
@@ -1921,10 +1921,12 @@ def from_json(col, schema, options={}):
 @since(2.1)
 def to_json(col, options={}):
     """
-    Converts a column containing a [[StructType]] or [[ArrayType]] of [[StructType]]s into a
-    JSON string. Throws an exception, in the case of an unsupported type.
+    Converts a column containing a :class:`StructType`, :class:`ArrayType` of
+    :class:`StructType`\\s, a :class:`MapType` or :class:`ArrayType` of :class:`MapType`\\s
+    into a JSON string. Throws an exception, in the case of an unsupported type.
 
-    :param col: name of column containing the struct or array of the structs
+    :param col: name of column containing the struct, array of the structs, the map or
+        array of the maps.
     :param options: options to control converting. accepts the same options as the json datasource
 
     >>> from pyspark.sql import Row
@@ -1937,6 +1939,14 @@ def to_json(col, options={}):
     >>> df = spark.createDataFrame(data, ("key", "value"))
     >>> df.select(to_json(df.value).alias("json")).collect()
     [Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')]
+    >>> data = [(1, {"name": "Alice"})]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json=u'{"name":"Alice"}')]
+    >>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(to_json(df.value).alias("json")).collect()
+    [Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
     """
 
     sc = SparkContext._active_spark_context

http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 1341631..18b4fed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -618,13 +618,13 @@ case class JsonToStructs(
        {"time":"26/08/2015"}
       > SELECT _FUNC_(array(named_struct('a', 1, 'b', 2));
        [{"a":1,"b":2}]
-      > SELECT _FUNC_(map('a',named_struct('b',1)));
+      > SELECT _FUNC_(map('a', named_struct('b', 1)));
        {"a":{"b":1}}
-      > SELECT _FUNC_(map(named_struct('a',1),named_struct('b',2)));
+      > SELECT _FUNC_(map(named_struct('a', 1),named_struct('b', 2)));
        {"[1]":{"b":2}}
-      > SELECT _FUNC_(map('a',1));
+      > SELECT _FUNC_(map('a', 1));
        {"a":1}
-      > SELECT _FUNC_(array((map('a',1))));
+      > SELECT _FUNC_(array((map('a', 1))));
        [{"a":1}]
   """,
   since = "2.2.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index dfe7e28..eb06e4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -43,7 +43,7 @@ private[sql] class JacksonGenerator(
   private type ValueWriter = (SpecializedGetters, Int) => Unit
 
   // `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
-  require(dataType.isInstanceOf[StructType] | dataType.isInstanceOf[MapType],
+  require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType],
     "JacksonGenerator only supports to be initialized with a StructType " +
       s"or MapType but got ${dataType.simpleString}")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index dcced79..d9dc728 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -26,13 +26,13 @@ Extended Usage:
        {"time":"26/08/2015"}
       > SELECT to_json(array(named_struct('a', 1, 'b', 2));
        [{"a":1,"b":2}]
-      > SELECT to_json(map('a',named_struct('b',1)));
+      > SELECT to_json(map('a', named_struct('b', 1)));
        {"a":{"b":1}}
-      > SELECT to_json(map(named_struct('a',1),named_struct('b',2)));
+      > SELECT to_json(map(named_struct('a', 1),named_struct('b', 2)));
        {"[1]":{"b":2}}
-      > SELECT to_json(map('a',1));
+      > SELECT to_json(map('a', 1));
        {"a":1}
-      > SELECT to_json(array((map('a',1))));
+      > SELECT to_json(array((map('a', 1))));
        [{"a":1}]
   
     Since: 2.2.0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org