You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2017/09/15 02:53:15 UTC
spark git commit: [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json
support converting MapType to json for PySpark and SparkR
Repository: spark
Updated Branches:
refs/heads/master 054ddb2f5 -> a28728a9a
[SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support converting MapType to json for PySpark and SparkR
## What changes were proposed in this pull request?
In the previous work SPARK-21513, we allowed `MapType` and `ArrayType` of `MapType`s to be converted to a JSON string, but only for the Scala API. In this follow-up PR, we make Spark SQL support it for PySpark and SparkR, too. We also fix some small bugs and comments from the previous work in this follow-up PR.
### For PySpark
```
>>> data = [(1, {"name": "Alice"})]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"name":"Alice"}')]
>>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
```
### For SparkR
```
# Converts a map into a JSON object
df2 <- sql("SELECT map('name', 'Bob') as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
# Converts an array of maps into a JSON array
df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
```
## How was this patch tested?
Add unit test cases.
cc viirya HyukjinKwon
Author: goldmedal <li...@gmail.com>
Closes #19223 from goldmedal/SPARK-21513-fp-PySaprkAndSparkR.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a28728a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a28728a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a28728a9
Branch: refs/heads/master
Commit: a28728a9afcff94194147573e07f6f4d0463687e
Parents: 054ddb2
Author: goldmedal <li...@gmail.com>
Authored: Fri Sep 15 11:53:10 2017 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Fri Sep 15 11:53:10 2017 +0900
----------------------------------------------------------------------
R/pkg/R/functions.R | 16 +++++++++++---
R/pkg/tests/fulltests/test_sparkSQL.R | 8 +++++++
python/pyspark/sql/functions.py | 22 ++++++++++++++------
.../catalyst/expressions/jsonExpressions.scala | 8 +++----
.../sql/catalyst/json/JacksonGenerator.scala | 2 +-
.../sql-tests/results/json-functions.sql.out | 8 +++----
6 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/R/pkg/R/functions.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 5a46d73..e92e1fd 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -176,7 +176,8 @@ NULL
#'
#' @param x Column to compute on. Note the difference in the following methods:
#' \itemize{
-#' \item \code{to_json}: it is the column containing the struct or array of the structs.
+#' \item \code{to_json}: it is the column containing the struct, array of the structs,
+#' the map or array of maps.
#' \item \code{from_json}: it is the column containing the JSON string.
#' }
#' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
@@ -1700,8 +1701,9 @@ setMethod("to_date",
})
#' @details
-#' \code{to_json}: Converts a column containing a \code{structType} or array of \code{structType}
-#' into a Column of JSON string. Resolving the Column can fail if an unsupported type is encountered.
+#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType},
+#' a \code{mapType} or array of \code{mapType} into a Column of JSON string.
+#' Resolving the Column can fail if an unsupported type is encountered.
#'
#' @rdname column_collection_functions
#' @aliases to_json to_json,Column-method
@@ -1715,6 +1717,14 @@ setMethod("to_date",
#'
#' # Converts an array of structs into a JSON array
#' df2 <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts a map into a JSON object
+#' df2 <- sql("SELECT map('name', 'Bob') as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts an array of maps into a JSON array
+#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 7abc872..85a7e08 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1491,6 +1491,14 @@ test_that("column functions", {
j <- collect(select(df, alias(to_json(df$people), "json")))
expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
+ df <- sql("SELECT map('name', 'Bob') as people")
+ j <- collect(select(df, alias(to_json(df$people), "json")))
+ expect_equal(j[order(j$json), ][1], "{\"name\":\"Bob\"}")
+
+ df <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
+ j <- collect(select(df, alias(to_json(df$people), "json")))
+ expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
+
df <- read.json(mapTypeJsonPath)
j <- collect(select(df, alias(to_json(df$info), "json")))
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 0e76182..399bef0 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1884,9 +1884,9 @@ def json_tuple(col, *fields):
@since(2.1)
def from_json(col, schema, options={}):
"""
- Parses a column containing a JSON string into a [[StructType]] or [[ArrayType]]
- of [[StructType]]s with the specified schema. Returns `null`, in the case of an unparseable
- string.
+ Parses a column containing a JSON string into a :class:`StructType` or :class:`ArrayType`
+ of :class:`StructType`\\s with the specified schema. Returns `null`, in the case of an
+ unparseable string.
:param col: string column in json format
:param schema: a StructType or ArrayType of StructType to use when parsing the json column.
@@ -1921,10 +1921,12 @@ def from_json(col, schema, options={}):
@since(2.1)
def to_json(col, options={}):
"""
- Converts a column containing a [[StructType]] or [[ArrayType]] of [[StructType]]s into a
- JSON string. Throws an exception, in the case of an unsupported type.
+ Converts a column containing a :class:`StructType`, :class:`ArrayType` of
+ :class:`StructType`\\s, a :class:`MapType` or :class:`ArrayType` of :class:`MapType`\\s
+ into a JSON string. Throws an exception, in the case of an unsupported type.
- :param col: name of column containing the struct or array of the structs
+ :param col: name of column containing the struct, array of the structs, the map or
+ array of the maps.
:param options: options to control converting. accepts the same options as the json datasource
>>> from pyspark.sql import Row
@@ -1937,6 +1939,14 @@ def to_json(col, options={}):
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"age":2,"name":"Alice"},{"age":3,"name":"Bob"}]')]
+ >>> data = [(1, {"name": "Alice"})]
+ >>> df = spark.createDataFrame(data, ("key", "value"))
+ >>> df.select(to_json(df.value).alias("json")).collect()
+ [Row(json=u'{"name":"Alice"}')]
+ >>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
+ >>> df = spark.createDataFrame(data, ("key", "value"))
+ >>> df.select(to_json(df.value).alias("json")).collect()
+ [Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
"""
sc = SparkContext._active_spark_context
http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 1341631..18b4fed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -618,13 +618,13 @@ case class JsonToStructs(
{"time":"26/08/2015"}
> SELECT _FUNC_(array(named_struct('a', 1, 'b', 2));
[{"a":1,"b":2}]
- > SELECT _FUNC_(map('a',named_struct('b',1)));
+ > SELECT _FUNC_(map('a', named_struct('b', 1)));
{"a":{"b":1}}
- > SELECT _FUNC_(map(named_struct('a',1),named_struct('b',2)));
+ > SELECT _FUNC_(map(named_struct('a', 1),named_struct('b', 2)));
{"[1]":{"b":2}}
- > SELECT _FUNC_(map('a',1));
+ > SELECT _FUNC_(map('a', 1));
{"a":1}
- > SELECT _FUNC_(array((map('a',1))));
+ > SELECT _FUNC_(array((map('a', 1))));
[{"a":1}]
""",
since = "2.2.0")
http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index dfe7e28..eb06e4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -43,7 +43,7 @@ private[sql] class JacksonGenerator(
private type ValueWriter = (SpecializedGetters, Int) => Unit
// `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
- require(dataType.isInstanceOf[StructType] | dataType.isInstanceOf[MapType],
+ require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType],
"JacksonGenerator only supports to be initialized with a StructType " +
s"or MapType but got ${dataType.simpleString}")
http://git-wip-us.apache.org/repos/asf/spark/blob/a28728a9/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index dcced79..d9dc728 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -26,13 +26,13 @@ Extended Usage:
{"time":"26/08/2015"}
> SELECT to_json(array(named_struct('a', 1, 'b', 2));
[{"a":1,"b":2}]
- > SELECT to_json(map('a',named_struct('b',1)));
+ > SELECT to_json(map('a', named_struct('b', 1)));
{"a":{"b":1}}
- > SELECT to_json(map(named_struct('a',1),named_struct('b',2)));
+ > SELECT to_json(map(named_struct('a', 1),named_struct('b', 2)));
{"[1]":{"b":2}}
- > SELECT to_json(map('a',1));
+ > SELECT to_json(map('a', 1));
{"a":1}
- > SELECT to_json(array((map('a',1))));
+ > SELECT to_json(array((map('a', 1))));
[{"a":1}]
Since: 2.2.0
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org