Posted to commits@spark.apache.org by ma...@apache.org on 2014/09/12 00:24:08 UTC

git commit: [SPARK-3390][SQL] sqlContext.jsonRDD fails on a complex structure of JSON array and JSON object nesting

Repository: spark
Updated Branches:
  refs/heads/master ca83f1e2c -> 4bc9e046c


[SPARK-3390][SQL] sqlContext.jsonRDD fails on a complex structure of JSON array and JSON object nesting

This PR aims to correctly handle JSON arrays whose type is of the form `ArrayType(...(ArrayType(StructType)))`, i.e. arrays nested arbitrarily deep whose innermost elements are JSON objects.

JIRA: https://issues.apache.org/jira/browse/SPARK-3390.
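To make the failing shape concrete, here is a small, hypothetical Spark shell session (the data, table name, and expected result are illustrative assumptions, not taken from the commit). The field "a" below infers to ArrayType(ArrayType(StructType(...))); before this change, when the element of an array was another array, that element was treated as String (see the doc comment removed in JsonRDD.scala below), so queries into the inner structs failed:

    // Hypothetical example: "a" is an array of arrays of structs, i.e. its
    // inferred type is ArrayType(ArrayType(StructType(...))).
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val json = sc.parallelize(
      """{"a": [[{"b": 1}, {"b": 2}], [{"b": 3}]]}""" :: Nil)
    val schemaRDD = sqlContext.jsonRDD(json)
    schemaRDD.registerTempTable("nested")
    // With the fix, the inner structs keep their struct type and can be queried:
    sqlContext.sql("SELECT a[0][1].b FROM nested").collect()  // expected: 2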

Author: Yin Huai <hu...@cse.ohio-state.edu>

Closes #2364 from yhuai/SPARK-3390 and squashes the following commits:

46db418 [Yin Huai] Handle JSON arrays in the type of ArrayType(...(ArrayType(StructType))).


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bc9e046
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bc9e046
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bc9e046

Branch: refs/heads/master
Commit: 4bc9e046cb8922923dff254e3e621fb4de656f98
Parents: ca83f1e
Author: Yin Huai <hu...@cse.ohio-state.edu>
Authored: Thu Sep 11 15:23:33 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Sep 11 15:23:33 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/json/JsonRDD.scala     | 66 ++++++++++++--------
 .../org/apache/spark/sql/json/JsonSuite.scala   | 29 ++++++++-
 .../apache/spark/sql/json/TestJsonData.scala    | 30 ++++++++-
 3 files changed, 96 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4bc9e046/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 70062ea..8732218 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -68,8 +68,15 @@ private[sql] object JsonRDD extends Logging {
       val (topLevel, structLike) = values.partition(_.size == 1)
       val topLevelFields = topLevel.filter {
         name => resolved.get(prefix ++ name).get match {
-          case ArrayType(StructType(Nil), _) => false
-          case ArrayType(_, _) => true
+          case ArrayType(elementType, _) => {
+            def hasInnerStruct(t: DataType): Boolean = t match {
+              case s: StructType => false
+              case ArrayType(t1, _) => hasInnerStruct(t1)
+              case o => true
+            }
+
+            hasInnerStruct(elementType)
+          }
           case struct: StructType => false
           case _ => true
         }
@@ -84,7 +91,18 @@ private[sql] object JsonRDD extends Logging {
           val dataType = resolved.get(prefix :+ name).get
           dataType match {
             case array: ArrayType =>
-              Some(StructField(name, ArrayType(structType, array.containsNull), nullable = true))
+              // The pattern of this array is ArrayType(...(ArrayType(StructType))).
+              // Since the inner struct of array is a placeholder (StructType(Nil)),
+              // we need to replace this placeholder with the actual StructType (structType).
+              def getActualArrayType(
+                  innerStruct: StructType,
+                  currentArray: ArrayType): ArrayType = currentArray match {
+                case ArrayType(s: StructType, containsNull) =>
+                  ArrayType(innerStruct, containsNull)
+                case ArrayType(a: ArrayType, containsNull) =>
+                  ArrayType(getActualArrayType(innerStruct, a), containsNull)
+              }
+              Some(StructField(name, getActualArrayType(structType, array), nullable = true))
             case struct: StructType => Some(StructField(name, structType, nullable = true))
             // dataType is StringType means that we have resolved type conflicts involving
             // primitive types and complex types. So, the type of name has been relaxed to
@@ -168,8 +186,7 @@ private[sql] object JsonRDD extends Logging {
   /**
   * Returns the element type of a JSON array. We go through all elements of this array
    * to detect any possible type conflict. We use [[compatibleType]] to resolve
-   * type conflicts. Right now, when the element of an array is another array, we
-   * treat the element as String.
+   * type conflicts.
    */
   private def typeOfArray(l: Seq[Any]): ArrayType = {
     val containsNull = l.exists(v => v == null)
@@ -216,18 +233,24 @@ private[sql] object JsonRDD extends Logging {
       }
       case (key: String, array: Seq[_]) => {
         // The value associated with the key is an array.
-        typeOfArray(array) match {
+        // Handle inner structs of an array.
+        def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match {
           case ArrayType(StructType(Nil), containsNull) => {
            // The elements of this array are structs.
-            array.asInstanceOf[Seq[Map[String, Any]]].flatMap {
+            v.asInstanceOf[Seq[Map[String, Any]]].flatMap {
               element => allKeysWithValueTypes(element)
             }.map {
-              case (k, dataType) => (s"$key.$k", dataType)
-            } :+ (key, ArrayType(StructType(Nil), containsNull))
+              case (k, t) => (s"$key.$k", t)
+            }
           }
-          case ArrayType(elementType, containsNull) =>
-            (key, ArrayType(elementType, containsNull)) :: Nil
+          case ArrayType(t1, containsNull) =>
+            v.asInstanceOf[Seq[Any]].flatMap {
+              element => buildKeyPathForInnerStructs(element, t1)
+            }
+          case other => Nil
         }
+        val elementType = typeOfArray(array)
+        buildKeyPathForInnerStructs(array, elementType) :+ (key, elementType)
       }
       case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil
     }
@@ -339,8 +362,6 @@ private[sql] object JsonRDD extends Logging {
       null
     } else {
       desiredType match {
-        case ArrayType(elementType, _) =>
-          value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
         case StringType => toString(value)
         case IntegerType => value.asInstanceOf[IntegerType.JvmType]
         case LongType => toLong(value)
@@ -348,6 +369,10 @@ private[sql] object JsonRDD extends Logging {
         case DecimalType => toDecimal(value)
         case BooleanType => value.asInstanceOf[BooleanType.JvmType]
         case NullType => null
+
+        case ArrayType(elementType, _) =>
+          value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
+        case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
       }
     }
   }
@@ -356,22 +381,9 @@ private[sql] object JsonRDD extends Logging {
     // TODO: Reuse the row instead of creating a new one for every record.
     val row = new GenericMutableRow(schema.fields.length)
     schema.fields.zipWithIndex.foreach {
-      // StructType
-      case (StructField(name, fields: StructType, _), i) =>
-        row.update(i, json.get(name).flatMap(v => Option(v)).map(
-          v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull)
-
-      // ArrayType(StructType)
-      case (StructField(name, ArrayType(structType: StructType, _), _), i) =>
-        row.update(i,
-          json.get(name).flatMap(v => Option(v)).map(
-            v => v.asInstanceOf[Seq[Any]].map(
-              e => asRow(e.asInstanceOf[Map[String, Any]], structType))).orNull)
-
-      // Other cases
       case (StructField(name, dataType, _), i) =>
         row.update(i, json.get(name).flatMap(v => Option(v)).map(
-          enforceCorrectType(_, dataType)).getOrElse(null))
+          enforceCorrectType(_, dataType)).orNull)
     }
 
     row

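The two recursive helpers added above carry the core of the fix. hasInnerStruct walks down nested ArrayTypes and (note, despite its name) returns false as soon as it reaches a StructType, so the filter keeps only arrays with no inner struct as top-level fields; getActualArrayType then rebuilds the nested ArrayType, substituting the resolved struct for the StructType(Nil) placeholder at the innermost level. The enforceCorrectType/asRow changes make value coercion uniformly recursive (arrays recurse per element, structs recurse through asRow), which is why asRow collapses to a single case. A minimal standalone sketch of the placeholder substitution, using simplified stand-in types rather than Spark's actual DataType hierarchy:

    // Simplified stand-ins for Spark SQL's types (assumptions, for illustration only).
    sealed trait DataType
    case object IntegerType extends DataType
    case class StructType(fieldNames: Seq[String]) extends DataType
    case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType

    // Recurse through nested arrays; when the element is the struct placeholder,
    // substitute the resolved struct. Unlike the commit's getActualArrayType,
    // this sketch adds a catch-all so the match is exhaustive.
    def replacePlaceholder(resolved: StructType, current: ArrayType): ArrayType =
      current match {
        case ArrayType(_: StructType, n) => ArrayType(resolved, n)
        case ArrayType(a: ArrayType, n)  => ArrayType(replacePlaceholder(resolved, a), n)
        case ArrayType(other, n)         => ArrayType(other, n)
      }

    // Example: replacing the placeholder two levels down.
    // replacePlaceholder(StructType(Seq("inner1")),
    //                    ArrayType(ArrayType(StructType(Nil), true), true))
    // => ArrayType(ArrayType(StructType(Seq("inner1")), true), true)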
http://git-wip-us.apache.org/repos/asf/spark/blob/4bc9e046/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 301d482..b50d938 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -591,8 +591,35 @@ class JsonSuite extends QueryTest {
       (true, "str1") :: Nil
     )
     checkAnswer(
-      sql("select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1] from jsonTable"),
+      sql(
+        """
+          |select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1]
+          |from jsonTable
+        """.stripMargin),
       ("str2", 6) :: Nil
     )
   }
+
+  test("SPARK-3390 Complex arrays") {
+    val jsonSchemaRDD = jsonRDD(complexFieldAndType2)
+    jsonSchemaRDD.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql(
+        """
+          |select arrayOfArray1[0][0][0], arrayOfArray1[1][0][1], arrayOfArray1[1][1][0]
+          |from jsonTable
+        """.stripMargin),
+      (5, 7, 8) :: Nil
+    )
+    checkAnswer(
+      sql(
+        """
+          |select arrayOfArray2[0][0][0].inner1, arrayOfArray2[1][0],
+          |arrayOfArray2[1][1][1].inner2[0], arrayOfArray2[2][0][0].inner3[0][0].inner4
+          |from jsonTable
+        """.stripMargin),
+      ("str1", Nil, "str4", 2) :: Nil
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4bc9e046/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index b3f95f0..5f0b395 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -106,6 +106,34 @@ object TestJsonData {
               "inner1": "str4"
             }],
             "field2": [[5, 6], [7, 8]]
-          }]
+          }],
+          "arrayOfArray1": [
+          [
+            [5]
+          ],
+          [
+            [6, 7],
+            [8]
+          ]],
+          "arrayOfArray2": [
+          [
+            [
+              {
+                "inner1": "str1"
+              }
+            ]
+          ],
+          [
+            [],
+            [
+              {"inner2": ["str3", "str33"]},
+              {"inner2": ["str4"], "inner1": "str11"}
+            ]
+          ],
+          [
+            [
+              {"inner3": [[{"inner4": 2}]]}
+            ]
+          ]]
       }""" :: Nil)
 }
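For reference, the types the fixed inference should assign to the two new fields (a sketch based on the test data and queries above; the merged struct reflects compatibleType unifying objects with different field sets):

    // arrayOfArray1:
    //   ArrayType(ArrayType(ArrayType(IntegerType)))
    // arrayOfArray2:
    //   ArrayType(ArrayType(ArrayType(StructType(
    //     inner1: StringType,
    //     inner2: ArrayType(StringType),
    //     inner3: ArrayType(ArrayType(StructType(inner4: IntegerType)))))))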

