Posted to commits@spark.apache.org by rx...@apache.org on 2015/12/17 08:19:00 UTC

spark git commit: [SPARK-12057][SQL] Prevent failure on corrupt JSON records

Repository: spark
Updated Branches:
  refs/heads/master 437583f69 -> 9d66c4216


[SPARK-12057][SQL] Prevent failure on corrupt JSON records

This PR makes the JSON parser and schema inference handle more cases where we have records that cannot be parsed. It is based on #10043. The last commit fixes the failing test and updates the schema inference logic.

Regarding the schema inference change, if we have something like
```
{"f1":1}
[1,2,3]
```
originally, we would get a DataFrame without any columns.
After this change, we get a DataFrame with columns `f1` and `_corrupt_record`: for the second row, `[1,2,3]` becomes the value of `_corrupt_record`.
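
As a minimal sketch of the new behavior (assuming the `SQLContext` read API used elsewhere in this patch and the default corrupt-record column name; output is illustrative, not part of the patch):
```
// Minimal sketch, not part of this patch: schema inference over mixed records.
val records = sqlContext.sparkContext.parallelize(
  """{"f1":1}""" ::
  """[1,2,3]""" :: Nil)

val df = sqlContext.read.json(records)
df.printSchema()
// root
//  |-- f1: long (nullable = true)
//  |-- _corrupt_record: string (nullable = true)

df.show()
// +----+---------------+
// |  f1|_corrupt_record|
// +----+---------------+
// |   1|           null|
// |null|        [1,2,3]|
// +----+---------------+
```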

When merging this PR, please make sure that the author is simplyianm.

JIRA: https://issues.apache.org/jira/browse/SPARK-12057

Closes #10043

Author: Ian Macalinao <me...@ian.pw>
Author: Yin Huai <yh...@databricks.com>

Closes #10288 from yhuai/handleCorruptJson.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d66c421
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d66c421
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d66c421

Branch: refs/heads/master
Commit: 9d66c4216ad830812848c657bbcd8cd50949e199
Parents: 437583f
Author: Yin Huai <yh...@databricks.com>
Authored: Wed Dec 16 23:18:53 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Dec 16 23:18:53 2015 -0800

----------------------------------------------------------------------
 .../datasources/json/InferSchema.scala          | 37 +++++++++++++++++---
 .../datasources/json/JacksonParser.scala        | 19 ++++++----
 .../execution/datasources/json/JsonSuite.scala  | 37 ++++++++++++++++++++
 .../datasources/json/TestJsonData.scala         |  9 ++++-
 4 files changed, 90 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9d66c421/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 922fd5b..59ba4ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -61,7 +61,10 @@ private[json] object InferSchema {
             StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
         }
       }
-    }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType)
+    }.treeAggregate[DataType](
+      StructType(Seq()))(
+      compatibleRootType(columnNameOfCorruptRecords),
+      compatibleRootType(columnNameOfCorruptRecords))
 
     canonicalizeType(rootType) match {
       case Some(st: StructType) => st
@@ -170,12 +173,38 @@ private[json] object InferSchema {
     case other => Some(other)
   }
 
+  private def withCorruptField(
+      struct: StructType,
+      columnNameOfCorruptRecords: String): StructType = {
+    if (!struct.fieldNames.contains(columnNameOfCorruptRecords)) {
+      // If this given struct does not have a column used for corrupt records,
+      // add this field.
+      struct.add(columnNameOfCorruptRecords, StringType, nullable = true)
+    } else {
+      // Otherwise, just return this struct.
+      struct
+    }
+  }
+
   /**
    * Remove top-level ArrayType wrappers and merge the remaining schemas
    */
-  private def compatibleRootType: (DataType, DataType) => DataType = {
-    case (ArrayType(ty1, _), ty2) => compatibleRootType(ty1, ty2)
-    case (ty1, ArrayType(ty2, _)) => compatibleRootType(ty1, ty2)
+  private def compatibleRootType(
+      columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = {
+    // Since we support array of json objects at the top level,
+    // we need to check the element type and find the root level data type.
+    case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+    case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+    // If we see any other data type at the root level, we get records that cannot be
+    // parsed. So, we use the struct as the data type and add the corrupt field to the schema.
+    case (struct: StructType, NullType) => struct
+    case (NullType, struct: StructType) => struct
+    case (struct: StructType, o) if !o.isInstanceOf[StructType] =>
+      withCorruptField(struct, columnNameOfCorruptRecords)
+    case (o, struct: StructType) if !o.isInstanceOf[StructType] =>
+      withCorruptField(struct, columnNameOfCorruptRecords)
+    // If we get anything else, we call compatibleType.
+    // Usually, when we reach here, ty1 and ty2 are two StructTypes.
     case (ty1, ty2) => compatibleType(ty1, ty2)
   }
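
To make the struct-versus-other rule above concrete, here is a hedged, standalone sketch (not Spark source; names are illustrative):
```
// {"f1":1} infers StructType(f1: long); [1,2,3] infers ArrayType(LongType),
// which compatibleRootType unwraps to LongType. The struct-vs-other case then
// keeps the struct and adds a string column for the unparseable shape.
import org.apache.spark.sql.types._

val corruptCol = "_corrupt_record"

def withCorrupt(struct: StructType): StructType =
  if (struct.fieldNames.contains(corruptCol)) struct
  else struct.add(corruptCol, StringType, nullable = true)

val merged = withCorrupt(StructType(Seq(StructField("f1", LongType))))
// merged: StructType(StructField(f1,LongType,true),
//                    StructField(_corrupt_record,StringType,true))
```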
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9d66c421/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index bfa1405..55a1c24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
+private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+
 object JacksonParser {
 
   def parse(
@@ -110,7 +112,7 @@ object JacksonParser {
           lowerCaseValue.equals("-inf")) {
           value.toFloat
         } else {
-          sys.error(s"Cannot parse $value as FloatType.")
+          throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
         }
 
       case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) =>
@@ -127,7 +129,7 @@ object JacksonParser {
           lowerCaseValue.equals("-inf")) {
           value.toDouble
         } else {
-          sys.error(s"Cannot parse $value as DoubleType.")
+          throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
         }
 
       case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) =>
@@ -174,7 +176,11 @@ object JacksonParser {
         convertField(factory, parser, udt.sqlType)
 
       case (token, dataType) =>
-        sys.error(s"Failed to parse a value for data type $dataType (current token: $token).")
+        // We cannot parse this token based on the given data type. So, we throw a
+        // SparkSQLJsonProcessingException and this exception will be caught by
+        // parseJson method.
+        throw new SparkSQLJsonProcessingException(
+          s"Failed to parse a value for data type $dataType (current token: $token).")
     }
   }
 
@@ -267,15 +273,14 @@ object JacksonParser {
                   array.toArray[InternalRow](schema)
                 }
               case _ =>
-                sys.error(
-                  s"Failed to parse record $record. Please make sure that each line of " +
-                    "the file (or each string in the RDD) is a valid JSON object or " +
-                    "an array of JSON objects.")
+                failedRecord(record)
             }
           }
         } catch {
           case _: JsonProcessingException =>
             failedRecord(record)
+          case _: SparkSQLJsonProcessingException =>
+            failedRecord(record)
         }
       }
     }
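
The effect of the new exception type, as a self-contained toy (assumed names; not the patch itself): a dedicated exception marks "this record is malformed" so the caller can recover per record instead of failing the whole job.
```
// Toy sketch: malformed input degrades to None (a corrupt-record row in the
// real parser), while any unexpected exception still propagates.
class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)

def convert(token: String): Long =
  try token.toLong
  catch {
    case _: NumberFormatException =>
      throw new SparkSQLJsonProcessingException(s"Cannot parse $token as LongType.")
  }

def parseOrCorrupt(record: String): Option[Long] =
  try Some(convert(record))
  catch { case _: SparkSQLJsonProcessingException => None }

// parseOrCorrupt("42")  == Some(42L)
// parseOrCorrupt("abc") == None
```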

http://git-wip-us.apache.org/repos/asf/spark/blob/9d66c421/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index ba7718c..baa258a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1427,4 +1427,41 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       }
     }
   }
+
+  test("SPARK-12057 additional corrupt records do not throw exceptions") {
+    // Test if we can query corrupt records.
+    withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
+      withTempTable("jsonTable") {
+        val schema = StructType(
+          StructField("_unparsed", StringType, true) ::
+            StructField("dummy", StringType, true) :: Nil)
+
+        {
+          // We need to make sure we can infer the schema.
+          val jsonDF = sqlContext.read.json(additionalCorruptRecords)
+          assert(jsonDF.schema === schema)
+        }
+
+        {
+          val jsonDF = sqlContext.read.schema(schema).json(additionalCorruptRecords)
+          jsonDF.registerTempTable("jsonTable")
+
+          // In HiveContext, backticks should be used to access columns starting with an underscore.
+          checkAnswer(
+            sql(
+              """
+                |SELECT dummy, _unparsed
+                |FROM jsonTable
+              """.stripMargin),
+            Row("test", null) ::
+              Row(null, """[1,2,3]""") ::
+              Row(null, """":"test", "a":1}""") ::
+              Row(null, """42""") ::
+              Row(null, """     ","ian":"test"}""") :: Nil
+          )
+        }
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9d66c421/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index 713d1da..cb61f7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -188,6 +188,14 @@ private[json] trait TestJsonData {
       """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
       """]""" :: Nil)
 
+  def additionalCorruptRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"dummy":"test"}""" ::
+      """[1,2,3]""" ::
+      """":"test", "a":1}""" ::
+      """42""" ::
+      """     ","ian":"test"}""" :: Nil)
+
   def emptyRecords: RDD[String] =
     sqlContext.sparkContext.parallelize(
       """{""" ::
@@ -197,7 +205,6 @@ private[json] trait TestJsonData {
         """{"b": [{"c": {}}]}""" ::
         """]""" :: Nil)
 
-
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())

