You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/10/31 12:10:45 UTC

spark git commit: [SPARK-11226][SQL] Empty line in json file should be skipped

Repository: spark
Updated Branches:
  refs/heads/master 3c471885d -> 97b3c8fb4


[SPARK-11226][SQL] Empty line in json file should be skipped

Currently, an empty line in a JSON file is parsed into a Row whose field values are all null. In JSON, "{}" represents an (empty) object, whereas an empty line contains no record at all and should be skipped.

This is a small change: records that are empty or contain only whitespace are now skipped before being handed to the JSON parser.

Author: Jeff Zhang <zj...@apache.org>

Closes #9211 from zjffdu/SPARK-11226.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97b3c8fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97b3c8fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97b3c8fb

Branch: refs/heads/master
Commit: 97b3c8fb470f0d3c1cdb1aeb27f675e695442e87
Parents: 3c47188
Author: Jeff Zhang <zj...@apache.org>
Authored: Sat Oct 31 11:10:37 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat Oct 31 11:10:37 2015 +0000

----------------------------------------------------------------------
 .../datasources/json/JacksonParser.scala        | 46 +++++++++++---------
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 11 +++++
 .../execution/datasources/json/JsonSuite.scala  |  3 --
 3 files changed, 36 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97b3c8fb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index b2e5201..4f53eeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -245,29 +245,33 @@ private[sql] object JacksonParser {
       val factory = new JsonFactory()
 
       iter.flatMap { record =>
-        try {
-          Utils.tryWithResource(factory.createParser(record)) { parser =>
-            parser.nextToken()
-
-            convertField(factory, parser, schema) match {
-              case null => failedRecord(record)
-              case row: InternalRow => row :: Nil
-              case array: ArrayData =>
-                if (array.numElements() == 0) {
-                  Nil
-                } else {
-                  array.toArray[InternalRow](schema)
-                }
-              case _ =>
-                sys.error(
-                  s"Failed to parse record $record. Please make sure that each line of the file " +
-                    "(or each string in the RDD) is a valid JSON object or " +
-                    "an array of JSON objects.")
+        if (record.trim.isEmpty) {
+          Nil
+        } else {
+          try {
+            Utils.tryWithResource(factory.createParser(record)) { parser =>
+              parser.nextToken()
+
+              convertField(factory, parser, schema) match {
+                case null => failedRecord(record)
+                case row: InternalRow => row :: Nil
+                case array: ArrayData =>
+                  if (array.numElements() == 0) {
+                    Nil
+                  } else {
+                    array.toArray[InternalRow](schema)
+                  }
+                case _ =>
+                  sys.error(
+                    s"Failed to parse record $record. Please make sure that each line of " +
+                      "the file (or each string in the RDD) is a valid JSON object or " +
+                      "an array of JSON objects.")
+              }
             }
+          } catch {
+            case _: JsonProcessingException =>
+              failedRecord(record)
           }
-        } catch {
-          case _: JsonProcessingException =>
-            failedRecord(record)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/97b3c8fb/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 5a616fa..5413ef1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -225,6 +225,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Seq(Row("1"), Row("2")))
   }
 
+  test("SPARK-11226 Skip empty line in json file") {
+    sqlContext.read.json(
+      sparkContext.parallelize(
+        Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "")))
+      .registerTempTable("d")
+
+    checkAnswer(
+      sql("select count(1) from d"),
+      Seq(Row(3)))
+  }
+
   test("SPARK-8828 sum should return null if all input values are null") {
     withSQLConf(SQLConf.USE_SQL_AGGREGATE2.key -> "true") {
       withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {

http://git-wip-us.apache.org/repos/asf/spark/blob/97b3c8fb/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index d3fd409..28b8f02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -959,7 +959,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       withTempTable("jsonTable") {
         val jsonDF = sqlContext.read.json(corruptRecords)
         jsonDF.registerTempTable("jsonTable")
-
         val schema = StructType(
           StructField("_unparsed", StringType, true) ::
           StructField("a", StringType, true) ::
@@ -976,7 +975,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
               |FROM jsonTable
             """.stripMargin),
           Row(null, null, null, "{") ::
-            Row(null, null, null, "") ::
             Row(null, null, null, """{"a":1, b:2}""") ::
             Row(null, null, null, """{"a":{, b:3}""") ::
             Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1001,7 +999,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
               |WHERE _unparsed IS NOT NULL
             """.stripMargin),
           Row("{") ::
-            Row("") ::
             Row("""{"a":1, b:2}""") ::
             Row("""{"a":{, b:3}""") ::
             Row("]") :: Nil


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org