You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/05/14 21:05:46 UTC
spark git commit: [SPARK-24027][SQL] Support MapType with StringType
for keys as the root type by from_json
Repository: spark
Updated Branches:
refs/heads/master 075d678c8 -> 8cd83acf4
[SPARK-24027][SQL] Support MapType with StringType for keys as the root type by from_json
## What changes were proposed in this pull request?
Currently, the from_json function supports StructType or ArrayType as the root type. This PR additionally allows specifying MapType(StringType, DataType) as the root type. For example:
```scala
import org.apache.spark.sql.types._
val schema = MapType(StringType, IntegerType)
val in = Seq("""{"a": 1, "b": 2, "c": 3}""").toDS()
in.select(from_json($"value", schema, Map[String, String]())).collect()
```
```
res1: Array[org.apache.spark.sql.Row] = Array([Map(a -> 1, b -> 2, c -> 3)])
```
## How was this patch tested?
Checked by new tests for map types whose values are integers, structs, and nested maps, plus tests for malformed JSON input and for an unsupported (non-string) key type. Round-trip tests such as from_json(to_json) and to_json(from_json) for MapType were also added.
Author: Maxim Gekk <ma...@databricks.com>
Author: Maxim Gekk <ma...@gmail.com>
Closes #21108 from MaxGekk/from_json-map-type.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cd83acf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cd83acf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cd83acf
Branch: refs/heads/master
Commit: 8cd83acf4075d369bfcf9e703760d4946ef15f00
Parents: 075d678
Author: Maxim Gekk <ma...@databricks.com>
Authored: Mon May 14 14:05:42 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon May 14 14:05:42 2018 -0700
----------------------------------------------------------------------
python/pyspark/sql/functions.py | 10 ++-
.../catalyst/expressions/jsonExpressions.scala | 10 ++-
.../spark/sql/catalyst/json/JacksonParser.scala | 18 +++++-
.../scala/org/apache/spark/sql/functions.scala | 29 ++++-----
.../apache/spark/sql/JsonFunctionsSuite.scala | 66 ++++++++++++++++++++
5 files changed, 113 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8cd83acf/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index b62748e..6866c1c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2095,12 +2095,13 @@ def json_tuple(col, *fields):
return Column(jc)
+@ignore_unicode_prefix
@since(2.1)
def from_json(col, schema, options={}):
"""
- Parses a column containing a JSON string into a :class:`StructType` or :class:`ArrayType`
- of :class:`StructType`\\s with the specified schema. Returns `null`, in the case of an
- unparseable string.
+ Parses a column containing a JSON string into a :class:`MapType` with :class:`StringType`
+ as keys type, :class:`StructType` or :class:`ArrayType` of :class:`StructType`\\s with
+ the specified schema. Returns `null`, in the case of an unparseable string.
:param col: string column in json format
:param schema: a StructType or ArrayType of StructType to use when parsing the json column.
@@ -2117,6 +2118,9 @@ def from_json(col, schema, options={}):
[Row(json=Row(a=1))]
>>> df.select(from_json(df.value, "a INT").alias("json")).collect()
[Row(json=Row(a=1))]
+ >>> schema = MapType(StringType(), IntegerType())
+ >>> df.select(from_json(df.value, schema).alias("json")).collect()
+ [Row(json={u'a': 1})]
>>> data = [(1, '''[{"a": 1}]''')]
>>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
>>> df = spark.createDataFrame(data, ("key", "value"))
http://git-wip-us.apache.org/repos/asf/spark/blob/8cd83acf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 34161f0..04a4eb0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -548,7 +548,7 @@ case class JsonToStructs(
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
- case _: StructType | ArrayType(_: StructType, _) =>
+ case _: StructType | ArrayType(_: StructType, _) | _: MapType =>
super.checkInputDataTypes()
case _ => TypeCheckResult.TypeCheckFailure(
s"Input schema ${nullableSchema.simpleString} must be a struct or an array of structs.")
@@ -558,6 +558,7 @@ case class JsonToStructs(
lazy val rowSchema = nullableSchema match {
case st: StructType => st
case ArrayType(st: StructType, _) => st
+ case mt: MapType => mt
}
// This converts parsed rows to the desired output by the given schema.
@@ -567,6 +568,8 @@ case class JsonToStructs(
(rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
case ArrayType(_: StructType, _) =>
(rows: Seq[InternalRow]) => new GenericArrayData(rows)
+ case _: MapType =>
+ (rows: Seq[InternalRow]) => rows.head.getMap(0)
}
@transient
@@ -613,6 +616,11 @@ case class JsonToStructs(
}
override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
+
+ override def sql: String = schema match {
+ case _: MapType => "entries"
+ case _ => super.sql
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/8cd83acf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index a5a4a13..c3a4ca8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.Utils
* Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
*/
class JacksonParser(
- schema: StructType,
+ schema: DataType,
val options: JSONOptions) extends Logging {
import JacksonUtils._
@@ -57,7 +57,14 @@ class JacksonParser(
* to a value according to a desired schema. This is a wrapper for the method
* `makeConverter()` to handle a row wrapped with an array.
*/
- private def makeRootConverter(st: StructType): JsonParser => Seq[InternalRow] = {
+ private def makeRootConverter(dt: DataType): JsonParser => Seq[InternalRow] = {
+ dt match {
+ case st: StructType => makeStructRootConverter(st)
+ case mt: MapType => makeMapRootConverter(mt)
+ }
+ }
+
+ private def makeStructRootConverter(st: StructType): JsonParser => Seq[InternalRow] = {
val elementConverter = makeConverter(st)
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
(parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, st) {
@@ -87,6 +94,13 @@ class JacksonParser(
}
}
+ private def makeMapRootConverter(mt: MapType): JsonParser => Seq[InternalRow] = {
+ val fieldConverter = makeConverter(mt.valueType)
+ (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, mt) {
+ case START_OBJECT => Seq(InternalRow(convertMap(parser, fieldConverter)))
+ }
+ }
+
/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema.
http://git-wip-us.apache.org/repos/asf/spark/blob/8cd83acf/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 3c9ace4..b71dfda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3231,9 +3231,9 @@ object functions {
from_json(e, schema.asInstanceOf[DataType], options)
/**
- * (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
- * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
- * string.
+ * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+ * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+ * Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
@@ -3263,9 +3263,9 @@ object functions {
from_json(e, schema, options.asScala.toMap)
/**
- * (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
- * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
- * string.
+ * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+ * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+ * Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
@@ -3292,8 +3292,9 @@ object functions {
from_json(e, schema, Map.empty[String, String])
/**
- * Parses a column containing a JSON string into a `StructType` or `ArrayType` of `StructType`s
- * with the specified schema. Returns `null`, in the case of an unparseable string.
+ * Parses a column containing a JSON string into a `MapType` with `StringType` as keys type,
+ * `StructType` or `ArrayType` of `StructType`s with the specified schema.
+ * Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
@@ -3305,9 +3306,9 @@ object functions {
from_json(e, schema, Map.empty[String, String])
/**
- * (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
- * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
- * string.
+ * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+ * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+ * Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string as a json string. In Spark 2.1,
@@ -3322,9 +3323,9 @@ object functions {
}
/**
- * (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
- * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
- * string.
+ * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+ * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+ * Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string as a json string, it could be a
http://git-wip-us.apache.org/repos/asf/spark/blob/8cd83acf/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 00d2acc..055e1fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -326,4 +326,70 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
assert(errMsg4.getMessage.startsWith(
"A type of keys and values in map() must be string, but got"))
}
+
+ test("SPARK-24027: from_json - map<string, int>") {
+ val in = Seq("""{"a": 1, "b": 2, "c": 3}""").toDS()
+ val schema =
+ """
+ |{
+ | "type" : "map",
+ | "keyType" : "string",
+ | "valueType" : "integer",
+ | "valueContainsNull" : true
+ |}
+ """.stripMargin
+ val out = in.select(from_json($"value", schema, Map[String, String]()))
+
+ assert(out.columns.head == "entries")
+ checkAnswer(out, Row(Map("a" -> 1, "b" -> 2, "c" -> 3)))
+ }
+
+ test("SPARK-24027: from_json - map<string, struct>") {
+ val in = Seq("""{"a": {"b": 1}}""").toDS()
+ val schema = MapType(StringType, new StructType().add("b", IntegerType), true)
+ val out = in.select(from_json($"value", schema))
+
+ checkAnswer(out, Row(Map("a" -> Row(1))))
+ }
+
+ test("SPARK-24027: from_json - map<string, map<string, int>>") {
+ val in = Seq("""{"a": {"b": 1}}""").toDS()
+ val schema = MapType(StringType, MapType(StringType, IntegerType))
+ val out = in.select(from_json($"value", schema))
+
+ checkAnswer(out, Row(Map("a" -> Map("b" -> 1))))
+ }
+
+ test("SPARK-24027: roundtrip - from_json -> to_json - map<string, int>") {
+ val json = """{"a":1,"b":2,"c":3}"""
+ val schema = MapType(StringType, IntegerType, true)
+ val out = Seq(json).toDS().select(to_json(from_json($"value", schema)))
+
+ checkAnswer(out, Row(json))
+ }
+
+ test("SPARK-24027: roundtrip - to_json -> from_json - map<string, int>") {
+ val in = Seq(Map("a" -> 1)).toDF()
+ val schema = MapType(StringType, IntegerType, true)
+ val out = in.select(from_json(to_json($"value"), schema))
+
+ checkAnswer(out, in)
+ }
+
+ test("SPARK-24027: from_json - wrong map<string, int>") {
+ val in = Seq("""{"a" 1}""").toDS()
+ val schema = MapType(StringType, IntegerType)
+ val out = in.select(from_json($"value", schema, Map[String, String]()))
+
+ checkAnswer(out, Row(null))
+ }
+
+ test("SPARK-24027: from_json of a map with unsupported key type") {
+ val schema = MapType(StructType(StructField("f", IntegerType) :: Nil), StringType)
+
+ checkAnswer(Seq("""{{"f": 1}: "a"}""").toDS().select(from_json($"value", schema)),
+ Row(null))
+ checkAnswer(Seq("""{"{"f": 1}": "a"}""").toDS().select(from_json($"value", schema)),
+ Row(null))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org