You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/06/20 01:19:36 UTC

spark git commit: [SPARK-8093] [SQL] Remove empty structs inferred from JSON documents

Repository: spark
Updated Branches:
  refs/heads/master 1fa29c2df -> 9814b971f


[SPARK-8093] [SQL] Remove empty structs inferred from JSON documents

Author: Nathan Howell <nh...@godaddy.com>

Closes #6799 from NathanHowell/spark-8093 and squashes the following commits:

76ac3e8 [Nathan Howell] [SPARK-8093] [SQL] Remove empty structs inferred from JSON documents


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9814b971
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9814b971
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9814b971

Branch: refs/heads/master
Commit: 9814b971f07dff8a99f1b8ad2adf70614f1c690b
Parents: 1fa29c2
Author: Nathan Howell <nh...@godaddy.com>
Authored: Fri Jun 19 16:19:28 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Jun 19 16:19:28 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/json/InferSchema.scala | 52 +++++++++++++-------
 .../org/apache/spark/sql/json/JsonSuite.scala   |  4 ++
 .../apache/spark/sql/json/TestJsonData.scala    |  9 ++++
 3 files changed, 48 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9814b971/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
index 565d102..afe2c6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
@@ -43,7 +43,7 @@ private[sql] object InferSchema {
     }
 
     // perform schema inference on each row and merge afterwards
-    schemaData.mapPartitions { iter =>
+    val rootType = schemaData.mapPartitions { iter =>
       val factory = new JsonFactory()
       iter.map { row =>
         try {
@@ -55,8 +55,13 @@ private[sql] object InferSchema {
             StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
         }
       }
-    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType) match {
-      case st: StructType => nullTypeToStringType(st)
+    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType)
+
+    canonicalizeType(rootType) match {
+      case Some(st: StructType) => st
+      case _ =>
+        // canonicalizeType erases every empty struct, even the root; fall back to an empty schema
+        StructType(Seq())
     }
   }
 
@@ -116,22 +121,35 @@ private[sql] object InferSchema {
     }
   }
 
-  private def nullTypeToStringType(struct: StructType): StructType = {
-    val fields = struct.fields.map {
-      case StructField(fieldName, dataType, nullable, _) =>
-        val newType = dataType match {
-          case NullType => StringType
-          case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
-          case ArrayType(struct: StructType, containsNull) =>
-            ArrayType(nullTypeToStringType(struct), containsNull)
-          case struct: StructType => nullTypeToStringType(struct)
-          case other: DataType => other
-        }
+  /**
+   * Convert NullType to StringType and drop empty StructTypes (None means the type was erased)
+   */
+  private def canonicalizeType: DataType => Option[DataType] = {
+    case at@ArrayType(elementType, _) =>
+      for {
+        canonicalType <- canonicalizeType(elementType)
+      } yield {
+        at.copy(canonicalType)
+      }
 
-        StructField(fieldName, newType, nullable)
-    }
+    case StructType(fields) =>
+      val canonicalFields = for {
+        field <- fields
+        if field.name.nonEmpty
+        canonicalType <- canonicalizeType(field.dataType)
+      } yield {
+        field.copy(dataType = canonicalType)
+      }
+
+      if (canonicalFields.nonEmpty) {
+        Some(StructType(canonicalFields))
+      } else {
+        // per SPARK-8093: empty structs should be deleted
+        None
+      }
 
-    StructType(fields)
+    case NullType => Some(StringType)
+    case other => Some(other)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9814b971/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 945d437..c32d9f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -1103,4 +1103,8 @@ class JsonSuite extends QueryTest with TestJsonData {
     }
   }
 
+  test("SPARK-8093 Erase empty structs") {
+    val emptySchema = InferSchema(emptyRecords, 1.0, "")
+    assert(StructType(Seq()) === emptySchema)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9814b971/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index b6a6a8d..eb62066 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -189,5 +189,14 @@ trait TestJsonData {
       """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
       """]""" :: Nil)
 
+  def emptyRecords: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{""" ::
+        """""" ::
+        """{"a": {}}""" ::
+        """{"a": {"b": {}}}""" ::
+        """{"b": [{"c": {}}]}""" ::
+        """]""" :: Nil)
+
   def empty: RDD[String] = ctx.sparkContext.parallelize(Seq[String]())
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org