You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/05/14 21:05:46 UTC

spark git commit: [SPARK-24027][SQL] Support MapType with StringType for keys as the root type by from_json

Repository: spark
Updated Branches:
  refs/heads/master 075d678c8 -> 8cd83acf4


[SPARK-24027][SQL] Support MapType with StringType for keys as the root type by from_json

## What changes were proposed in this pull request?

Currently, the from_json function supports only StructType or ArrayType as the root type. This PR additionally allows specifying MapType(StringType, DataType) as the root type, in addition to those types. For example:

```scala
import org.apache.spark.sql.types._
val schema = MapType(StringType, IntegerType)
val in = Seq("""{"a": 1, "b": 2, "c": 3}""").toDS()
in.select(from_json($"value", schema, Map[String, String]())).collect()
```
```
res1: Array[org.apache.spark.sql.Row] = Array([Map(a -> 1, b -> 2, c -> 3)])
```

## How was this patch tested?

The change was verified by new tests for the map type with integer and struct value types. Round-trip tests such as from_json(to_json) and to_json(from_json) for MapType were also added.

Author: Maxim Gekk <ma...@databricks.com>
Author: Maxim Gekk <ma...@gmail.com>

Closes #21108 from MaxGekk/from_json-map-type.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cd83acf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cd83acf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cd83acf

Branch: refs/heads/master
Commit: 8cd83acf4075d369bfcf9e703760d4946ef15f00
Parents: 075d678
Author: Maxim Gekk <ma...@databricks.com>
Authored: Mon May 14 14:05:42 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon May 14 14:05:42 2018 -0700

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 | 10 ++-
 .../catalyst/expressions/jsonExpressions.scala  | 10 ++-
 .../spark/sql/catalyst/json/JacksonParser.scala | 18 +++++-
 .../scala/org/apache/spark/sql/functions.scala  | 29 ++++-----
 .../apache/spark/sql/JsonFunctionsSuite.scala   | 66 ++++++++++++++++++++
 5 files changed, 113 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8cd83acf/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index b62748e..6866c1c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2095,12 +2095,13 @@ def json_tuple(col, *fields):
     return Column(jc)
 
 
+@ignore_unicode_prefix
 @since(2.1)
 def from_json(col, schema, options={}):
     """
-    Parses a column containing a JSON string into a :class:`StructType` or :class:`ArrayType`
-    of :class:`StructType`\\s with the specified schema. Returns `null`, in the case of an
-    unparseable string.
+    Parses a column containing a JSON string into a :class:`MapType` with :class:`StringType`
+    as keys type, :class:`StructType` or :class:`ArrayType` of :class:`StructType`\\s with
+    the specified schema. Returns `null`, in the case of an unparseable string.
 
     :param col: string column in json format
     :param schema: a StructType or ArrayType of StructType to use when parsing the json column.
@@ -2117,6 +2118,9 @@ def from_json(col, schema, options={}):
     [Row(json=Row(a=1))]
     >>> df.select(from_json(df.value, "a INT").alias("json")).collect()
     [Row(json=Row(a=1))]
+    >>> schema = MapType(StringType(), IntegerType())
+    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    [Row(json={u'a': 1})]
     >>> data = [(1, '''[{"a": 1}]''')]
     >>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
     >>> df = spark.createDataFrame(data, ("key", "value"))

http://git-wip-us.apache.org/repos/asf/spark/blob/8cd83acf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 34161f0..04a4eb0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -548,7 +548,7 @@ case class JsonToStructs(
       forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
 
   override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
-    case _: StructType | ArrayType(_: StructType, _) =>
+    case _: StructType | ArrayType(_: StructType, _) | _: MapType =>
       super.checkInputDataTypes()
     case _ => TypeCheckResult.TypeCheckFailure(
       s"Input schema ${nullableSchema.simpleString} must be a struct or an array of structs.")
@@ -558,6 +558,7 @@ case class JsonToStructs(
   lazy val rowSchema = nullableSchema match {
     case st: StructType => st
     case ArrayType(st: StructType, _) => st
+    case mt: MapType => mt
   }
 
   // This converts parsed rows to the desired output by the given schema.
@@ -567,6 +568,8 @@ case class JsonToStructs(
       (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
     case ArrayType(_: StructType, _) =>
       (rows: Seq[InternalRow]) => new GenericArrayData(rows)
+    case _: MapType =>
+      (rows: Seq[InternalRow]) => rows.head.getMap(0)
   }
 
   @transient
@@ -613,6 +616,11 @@ case class JsonToStructs(
   }
 
   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
+
+  override def sql: String = schema match {
+    case _: MapType => "entries"
+    case _ => super.sql
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8cd83acf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index a5a4a13..c3a4ca8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.Utils
  * Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
  */
 class JacksonParser(
-    schema: StructType,
+    schema: DataType,
     val options: JSONOptions) extends Logging {
 
   import JacksonUtils._
@@ -57,7 +57,14 @@ class JacksonParser(
    * to a value according to a desired schema. This is a wrapper for the method
    * `makeConverter()` to handle a row wrapped with an array.
    */
-  private def makeRootConverter(st: StructType): JsonParser => Seq[InternalRow] = {
+  private def makeRootConverter(dt: DataType): JsonParser => Seq[InternalRow] = {
+    dt match {
+      case st: StructType => makeStructRootConverter(st)
+      case mt: MapType => makeMapRootConverter(mt)
+    }
+  }
+
+  private def makeStructRootConverter(st: StructType): JsonParser => Seq[InternalRow] = {
     val elementConverter = makeConverter(st)
     val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
     (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, st) {
@@ -87,6 +94,13 @@ class JacksonParser(
     }
   }
 
+  private def makeMapRootConverter(mt: MapType): JsonParser => Seq[InternalRow] = {
+    val fieldConverter = makeConverter(mt.valueType)
+    (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, mt) {
+      case START_OBJECT => Seq(InternalRow(convertMap(parser, fieldConverter)))
+    }
+  }
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema.

http://git-wip-us.apache.org/repos/asf/spark/blob/8cd83acf/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 3c9ace4..b71dfda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3231,9 +3231,9 @@ object functions {
     from_json(e, schema.asInstanceOf[DataType], options)
 
   /**
-   * (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
-   * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
-   * string.
+   * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
    * @param schema the schema to use when parsing the json string
@@ -3263,9 +3263,9 @@ object functions {
     from_json(e, schema, options.asScala.toMap)
 
   /**
-   * (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
-   * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
-   * string.
+   * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
    * @param schema the schema to use when parsing the json string
@@ -3292,8 +3292,9 @@ object functions {
     from_json(e, schema, Map.empty[String, String])
 
   /**
-   * Parses a column containing a JSON string into a `StructType` or `ArrayType` of `StructType`s
-   * with the specified schema. Returns `null`, in the case of an unparseable string.
+   * Parses a column containing a JSON string into a `MapType` with `StringType` as keys type,
+   * `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
    * @param schema the schema to use when parsing the json string
@@ -3305,9 +3306,9 @@ object functions {
     from_json(e, schema, Map.empty[String, String])
 
   /**
-   * (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
-   * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
-   * string.
+   * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
    * @param schema the schema to use when parsing the json string as a json string. In Spark 2.1,
@@ -3322,9 +3323,9 @@ object functions {
   }
 
   /**
-   * (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
-   * of `StructType`s with the specified schema. Returns `null`, in the case of an unparseable
-   * string.
+   * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
    * @param schema the schema to use when parsing the json string as a json string, it could be a

http://git-wip-us.apache.org/repos/asf/spark/blob/8cd83acf/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 00d2acc..055e1fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -326,4 +326,70 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(errMsg4.getMessage.startsWith(
       "A type of keys and values in map() must be string, but got"))
   }
+
+  test("SPARK-24027: from_json - map<string, int>") {
+    val in = Seq("""{"a": 1, "b": 2, "c": 3}""").toDS()
+    val schema =
+      """
+        |{
+        |  "type" : "map",
+        |  "keyType" : "string",
+        |  "valueType" : "integer",
+        |  "valueContainsNull" : true
+        |}
+      """.stripMargin
+    val out = in.select(from_json($"value", schema, Map[String, String]()))
+
+    assert(out.columns.head == "entries")
+    checkAnswer(out, Row(Map("a" -> 1, "b" -> 2, "c" -> 3)))
+  }
+
+  test("SPARK-24027: from_json - map<string, struct>") {
+    val in = Seq("""{"a": {"b": 1}}""").toDS()
+    val schema = MapType(StringType, new StructType().add("b", IntegerType), true)
+    val out = in.select(from_json($"value", schema))
+
+    checkAnswer(out, Row(Map("a" -> Row(1))))
+  }
+
+  test("SPARK-24027: from_json - map<string, map<string, int>>") {
+    val in = Seq("""{"a": {"b": 1}}""").toDS()
+    val schema = MapType(StringType, MapType(StringType, IntegerType))
+    val out = in.select(from_json($"value", schema))
+
+    checkAnswer(out, Row(Map("a" -> Map("b" -> 1))))
+  }
+
+  test("SPARK-24027: roundtrip - from_json -> to_json  - map<string, string>") {
+    val json = """{"a":1,"b":2,"c":3}"""
+    val schema = MapType(StringType, IntegerType, true)
+    val out = Seq(json).toDS().select(to_json(from_json($"value", schema)))
+
+    checkAnswer(out, Row(json))
+  }
+
+  test("SPARK-24027: roundtrip - to_json -> from_json  - map<string, string>") {
+    val in = Seq(Map("a" -> 1)).toDF()
+    val schema = MapType(StringType, IntegerType, true)
+    val out = in.select(from_json(to_json($"value"), schema))
+
+    checkAnswer(out, in)
+  }
+
+  test("SPARK-24027: from_json - wrong map<string, int>") {
+    val in = Seq("""{"a" 1}""").toDS()
+    val schema = MapType(StringType, IntegerType)
+    val out = in.select(from_json($"value", schema, Map[String, String]()))
+
+    checkAnswer(out, Row(null))
+  }
+
+  test("SPARK-24027: from_json of a map with unsupported key type") {
+    val schema = MapType(StructType(StructField("f", IntegerType) :: Nil), StringType)
+
+    checkAnswer(Seq("""{{"f": 1}: "a"}""").toDS().select(from_json($"value", schema)),
+      Row(null))
+    checkAnswer(Seq("""{"{"f": 1}": "a"}""").toDS().select(from_json($"value", schema)),
+      Row(null))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org