Posted to commits@spark.apache.org by br...@apache.org on 2017/03/05 22:35:23 UTC

spark git commit: [SPARK-19595][SQL] Support json array in from_json

Repository: spark
Updated Branches:
  refs/heads/master 80d5338b3 -> 369a148e5


[SPARK-19595][SQL] Support json array in from_json

## What changes were proposed in this pull request?

This PR proposes two changes:

**Disallow JSON arrays with multiple elements in `from_json` with `StructType` as the schema, returning `null` instead.**

Currently, it reads only the first row when the input is a JSON array. So, the code below:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = StructType(StructField("a", IntegerType) :: Nil)
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("struct").select(from_json(col("struct"), schema)).show()
```
prints

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                 [1]|
+--------------------+
```

This PR changes this case to return `null` when the schema is `StructType` and the input is a JSON array with multiple elements:

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                null|
+--------------------+
```
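
For completeness, a JSON array containing a single element is still read as that one row under a `StructType` schema (this mirrors the `input=array of single object, schema=struct` test added below); a quick sketch reusing the schema above:

```scala
Seq(("""[{"a": 1}]""")).toDF("struct").select(from_json(col("struct"), schema)).show()
```

which should print

```
+--------------------+
|jsontostruct(struct)|
+--------------------+
|                 [1]|
+--------------------+
```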

**Support JSON arrays in `from_json` with `ArrayType` as the schema.**

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("array").select(from_json(col("array"), schema)).show()
```

prints

```
+-------------------+
|jsontostruct(array)|
+-------------------+
|         [[1], [2]]|
+-------------------+
```
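
Similarly, a single JSON object under an `ArrayType` schema is wrapped into a one-element array (mirroring the `input=object, schema=array` test added below); a quick sketch reusing the array schema above:

```scala
Seq(("""{"a": 1}""")).toDF("array").select(from_json(col("array"), schema)).show()
```

which should print

```
+-------------------+
|jsontostruct(array)|
+-------------------+
|              [[1]]|
+-------------------+
```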

## How was this patch tested?

Unit tests in `JsonExpressionsSuite` and `JsonFunctionsSuite`, Python doctests, and manual tests.

Author: hyukjinkwon <gu...@gmail.com>

Closes #16929 from HyukjinKwon/disallow-array.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/369a148e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/369a148e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/369a148e

Branch: refs/heads/master
Commit: 369a148e591bb16ec7da54867610b207602cd698
Parents: 80d5338
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sun Mar 5 14:35:06 2017 -0800
Committer: Burak Yavuz <br...@gmail.com>
Committed: Sun Mar 5 14:35:06 2017 -0800

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 | 11 +++-
 .../catalyst/expressions/jsonExpressions.scala  | 57 ++++++++++++++++---
 .../expressions/JsonExpressionsSuite.scala      | 58 +++++++++++++++++++-
 .../scala/org/apache/spark/sql/functions.scala  | 52 ++++++++++++++++--
 .../apache/spark/sql/JsonFunctionsSuite.scala   | 25 ++++++++-
 5 files changed, 186 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 426a4a8..376b86e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1773,11 +1773,11 @@ def json_tuple(col, *fields):
 @since(2.1)
 def from_json(col, schema, options={}):
     """
-    Parses a column containing a JSON string into a [[StructType]] with the
-    specified schema. Returns `null`, in the case of an unparseable string.
+    Parses a column containing a JSON string into a [[StructType]] or [[ArrayType]]
+    with the specified schema. Returns `null`, in the case of an unparseable string.
 
     :param col: string column in json format
-    :param schema: a StructType to use when parsing the json column
+    :param schema: a StructType or ArrayType to use when parsing the json column
     :param options: options to control parsing. accepts the same options as the json datasource
 
     >>> from pyspark.sql.types import *
@@ -1786,6 +1786,11 @@ def from_json(col, schema, options={}):
     >>> df = spark.createDataFrame(data, ("key", "value"))
     >>> df.select(from_json(df.value, schema).alias("json")).collect()
     [Row(json=Row(a=1))]
+    >>> data = [(1, '''[{"a": 1}]''')]
+    >>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    [Row(json=[Row(a=1)])]
     """
 
     sc = SparkContext._active_spark_context

http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 1e690a4..dbff62e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.catalyst.util.{GenericArrayData, ParseModes}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -480,23 +480,45 @@ case class JsonTuple(children: Seq[Expression])
 }
 
 /**
- * Converts an json input string to a [[StructType]] with the specified schema.
+ * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
  */
 case class JsonToStruct(
-    schema: StructType,
+    schema: DataType,
     options: Map[String, String],
     child: Expression,
     timeZoneId: Option[String] = None)
   extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
   override def nullable: Boolean = true
 
-  def this(schema: StructType, options: Map[String, String], child: Expression) =
+  def this(schema: DataType, options: Map[String, String], child: Expression) =
     this(schema, options, child, None)
 
+  override def checkInputDataTypes(): TypeCheckResult = schema match {
+    case _: StructType | ArrayType(_: StructType, _) =>
+      super.checkInputDataTypes()
+    case _ => TypeCheckResult.TypeCheckFailure(
+      s"Input schema ${schema.simpleString} must be a struct or an array of structs.")
+  }
+
+  @transient
+  lazy val rowSchema = schema match {
+    case st: StructType => st
+    case ArrayType(st: StructType, _) => st
+  }
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = schema match {
+    case _: StructType =>
+      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+    case ArrayType(_: StructType, _) =>
+      (rows: Seq[InternalRow]) => new GenericArrayData(rows)
+  }
+
   @transient
   lazy val parser =
     new JacksonParser(
-      schema,
+      rowSchema,
       new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
 
   override def dataType: DataType = schema
@@ -505,11 +527,32 @@ case class JsonToStruct(
     copy(timeZoneId = Option(timeZoneId))
 
   override def nullSafeEval(json: Any): Any = {
+    // When input is,
+    //   - `null`: `null`.
+    //   - invalid json: `null`.
+    //   - empty string: `null`.
+    //
+    // When the schema is array,
+    //   - json array: `Array(Row(...), ...)`
+    //   - json object: `Array(Row(...))`
+    //   - empty json array: `Array()`.
+    //   - empty json object: `Array(Row(null))`.
+    //
+    // When the schema is a struct,
+    //   - json object/array with single element: `Row(...)`
+    //   - json array with multiple elements: `null`
+    //   - empty json array: `null`.
+    //   - empty json object: `Row(null)`.
+
+    // We need `null` if the input string is an empty string. `JacksonParser` can
+    // deal with this but produces `Nil`.
+    if (json.toString.trim.isEmpty) return null
+
     try {
-      parser.parse(
+      converter(parser.parse(
         json.asInstanceOf[UTF8String],
         CreateJacksonParser.utf8String,
-        identity[UTF8String]).headOption.orNull
+        identity[UTF8String]))
     } catch {
       case _: SparkSQLJsonProcessingException => null
     }
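
For intuition, the behavior matrix documented in the `nullSafeEval` comment above can be exercised end to end. A minimal sketch for the spark-shell (so the `toDF` and `$` implicits are in scope; the column name `json` is illustrative):

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val structSchema = StructType(StructField("a", IntegerType) :: Nil)
val arraySchema = ArrayType(structSchema)
val df = Seq("""{"a": 1}""", """[{"a": 1}, {"a": 2}]""", "[]", "{}", "").toDF("json")

// StructType schema: object -> [1]; multi-element array -> null;
// empty array -> null; empty object -> [null]; empty string -> null.
df.select(from_json($"json", structSchema)).show()

// ArrayType schema: array -> [[1], [2]]; object -> [[1]];
// empty array -> []; empty object -> [[null]]; empty string -> null.
df.select(from_json($"json", arraySchema)).show()
```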

http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 0c46819..e358490 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -22,7 +22,7 @@ import java.util.Calendar
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -372,6 +372,62 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     )
   }
 
+  test("from_json - input=array, schema=array, output=array") {
+    val input = """[{"a": 1}, {"a": 2}]"""
+    val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+    val output = InternalRow(1) :: InternalRow(2) :: Nil
+    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+  }
+
+  test("from_json - input=object, schema=array, output=array of single row") {
+    val input = """{"a": 1}"""
+    val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+    val output = InternalRow(1) :: Nil
+    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+  }
+
+  test("from_json - input=empty array, schema=array, output=empty array") {
+    val input = "[ ]"
+    val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+    val output = Nil
+    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+  }
+
+  test("from_json - input=empty object, schema=array, output=array of single row with null") {
+    val input = "{ }"
+    val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+    val output = InternalRow(null) :: Nil
+    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+  }
+
+  test("from_json - input=array of single object, schema=struct, output=single row") {
+    val input = """[{"a": 1}]"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    val output = InternalRow(1)
+    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+  }
+
+  test("from_json - input=array, schema=struct, output=null") {
+    val input = """[{"a": 1}, {"a": 2}]"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    val output = null
+    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+  }
+
+  test("from_json - input=empty array, schema=struct, output=null") {
+    val input = """[]"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    val output = null
+    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+  }
+
+  test("from_json - input=empty object, schema=struct, output=single row with null") {
+    val input = """{  }"""
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    val output = InternalRow(null)
+    checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+  }
+
   test("from_json null input column") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(

http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 2247010..201f726 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2973,7 +2973,22 @@ object functions {
    * @group collection_funcs
    * @since 2.1.0
    */
-  def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+  def from_json(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_json(e, schema.asInstanceOf[DataType], options)
+
+  /**
+   * (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
+   * with the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. accepts the same options and the
+   *                json data source.
+   *
+   * @group collection_funcs
+   * @since 2.2.0
+   */
+  def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
     JsonToStruct(schema, options, e.expr)
   }
 
@@ -2993,6 +3008,21 @@ object functions {
     from_json(e, schema, options.asScala.toMap)
 
   /**
+   * (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
+   * with the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. accepts the same options and the
+   *                json data source.
+   *
+   * @group collection_funcs
+   * @since 2.2.0
+   */
+  def from_json(e: Column, schema: DataType, options: java.util.Map[String, String]): Column =
+    from_json(e, schema, options.asScala.toMap)
+
+  /**
    * Parses a column containing a JSON string into a `StructType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
@@ -3006,8 +3036,21 @@ object functions {
     from_json(e, schema, Map.empty[String, String])
 
   /**
-   * Parses a column containing a JSON string into a `StructType` with the specified schema.
-   * Returns `null`, in the case of an unparseable string.
+   * Parses a column containing a JSON string into a `StructType` or `ArrayType`
+   * with the specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   *
+   * @group collection_funcs
+   * @since 2.2.0
+   */
+  def from_json(e: Column, schema: DataType): Column =
+    from_json(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a column containing a JSON string into a `StructType` or `ArrayType`
+   * with the specified schema. Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
    * @param schema the schema to use when parsing the json string as a json string
@@ -3016,8 +3059,7 @@ object functions {
    * @since 2.1.0
    */
   def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
-    from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
-
+    from_json(e, DataType.fromJson(schema), options)
 
   /**
    * (Scala-specific) Converts a column containing a `StructType` into a JSON string with the
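
Taken together, the new `DataType`-typed overloads accept either schema form from Scala. A brief usage sketch (assuming spark-shell implicits and a DataFrame `df` with a string column `value`; `allowComments` is just one example of a JSON datasource option):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema: DataType = ArrayType(StructType(StructField("a", IntegerType) :: Nil))

df.select(from_json($"value", schema))                                  // new in 2.2.0
df.select(from_json($"value", schema, Map("allowComments" -> "true"))) // with options
```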

http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 9c39b3c..953d161 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.functions.{from_json, struct, to_json}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{CalendarIntervalType, IntegerType, StructType, TimestampType}
+import org.apache.spark.sql.types._
 
 class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -133,6 +133,29 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       Row(null) :: Nil)
   }
 
+  test("from_json invalid schema") {
+    val df = Seq("""{"a" 1}""").toDS()
+    val schema = ArrayType(StringType)
+    val message = intercept[AnalysisException] {
+      df.select(from_json($"value", schema))
+    }.getMessage
+
+    assert(message.contains(
+      "Input schema array<string> must be a struct or an array of structs."))
+  }
+
+  test("from_json array support") {
+    val df = Seq("""[{"a": 1, "b": "a"}, {"a": 2}, { }]""").toDS()
+    val schema = ArrayType(
+      StructType(
+        StructField("a", IntegerType) ::
+        StructField("b", StringType) :: Nil))
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(Seq(Row(1, "a"), Row(2, null), Row(null, null))))
+  }
+
   test("to_json") {
     val df = Seq(Tuple1(Tuple1(1))).toDF("a")
 

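As a side effect of the change to the string-schema overload above, the JSON form of any schema accepted by `JsonToStruct` (a struct or an array of structs) now round-trips through `from_json`. A quick sketch using `DataType.json` to produce the schema string (again assuming spark-shell implicits):

```scala
import java.util.Collections

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schemaJson = ArrayType(StructType(StructField("a", IntegerType) :: Nil)).json
val df = Seq("""[{"a": 1}, {"a": 2}]""").toDF("value")
df.select(from_json($"value", schemaJson, Collections.emptyMap[String, String]()))
```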
