You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/11/08 00:54:51 UTC
spark git commit: [SPARK-18295][SQL] Make to_json function null safe
(matching it to from_json)
Repository: spark
Updated Branches:
refs/heads/master 3a710b94b -> 3eda05703
[SPARK-18295][SQL] Make to_json function null safe (matching it to from_json)
## What changes were proposed in this pull request?
This PR proposes to match up the behaviour of `to_json` to `from_json` function for null-safety.
Currently, it throws a `NullPointerException`, but this PR fixes this to produce `null` instead.
with the data below:
```scala
import spark.implicits._
val df = Seq(Some(Tuple1(Tuple1(1))), None).toDF("a")
df.show()
```
```
+----+
| a|
+----+
| [1]|
|null|
+----+
```
the code below
```scala
import org.apache.spark.sql.functions._
df.select(to_json($"a")).show()
```
produces:
**Before**
throws a `NullPointerException` as below:
```
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.json.JacksonGenerator.org$apache$spark$sql$catalyst$json$JacksonGenerator$$writeFields(JacksonGenerator.scala:138)
at org.apache.spark.sql.catalyst.json.JacksonGenerator$$anonfun$write$1.apply$mcV$sp(JacksonGenerator.scala:194)
at org.apache.spark.sql.catalyst.json.JacksonGenerator.org$apache$spark$sql$catalyst$json$JacksonGenerator$$writeObject(JacksonGenerator.scala:131)
at org.apache.spark.sql.catalyst.json.JacksonGenerator.write(JacksonGenerator.scala:193)
at org.apache.spark.sql.catalyst.expressions.StructToJson.eval(jsonExpressions.scala:544)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142)
at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:48)
at org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:30)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
```
**After**
```
+---------------+
|structtojson(a)|
+---------------+
| {"_1":1}|
| null|
+---------------+
```
## How was this patch tested?
Unit test in `JsonExpressionsSuite.scala` and `JsonFunctionsSuite.scala`.
Author: hyukjinkwon <gu...@gmail.com>
Closes #15792 from HyukjinKwon/SPARK-18295.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3eda0570
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3eda0570
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3eda0570
Branch: refs/heads/master
Commit: 3eda05703f02413540f180ade01f0f114e70b9cc
Parents: 3a710b9
Author: hyukjinkwon <gu...@gmail.com>
Authored: Mon Nov 7 16:54:40 2016 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Nov 7 16:54:40 2016 -0800
----------------------------------------------------------------------
.../sql/catalyst/expressions/jsonExpressions.scala | 14 +++++---------
.../catalyst/expressions/JsonExpressionsSuite.scala | 13 +++++++++++--
.../org/apache/spark/sql/JsonFunctionsSuite.scala | 14 ++++++++++++++
3 files changed, 30 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3eda0570/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 89fe7c4..b61583d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -484,7 +484,7 @@ case class JsonTuple(children: Seq[Expression])
* Converts an json input string to a [[StructType]] with the specified schema.
*/
case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
- extends Expression with CodegenFallback with ExpectsInputTypes {
+ extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true
@transient
@@ -495,11 +495,8 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
override def dataType: DataType = schema
- override def children: Seq[Expression] = child :: Nil
- override def eval(input: InternalRow): Any = {
- val json = child.eval(input)
- if (json == null) return null
+ override def nullSafeEval(json: Any): Any = {
try parser.parse(json.toString).head catch {
case _: SparkSQLJsonProcessingException => null
}
@@ -512,7 +509,7 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
* Converts a [[StructType]] to a json output string.
*/
case class StructToJson(options: Map[String, String], child: Expression)
- extends Expression with CodegenFallback with ExpectsInputTypes {
+ extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true
@transient
@@ -523,7 +520,6 @@ case class StructToJson(options: Map[String, String], child: Expression)
new JacksonGenerator(child.dataType.asInstanceOf[StructType], writer)
override def dataType: DataType = StringType
- override def children: Seq[Expression] = child :: Nil
override def checkInputDataTypes(): TypeCheckResult = {
if (StructType.acceptsType(child.dataType)) {
@@ -540,8 +536,8 @@ case class StructToJson(options: Map[String, String], child: Expression)
}
}
- override def eval(input: InternalRow): Any = {
- gen.write(child.eval(input).asInstanceOf[InternalRow])
+ override def nullSafeEval(row: Any): Any = {
+ gen.write(row.asInstanceOf[InternalRow])
gen.flush()
val json = writer.toString
writer.reset()
http://git-wip-us.apache.org/repos/asf/spark/blob/3eda0570/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 3bfa0bf..3b0e908 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ParseModes
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -347,7 +347,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
test("from_json null input column") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
- JsonToStruct(schema, Map.empty, Literal(null)),
+ JsonToStruct(schema, Map.empty, Literal.create(null, StringType)),
null
)
}
@@ -360,4 +360,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
"""{"a":1}"""
)
}
+
+ test("to_json null input column") {
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val struct = Literal.create(null, schema)
+ checkEvaluation(
+ StructToJson(Map.empty, struct),
+ null
+ )
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3eda0570/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 59ae889..7d63d31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -141,4 +141,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
assert(e.getMessage.contains(
"Unable to convert column a of type calendarinterval to JSON."))
}
+
+ test("roundtrip in to_json and from_json") {
+ val dfOne = Seq(Some(Tuple1(Tuple1(1))), None).toDF("struct")
+ val schemaOne = dfOne.schema(0).dataType.asInstanceOf[StructType]
+ val readBackOne = dfOne.select(to_json($"struct").as("json"))
+ .select(from_json($"json", schemaOne).as("struct"))
+ checkAnswer(dfOne, readBackOne)
+
+ val dfTwo = Seq(Some("""{"a":1}"""), None).toDF("json")
+ val schemaTwo = new StructType().add("a", IntegerType)
+ val readBackTwo = dfTwo.select(from_json($"json", schemaTwo).as("struct"))
+ .select(to_json($"struct").as("json"))
+ checkAnswer(dfTwo, readBackTwo)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org