You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/10 07:34:23 UTC

[spark] branch master updated: [SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 815c792  [SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source
815c792 is described below

commit 815c7929c290d6eed86dc5c924f9f7d48cff179d
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Tue Mar 10 00:33:32 2020 -0700

    [SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source
    
    ### What changes were proposed in this pull request?
    
    This PR proposes two things:
    
    1. Convert `null` to `string` type during schema inference of `schema_of_json` as JSON datasource does. This is a bug fix as well because a `null` type string is not a properly DDL-formatted string and the SQL parser is unable to recognise it as a type string. We should match it to JSON datasource and return a string type so `schema_of_json` returns a proper DDL formatted string.
    
    2. Let `schema_of_json` respect `dropFieldIfAllNull` option during schema inference.
    
    ### Why are the changes needed?
    
    To let `schema_of_json` return a proper DDL formatted string, and respect `dropFieldIfAllNull` option.
    
    ### Does this PR introduce any user-facing change?
    Yes, it does.
    
    ```scala
    import collection.JavaConverters._
    import org.apache.spark.sql.functions._
    
    spark.range(1).select(schema_of_json(lit("""{"id": ""}"""))).show()
    spark.range(1).select(schema_of_json(lit("""{"id": "a", "drop": {"drop": null}}"""), Map("dropFieldIfAllNull" -> "true").asJava)).show(false)
    ```
    
    **Before:**
    
    ```
    struct<id:null>
    struct<drop:struct<drop:null>,id:string>
    ```
    
    **After:**
    
    ```
    struct<id:string>
    struct<id:string>
    ```
    
    ### How was this patch tested?
    
    Manually tested, and unit tests were added.
    
    Closes #27854 from HyukjinKwon/SPARK-31065.
    
    Authored-by: HyukjinKwon <gu...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../sql/catalyst/expressions/jsonExpressions.scala | 13 +++++++-
 .../spark/sql/catalyst/json/JsonInferSchema.scala  | 13 ++++----
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  | 36 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index aa4b464..4c2a511 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -777,7 +777,18 @@ case class SchemaOfJson(
   override def eval(v: InternalRow): Any = {
     val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
       parser.nextToken()
-      jsonInferSchema.inferField(parser)
+      // To match with schema inference from JSON datasource.
+      jsonInferSchema.inferField(parser) match {
+        case st: StructType =>
+          jsonInferSchema.canonicalizeType(st, jsonOptions).getOrElse(StructType(Nil))
+        case at: ArrayType if at.elementType.isInstanceOf[StructType] =>
+          jsonInferSchema
+            .canonicalizeType(at.elementType, jsonOptions)
+            .map(ArrayType(_, containsNull = at.containsNull))
+            .getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull))
+        case other: DataType =>
+          jsonInferSchema.canonicalizeType(other, jsonOptions).getOrElse(StringType)
+      }
     }
 
     UTF8String.fromString(dt.catalogString)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 82dd6d0..3dd8694 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -92,12 +92,10 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
     }
     json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult)
 
-    canonicalizeType(rootType, options) match {
-      case Some(st: StructType) => st
-      case _ =>
-        // canonicalizeType erases all empty structs, including the only one we want to keep
-        StructType(Nil)
-    }
+    canonicalizeType(rootType, options)
+      .find(_.isInstanceOf[StructType])
+      // canonicalizeType erases all empty structs, including the only one we want to keep
+      .getOrElse(StructType(Nil)).asInstanceOf[StructType]
   }
 
   /**
@@ -198,7 +196,8 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
    * Recursively canonicalizes inferred types, e.g., removes StructTypes with no fields,
    * drops NullTypes or converts them to StringType based on provided options.
    */
-  private def canonicalizeType(tpe: DataType, options: JSONOptions): Option[DataType] = tpe match {
+  private[catalyst] def canonicalizeType(
+      tpe: DataType, options: JSONOptions): Option[DataType] = tpe match {
     case at: ArrayType =>
       canonicalizeType(at.elementType, options)
         .map(t => at.copy(elementType = t))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index ebc2f57..65e1dde 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -674,4 +674,40 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       spark.range(1).select(schema_of_json(input)),
       Seq(Row("struct<id:bigint,price:double>")))
   }
+
+  test("SPARK-31065: schema_of_json - null and empty strings as strings") {
+    Seq("""{"id": null}""", """{"id": ""}""").foreach { input =>
+      checkAnswer(
+        spark.range(1).select(schema_of_json(input)),
+        Seq(Row("struct<id:string>")))
+    }
+  }
+
+  test("SPARK-31065: schema_of_json - 'dropFieldIfAllNull' option") {
+    val options = Map("dropFieldIfAllNull" -> "true")
+    // Structs
+    checkAnswer(
+      spark.range(1).select(
+        schema_of_json(
+          lit("""{"id": "a", "drop": {"drop": null}}"""),
+          options.asJava)),
+      Seq(Row("struct<id:string>")))
+
+    // Array of structs
+    checkAnswer(
+      spark.range(1).select(
+        schema_of_json(
+          lit("""[{"id": "a", "drop": {"drop": null}}]"""),
+          options.asJava)),
+      Seq(Row("array<struct<id:string>>")))
+
+    // Other types are not affected.
+    checkAnswer(
+      spark.range(1).select(
+        schema_of_json(
+          lit("""null"""),
+          options.asJava)),
+      Seq(Row("string")))
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org