Posted to commits@spark.apache.org by br...@apache.org on 2017/03/05 22:35:23 UTC
spark git commit: [SPARK-19595][SQL] Support json array in from_json
Repository: spark
Updated Branches:
refs/heads/master 80d5338b3 -> 369a148e5
[SPARK-19595][SQL] Support json array in from_json
## What changes were proposed in this pull request?
This PR proposes two changes:
**Disallow json arrays with multiple elements in `from_json` with `StructType` as the schema, returning `null` instead.**
Currently, it reads only the first element when the input is a json array, so the code below:
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = StructType(StructField("a", IntegerType) :: Nil)
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("struct").select(from_json(col("struct"), schema)).show()
```
prints
```
+--------------------+
|jsontostruct(struct)|
+--------------------+
| [1]|
+--------------------+
```
This PR changes `from_json` to return `null` if the schema is `StructType` and the input is a json array with multiple elements:
```
+--------------------+
|jsontostruct(struct)|
+--------------------+
| null|
+--------------------+
```
**Support json arrays in `from_json` with `ArrayType` as the schema.**
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
Seq(("""[{"a": 1}, {"a": 2}]""")).toDF("array").select(from_json(col("array"), schema)).show()
```
prints
```
+-------------------+
|jsontostruct(array)|
+-------------------+
| [[1], [2]]|
+-------------------+
```
## How was this patch tested?
Unit tests in `JsonExpressionsSuite` and `JsonFunctionsSuite`, Python doctests, and manual tests.
Author: hyukjinkwon <gu...@gmail.com>
Closes #16929 from HyukjinKwon/disallow-array.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/369a148e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/369a148e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/369a148e
Branch: refs/heads/master
Commit: 369a148e591bb16ec7da54867610b207602cd698
Parents: 80d5338
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sun Mar 5 14:35:06 2017 -0800
Committer: Burak Yavuz <br...@gmail.com>
Committed: Sun Mar 5 14:35:06 2017 -0800
----------------------------------------------------------------------
python/pyspark/sql/functions.py | 11 +++-
.../catalyst/expressions/jsonExpressions.scala | 57 ++++++++++++++++---
.../expressions/JsonExpressionsSuite.scala | 58 +++++++++++++++++++-
.../scala/org/apache/spark/sql/functions.scala | 52 ++++++++++++++++--
.../apache/spark/sql/JsonFunctionsSuite.scala | 25 ++++++++-
5 files changed, 186 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 426a4a8..376b86e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1773,11 +1773,11 @@ def json_tuple(col, *fields):
@since(2.1)
def from_json(col, schema, options={}):
"""
- Parses a column containing a JSON string into a [[StructType]] with the
- specified schema. Returns `null`, in the case of an unparseable string.
+ Parses a column containing a JSON string into a [[StructType]] or [[ArrayType]]
+ with the specified schema. Returns `null`, in the case of an unparseable string.
:param col: string column in json format
- :param schema: a StructType to use when parsing the json column
+ :param schema: a StructType or ArrayType to use when parsing the json column
:param options: options to control parsing. accepts the same options as the json datasource
>>> from pyspark.sql.types import *
@@ -1786,6 +1786,11 @@ def from_json(col, schema, options={}):
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=Row(a=1))]
+ >>> data = [(1, '''[{"a": 1}]''')]
+ >>> schema = ArrayType(StructType([StructField("a", IntegerType())]))
+ >>> df = spark.createDataFrame(data, ("key", "value"))
+ >>> df.select(from_json(df.value, schema).alias("json")).collect()
+ [Row(json=[Row(a=1)])]
"""
sc = SparkContext._active_spark_context
http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 1e690a4..dbff62e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.catalyst.util.{GenericArrayData, ParseModes}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -480,23 +480,45 @@ case class JsonTuple(children: Seq[Expression])
}
/**
- * Converts an json input string to a [[StructType]] with the specified schema.
+ * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
*/
case class JsonToStruct(
- schema: StructType,
+ schema: DataType,
options: Map[String, String],
child: Expression,
timeZoneId: Option[String] = None)
extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true
- def this(schema: StructType, options: Map[String, String], child: Expression) =
+ def this(schema: DataType, options: Map[String, String], child: Expression) =
this(schema, options, child, None)
+ override def checkInputDataTypes(): TypeCheckResult = schema match {
+ case _: StructType | ArrayType(_: StructType, _) =>
+ super.checkInputDataTypes()
+ case _ => TypeCheckResult.TypeCheckFailure(
+ s"Input schema ${schema.simpleString} must be a struct or an array of structs.")
+ }
+
+ @transient
+ lazy val rowSchema = schema match {
+ case st: StructType => st
+ case ArrayType(st: StructType, _) => st
+ }
+
+ // This converts parsed rows to the desired output by the given schema.
+ @transient
+ lazy val converter = schema match {
+ case _: StructType =>
+ (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+ case ArrayType(_: StructType, _) =>
+ (rows: Seq[InternalRow]) => new GenericArrayData(rows)
+ }
+
@transient
lazy val parser =
new JacksonParser(
- schema,
+ rowSchema,
new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
override def dataType: DataType = schema
@@ -505,11 +527,32 @@ case class JsonToStruct(
copy(timeZoneId = Option(timeZoneId))
override def nullSafeEval(json: Any): Any = {
+ // When input is,
+ // - `null`: `null`.
+ // - invalid json: `null`.
+ // - empty string: `null`.
+ //
+ // When the schema is array,
+ // - json array: `Array(Row(...), ...)`
+ // - json object: `Array(Row(...))`
+ // - empty json array: `Array()`.
+ // - empty json object: `Array(Row(null))`.
+ //
+ // When the schema is a struct,
+ // - json object/array with single element: `Row(...)`
+ // - json array with multiple elements: `null`
+ // - empty json array: `null`.
+ // - empty json object: `Row(null)`.
+
+ // We need `null` if the input string is an empty string. `JacksonParser` can
+ // deal with this but produces `Nil`.
+ if (json.toString.trim.isEmpty) return null
+
try {
- parser.parse(
+ converter(parser.parse(
json.asInstanceOf[UTF8String],
CreateJacksonParser.utf8String,
- identity[UTF8String]).headOption.orNull
+ identity[UTF8String]))
} catch {
case _: SparkSQLJsonProcessingException => null
}
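In short, the expression now always parses rows against the element struct (`rowSchema`) and then shapes the result per the target schema. A minimal standalone sketch of that shape, using simplified stand-in types rather than the actual Catalyst classes:
```scala
// Simplified stand-in types; the real implementation uses Catalyst's DataType and InternalRow.
sealed trait SimpleType
case class SimpleStruct(fields: Seq[String]) extends SimpleType
case class SimpleArray(element: SimpleType) extends SimpleType

// Struct schema: exactly one parsed row, otherwise null.
// Array-of-struct schema: keep every parsed row.
def convert(schema: SimpleType, rows: Seq[Map[String, Any]]): Any = schema match {
  case _: SimpleStruct => if (rows.length == 1) rows.head else null
  case SimpleArray(_: SimpleStruct) => rows
}

val parsed = Seq(Map("a" -> 1), Map("a" -> 2))
assert(convert(SimpleStruct(Seq("a")), parsed) == null)                 // struct schema, two rows
assert(convert(SimpleArray(SimpleStruct(Seq("a"))), parsed) == parsed)  // array schema
```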
http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 0c46819..e358490 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -22,7 +22,7 @@ import java.util.Calendar
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils, ParseModes}
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -372,6 +372,62 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
)
}
+ test("from_json - input=array, schema=array, output=array") {
+ val input = """[{"a": 1}, {"a": 2}]"""
+ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val output = InternalRow(1) :: InternalRow(2) :: Nil
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=object, schema=array, output=array of single row") {
+ val input = """{"a": 1}"""
+ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val output = InternalRow(1) :: Nil
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=empty array, schema=array, output=empty array") {
+ val input = "[ ]"
+ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val output = Nil
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=empty object, schema=array, output=array of single row with null") {
+ val input = "{ }"
+ val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
+ val output = InternalRow(null) :: Nil
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=array of single object, schema=struct, output=single row") {
+ val input = """[{"a": 1}]"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val output = InternalRow(1)
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=array, schema=struct, output=null") {
+ val input = """[{"a": 1}, {"a": 2}]"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val output = null
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=empty array, schema=struct, output=null") {
+ val input = """[]"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val output = null
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
+ test("from_json - input=empty object, schema=struct, output=single row with null") {
+ val input = """{ }"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val output = InternalRow(null)
+ checkEvaluation(JsonToStruct(schema, Map.empty, Literal(input), gmtId), output)
+ }
+
test("from_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 2247010..201f726 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2973,7 +2973,22 @@ object functions {
* @group collection_funcs
* @since 2.1.0
*/
- def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+ def from_json(e: Column, schema: StructType, options: Map[String, String]): Column =
+ from_json(e, schema.asInstanceOf[DataType], options)
+
+ /**
+ * (Scala-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
+ * with the specified schema. Returns `null`, in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string
+ * @param options options to control how the json is parsed. accepts the same options and the
+ * json data source.
+ *
+ * @group collection_funcs
+ * @since 2.2.0
+ */
+ def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
JsonToStruct(schema, options, e.expr)
}
@@ -2993,6 +3008,21 @@ object functions {
from_json(e, schema, options.asScala.toMap)
/**
+ * (Java-specific) Parses a column containing a JSON string into a `StructType` or `ArrayType`
+ * with the specified schema. Returns `null`, in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string
+ * @param options options to control how the json is parsed. accepts the same options and the
+ * json data source.
+ *
+ * @group collection_funcs
+ * @since 2.2.0
+ */
+ def from_json(e: Column, schema: DataType, options: java.util.Map[String, String]): Column =
+ from_json(e, schema, options.asScala.toMap)
+
+ /**
* Parses a column containing a JSON string into a `StructType` with the specified schema.
* Returns `null`, in the case of an unparseable string.
*
@@ -3006,8 +3036,21 @@ object functions {
from_json(e, schema, Map.empty[String, String])
/**
- * Parses a column containing a JSON string into a `StructType` with the specified schema.
- * Returns `null`, in the case of an unparseable string.
+ * Parses a column containing a JSON string into a `StructType` or `ArrayType`
+ * with the specified schema. Returns `null`, in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string
+ *
+ * @group collection_funcs
+ * @since 2.2.0
+ */
+ def from_json(e: Column, schema: DataType): Column =
+ from_json(e, schema, Map.empty[String, String])
+
+ /**
+ * Parses a column containing a JSON string into a `StructType` or `ArrayType`
+ * with the specified schema. Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string as a json string
@@ -3016,8 +3059,7 @@ object functions {
* @since 2.1.0
*/
def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
- from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
-
+ from_json(e, DataType.fromJson(schema), options)
/**
* (Scala-specific) Converts a column containing a `StructType` into a JSON string with the
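For completeness, a hedged sketch of calling the new Scala-facing overloads (assuming a spark-shell session; `DataType.json` is used here only to produce a schema string for the string-typed overload):
```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val arraySchema: DataType = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
val df = Seq("""[{"a": 1}, {"a": 2}]""").toDF("value")

// New DataType-typed overload (since 2.2.0).
df.select(from_json(col("value"), arraySchema)).show()

// The JSON-format schema-string overload now goes through DataType.fromJson,
// so non-struct schemas are accepted as well.
df.select(from_json(col("value"), arraySchema.json, new java.util.HashMap[String, String]())).show()
```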
http://git-wip-us.apache.org/repos/asf/spark/blob/369a148e/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 9c39b3c..953d161 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{CalendarIntervalType, IntegerType, StructType, TimestampType}
+import org.apache.spark.sql.types._
class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -133,6 +133,29 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(null) :: Nil)
}
+ test("from_json invalid schema") {
+ val df = Seq("""{"a" 1}""").toDS()
+ val schema = ArrayType(StringType)
+ val message = intercept[AnalysisException] {
+ df.select(from_json($"value", schema))
+ }.getMessage
+
+ assert(message.contains(
+ "Input schema array<string> must be a struct or an array of structs."))
+ }
+
+ test("from_json array support") {
+ val df = Seq("""[{"a": 1, "b": "a"}, {"a": 2}, { }]""").toDS()
+ val schema = ArrayType(
+ StructType(
+ StructField("a", IntegerType) ::
+ StructField("b", StringType) :: Nil))
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(Seq(Row(1, "a"), Row(2, null), Row(null, null))))
+ }
+
test("to_json") {
val df = Seq(Tuple1(Tuple1(1))).toDF("a")