You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/06/20 01:19:36 UTC
spark git commit: [SPARK-8093] [SQL] Remove empty structs inferred from JSON documents
Repository: spark
Updated Branches:
refs/heads/master 1fa29c2df -> 9814b971f
[SPARK-8093] [SQL] Remove empty structs inferred from JSON documents
Author: Nathan Howell <nh...@godaddy.com>
Closes #6799 from NathanHowell/spark-8093 and squashes the following commits:
76ac3e8 [Nathan Howell] [SPARK-8093] [SQL] Remove empty structs inferred from JSON documents
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9814b971
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9814b971
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9814b971
Branch: refs/heads/master
Commit: 9814b971f07dff8a99f1b8ad2adf70614f1c690b
Parents: 1fa29c2
Author: Nathan Howell <nh...@godaddy.com>
Authored: Fri Jun 19 16:19:28 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Jun 19 16:19:28 2015 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/json/InferSchema.scala | 52 +++++++++++++-------
.../org/apache/spark/sql/json/JsonSuite.scala | 4 ++
.../apache/spark/sql/json/TestJsonData.scala | 9 ++++
3 files changed, 48 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9814b971/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
index 565d102..afe2c6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala
@@ -43,7 +43,7 @@ private[sql] object InferSchema {
}
// perform schema inference on each row and merge afterwards
- schemaData.mapPartitions { iter =>
+ val rootType = schemaData.mapPartitions { iter =>
val factory = new JsonFactory()
iter.map { row =>
try {
@@ -55,8 +55,13 @@ private[sql] object InferSchema {
StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
}
}
- }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType) match {
- case st: StructType => nullTypeToStringType(st)
+ }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType)
+
+ canonicalizeType(rootType) match {
+ case Some(st: StructType) => st
+ case _ =>
+ // canonicalizeType erases all empty structs, including the only one we want to keep
+ StructType(Seq())
}
}
@@ -116,22 +121,35 @@ private[sql] object InferSchema {
}
}
- private def nullTypeToStringType(struct: StructType): StructType = {
- val fields = struct.fields.map {
- case StructField(fieldName, dataType, nullable, _) =>
- val newType = dataType match {
- case NullType => StringType
- case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
- case ArrayType(struct: StructType, containsNull) =>
- ArrayType(nullTypeToStringType(struct), containsNull)
- case struct: StructType => nullTypeToStringType(struct)
- case other: DataType => other
- }
+ /**
+ * Convert NullType to StringType and remove StructTypes with no fields
+ */
+ private def canonicalizeType: DataType => Option[DataType] = {
+ case at@ArrayType(elementType, _) =>
+ for {
+ canonicalType <- canonicalizeType(elementType)
+ } yield {
+ at.copy(canonicalType)
+ }
- StructField(fieldName, newType, nullable)
- }
+ case StructType(fields) =>
+ val canonicalFields = for {
+ field <- fields
+ if field.name.nonEmpty
+ canonicalType <- canonicalizeType(field.dataType)
+ } yield {
+ field.copy(dataType = canonicalType)
+ }
+
+ if (canonicalFields.nonEmpty) {
+ Some(StructType(canonicalFields))
+ } else {
+ // per SPARK-8093: empty structs should be deleted
+ None
+ }
- StructType(fields)
+ case NullType => Some(StringType)
+ case other => Some(other)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/9814b971/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 945d437..c32d9f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -1103,4 +1103,8 @@ class JsonSuite extends QueryTest with TestJsonData {
}
}
+ test("SPARK-8093 Erase empty structs") {
+ val emptySchema = InferSchema(emptyRecords, 1.0, "")
+ assert(StructType(Seq()) === emptySchema)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9814b971/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index b6a6a8d..eb62066 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -189,5 +189,14 @@ trait TestJsonData {
"""{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
"""]""" :: Nil)
+ def emptyRecords: RDD[String] =
+ ctx.sparkContext.parallelize(
+ """{""" ::
+ """""" ::
+ """{"a": {}}""" ::
+ """{"a": {"b": {}}}""" ::
+ """{"b": [{"c": {}}]}""" ::
+ """]""" :: Nil)
+
def empty: RDD[String] = ctx.sparkContext.parallelize(Seq[String]())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org