You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/12/17 08:19:00 UTC
spark git commit: [SPARK-12057][SQL] Prevent failure on corrupt JSON
records
Repository: spark
Updated Branches:
refs/heads/master 437583f69 -> 9d66c4216
[SPARK-12057][SQL] Prevent failure on corrupt JSON records
This PR makes the JSON parser and schema inference handle more cases where we have unparsed records. It is based on #10043. The last commit fixes the failing test and updates the logic of schema inference.
Regarding the schema inference change, if we have something like
```
{"f1":1}
[1,2,3]
```
originally, we would get a DF without any columns.
After this change, we will get a DF with columns `f1` and `_corrupt_record`. Basically, for the second row, `[1,2,3]` will be the value of `_corrupt_record`.
When merging this PR, please make sure that the author is simplyianm.
JIRA: https://issues.apache.org/jira/browse/SPARK-12057
Closes #10043
Author: Ian Macalinao <me...@ian.pw>
Author: Yin Huai <yh...@databricks.com>
Closes #10288 from yhuai/handleCorruptJson.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9d66c421
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9d66c421
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9d66c421
Branch: refs/heads/master
Commit: 9d66c4216ad830812848c657bbcd8cd50949e199
Parents: 437583f
Author: Yin Huai <yh...@databricks.com>
Authored: Wed Dec 16 23:18:53 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Dec 16 23:18:53 2015 -0800
----------------------------------------------------------------------
.../datasources/json/InferSchema.scala | 37 +++++++++++++++++---
.../datasources/json/JacksonParser.scala | 19 ++++++----
.../execution/datasources/json/JsonSuite.scala | 37 ++++++++++++++++++++
.../datasources/json/TestJsonData.scala | 9 ++++-
4 files changed, 90 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9d66c421/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 922fd5b..59ba4ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -61,7 +61,10 @@ private[json] object InferSchema {
StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
}
}
- }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType)
+ }.treeAggregate[DataType](
+ StructType(Seq()))(
+ compatibleRootType(columnNameOfCorruptRecords),
+ compatibleRootType(columnNameOfCorruptRecords))
canonicalizeType(rootType) match {
case Some(st: StructType) => st
@@ -170,12 +173,38 @@ private[json] object InferSchema {
case other => Some(other)
}
+ private def withCorruptField(
+ struct: StructType,
+ columnNameOfCorruptRecords: String): StructType = {
+ if (!struct.fieldNames.contains(columnNameOfCorruptRecords)) {
+ // If this given struct does not have a column used for corrupt records,
+ // add this field.
+ struct.add(columnNameOfCorruptRecords, StringType, nullable = true)
+ } else {
+ // Otherwise, just return this struct.
+ struct
+ }
+ }
+
/**
* Remove top-level ArrayType wrappers and merge the remaining schemas
*/
- private def compatibleRootType: (DataType, DataType) => DataType = {
- case (ArrayType(ty1, _), ty2) => compatibleRootType(ty1, ty2)
- case (ty1, ArrayType(ty2, _)) => compatibleRootType(ty1, ty2)
+ private def compatibleRootType(
+ columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = {
+ // Since we support array of json objects at the top level,
+ // we need to check the element type and find the root level data type.
+ case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+ case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+ // If we see any other data type at the root level, we get records that cannot be
+ // parsed. So, we use the struct as the data type and add the corrupt field to the schema.
+ case (struct: StructType, NullType) => struct
+ case (NullType, struct: StructType) => struct
+ case (struct: StructType, o) if !o.isInstanceOf[StructType] =>
+ withCorruptField(struct, columnNameOfCorruptRecords)
+ case (o, struct: StructType) if !o.isInstanceOf[StructType] =>
+ withCorruptField(struct, columnNameOfCorruptRecords)
+ // If we get anything else, we call compatibleType.
+ // Usually, when we reach here, ty1 and ty2 are two StructTypes.
case (ty1, ty2) => compatibleType(ty1, ty2)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9d66c421/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index bfa1405..55a1c24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
+private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+
object JacksonParser {
def parse(
@@ -110,7 +112,7 @@ object JacksonParser {
lowerCaseValue.equals("-inf")) {
value.toFloat
} else {
- sys.error(s"Cannot parse $value as FloatType.")
+ throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
}
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) =>
@@ -127,7 +129,7 @@ object JacksonParser {
lowerCaseValue.equals("-inf")) {
value.toDouble
} else {
- sys.error(s"Cannot parse $value as DoubleType.")
+ throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
}
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) =>
@@ -174,7 +176,11 @@ object JacksonParser {
convertField(factory, parser, udt.sqlType)
case (token, dataType) =>
- sys.error(s"Failed to parse a value for data type $dataType (current token: $token).")
+ // We cannot parse this token based on the given data type. So, we throw a
+ // SparkSQLJsonProcessingException and this exception will be caught by
+ // parseJson method.
+ throw new SparkSQLJsonProcessingException(
+ s"Failed to parse a value for data type $dataType (current token: $token).")
}
}
@@ -267,15 +273,14 @@ object JacksonParser {
array.toArray[InternalRow](schema)
}
case _ =>
- sys.error(
- s"Failed to parse record $record. Please make sure that each line of " +
- "the file (or each string in the RDD) is a valid JSON object or " +
- "an array of JSON objects.")
+ failedRecord(record)
}
}
} catch {
case _: JsonProcessingException =>
failedRecord(record)
+ case _: SparkSQLJsonProcessingException =>
+ failedRecord(record)
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9d66c421/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index ba7718c..baa258a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1427,4 +1427,41 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
}
+
+ test("SPARK-12057 additional corrupt records do not throw exceptions") {
+ // Test if we can query corrupt records.
+ withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
+ withTempTable("jsonTable") {
+ val schema = StructType(
+ StructField("_unparsed", StringType, true) ::
+ StructField("dummy", StringType, true) :: Nil)
+
+ {
+ // We need to make sure we can infer the schema.
+ val jsonDF = sqlContext.read.json(additionalCorruptRecords)
+ assert(jsonDF.schema === schema)
+ }
+
+ {
+ val jsonDF = sqlContext.read.schema(schema).json(additionalCorruptRecords)
+ jsonDF.registerTempTable("jsonTable")
+
+ // In HiveContext, backticks should be used to access columns starting with a underscore.
+ checkAnswer(
+ sql(
+ """
+ |SELECT dummy, _unparsed
+ |FROM jsonTable
+ """.stripMargin),
+ Row("test", null) ::
+ Row(null, """[1,2,3]""") ::
+ Row(null, """":"test", "a":1}""") ::
+ Row(null, """42""") ::
+ Row(null, """ ","ian":"test"}""") :: Nil
+ )
+ }
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9d66c421/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index 713d1da..cb61f7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -188,6 +188,14 @@ private[json] trait TestJsonData {
"""{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
"""]""" :: Nil)
+ def additionalCorruptRecords: RDD[String] =
+ sqlContext.sparkContext.parallelize(
+ """{"dummy":"test"}""" ::
+ """[1,2,3]""" ::
+ """":"test", "a":1}""" ::
+ """42""" ::
+ """ ","ian":"test"}""" :: Nil)
+
def emptyRecords: RDD[String] =
sqlContext.sparkContext.parallelize(
"""{""" ::
@@ -197,7 +205,6 @@ private[json] trait TestJsonData {
"""{"b": [{"c": {}}]}""" ::
"""]""" :: Nil)
-
lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org