You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/02/24 02:16:37 UTC

spark git commit: [SPARK-5935][SQL] Accept MapType in the schema provided to a JSON dataset.

Repository: spark
Updated Branches:
  refs/heads/master 59536cc87 -> 48376bfe9


[SPARK-5935][SQL] Accept MapType in the schema provided to a JSON dataset.

JIRA: https://issues.apache.org/jira/browse/SPARK-5935

Author: Yin Huai <yh...@databricks.com>
Author: Yin Huai <hu...@cse.ohio-state.edu>

Closes #4710 from yhuai/jsonMapType and squashes the following commits:

3e40390 [Yin Huai] Remove unnecessary changes.
f8e6267 [Yin Huai] Fix test.
baa36e3 [Yin Huai] Accept MapType in the schema provided to jsonFile/jsonRDD.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/48376bfe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/48376bfe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/48376bfe

Branch: refs/heads/master
Commit: 48376bfe9c97bf31279918def6c6615849c88f4d
Parents: 59536cc
Author: Yin Huai <yh...@databricks.com>
Authored: Mon Feb 23 17:16:34 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Feb 23 17:16:34 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/json/JsonRDD.scala     |  3 ++
 .../org/apache/spark/sql/json/JsonSuite.scala   | 56 ++++++++++++++++++++
 .../apache/spark/sql/json/TestJsonData.scala    | 17 ++++++
 3 files changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/48376bfe/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 3b8dde1..d83bdc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -416,6 +416,9 @@ private[sql] object JsonRDD extends Logging {
         case NullType => null
         case ArrayType(elementType, _) =>
           value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
+        case MapType(StringType, valueType, _) =>
+          val map = value.asInstanceOf[Map[String, Any]]
+          map.mapValues(enforceCorrectType(_, valueType)).map(identity)
         case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
         case DateType => toDate(value)
         case TimestampType => toTimestamp(value)

http://git-wip-us.apache.org/repos/asf/spark/blob/48376bfe/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index c94e44b..005f20b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -657,6 +657,62 @@ class JsonSuite extends QueryTest {
     )
   }
 
+  test("Applying schemas with MapType") {
+    val schemaWithSimpleMap = StructType(
+      StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
+    val jsonWithSimpleMap = jsonRDD(mapType1, schemaWithSimpleMap)
+
+    jsonWithSimpleMap.registerTempTable("jsonWithSimpleMap")
+
+    checkAnswer(
+      sql("select map from jsonWithSimpleMap"),
+      Row(Map("a" -> 1)) ::
+      Row(Map("b" -> 2)) ::
+      Row(Map("c" -> 3)) ::
+      Row(Map("c" -> 1, "d" -> 4)) ::
+      Row(Map("e" -> null)) :: Nil
+    )
+
+    checkAnswer(
+      sql("select map['c'] from jsonWithSimpleMap"),
+      Row(null) ::
+      Row(null) ::
+      Row(3) ::
+      Row(1) ::
+      Row(null) :: Nil
+    )
+
+    val innerStruct = StructType(
+      StructField("field1", ArrayType(IntegerType, true), true) ::
+      StructField("field2", IntegerType, true) :: Nil)
+    val schemaWithComplexMap = StructType(
+      StructField("map", MapType(StringType, innerStruct, true), false) :: Nil)
+
+    val jsonWithComplexMap = jsonRDD(mapType2, schemaWithComplexMap)
+
+    jsonWithComplexMap.registerTempTable("jsonWithComplexMap")
+
+    checkAnswer(
+      sql("select map from jsonWithComplexMap"),
+      Row(Map("a" -> Row(Seq(1, 2, 3, null), null))) ::
+      Row(Map("b" -> Row(null, 2))) ::
+      Row(Map("c" -> Row(Seq(), 4))) ::
+      Row(Map("c" -> Row(null, 3), "d" -> Row(Seq(null), null))) ::
+      Row(Map("e" -> null)) ::
+      Row(Map("f" -> Row(null, null))) :: Nil
+    )
+
+    checkAnswer(
+      sql("select map['a'].field1, map['c'].field2 from jsonWithComplexMap"),
+      Row(Seq(1, 2, 3, null), null) ::
+      Row(null, null) ::
+      Row(null, 4) ::
+      Row(null, 3) ::
+      Row(null, null) ::
+      Row(null, null) :: Nil
+    )
+  }
+
   test("SPARK-2096 Correctly parse dot notations") {
     val jsonDF = jsonRDD(complexFieldAndType2)
     jsonDF.registerTempTable("jsonTable")

http://git-wip-us.apache.org/repos/asf/spark/blob/48376bfe/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index 3370b3c..15698f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -146,6 +146,23 @@ object TestJsonData {
           ]]
       }""" :: Nil)
 
+  val mapType1 =
+    TestSQLContext.sparkContext.parallelize(
+      """{"map": {"a": 1}}""" ::
+      """{"map": {"b": 2}}""" ::
+      """{"map": {"c": 3}}""" ::
+      """{"map": {"c": 1, "d": 4}}""" ::
+      """{"map": {"e": null}}""" :: Nil)
+
+  val mapType2 =
+    TestSQLContext.sparkContext.parallelize(
+      """{"map": {"a": {"field1": [1, 2, 3, null]}}}""" ::
+      """{"map": {"b": {"field2": 2}}}""" ::
+      """{"map": {"c": {"field1": [], "field2": 4}}}""" ::
+      """{"map": {"c": {"field2": 3}, "d": {"field1": [null]}}}""" ::
+      """{"map": {"e": null}}""" ::
+      """{"map": {"f": {"field1": null}}}""" :: Nil)
+
   val nullsInArrays =
     TestSQLContext.sparkContext.parallelize(
       """{"field1":[[null], [[["Test"]]]]}""" ::


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org