You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/10/09 23:57:31 UTC

git commit: [SPARK-3339][SQL] Support for skipping json lines that fail to parse

Repository: spark
Updated Branches:
  refs/heads/master 1faa1135a -> 1c7f0ab30


[SPARK-3339][SQL] Support for skipping json lines that fail to parse

This PR aims to provide a way to skip/query corrupt JSON records. To do so, we introduce an internal column to hold corrupt records (the default name is `_corrupt_record`. This name can be changed by setting the value of `spark.sql.columnNameOfCorruptRecord`). When there is a parsing error, we will put the corrupt record in its unparsed format to the internal column. Users can skip/query this column through SQL.

* To query those corrupt records
```
-- For Hive parser
SELECT `_corrupt_record`
FROM jsonTable
WHERE `_corrupt_record` IS NOT NULL
-- For our SQL parser
SELECT _corrupt_record
FROM jsonTable
WHERE _corrupt_record IS NOT NULL
```
* To skip corrupt records and query regular records
```
-- For Hive parser
SELECT field1, field2
FROM jsonTable
WHERE `_corrupt_record` IS NULL
-- For our SQL parser
SELECT field1, field2
FROM jsonTable
WHERE _corrupt_record IS NULL
```

Generally, it is not recommended to change the name of the internal column. If the name has to be changed to avoid possible name conflicts, you can use `sqlContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, <new column name>)` or `sqlContext.sql(SET spark.sql.columnNameOfCorruptRecord=<new column name>)`.

Author: Yin Huai <hu...@cse.ohio-state.edu>

Closes #2680 from yhuai/corruptJsonRecord and squashes the following commits:

4c9828e [Yin Huai] Merge remote-tracking branch 'upstream/master' into corruptJsonRecord
309616a [Yin Huai] Change the default name of corrupt record to "_corrupt_record".
b4a3632 [Yin Huai] Merge remote-tracking branch 'upstream/master' into corruptJsonRecord
9375ae9 [Yin Huai] Set the column name of corrupt json record back to the default one after the unit test.
ee584c0 [Yin Huai] Provide a way to query corrupt json records as unparsed strings.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c7f0ab3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c7f0ab3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c7f0ab3

Branch: refs/heads/master
Commit: 1c7f0ab302de9f82b1bd6da852d133823bc67c66
Parents: 1faa113
Author: Yin Huai <hu...@cse.ohio-state.edu>
Authored: Thu Oct 9 14:57:27 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Oct 9 14:57:27 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  4 ++
 .../scala/org/apache/spark/sql/SQLContext.scala | 14 +++--
 .../spark/sql/api/java/JavaSQLContext.scala     | 16 +++--
 .../org/apache/spark/sql/json/JsonRDD.scala     | 30 ++++++----
 .../org/apache/spark/sql/json/JsonSuite.scala   | 62 +++++++++++++++++++-
 .../apache/spark/sql/json/TestJsonData.scala    |  9 +++
 6 files changed, 116 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f6f4cf3..07e6e2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -35,6 +35,7 @@ private[spark] object SQLConf {
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+  val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
 
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
@@ -131,6 +132,9 @@ private[sql] trait SQLConf {
   private[spark] def inMemoryPartitionPruning: Boolean =
     getConf(IN_MEMORY_PARTITION_PRUNING, "false").toBoolean
 
+  private[spark] def columnNameOfCorruptRecord: String =
+    getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 35561ca..014e1e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -195,9 +195,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
+    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
     val appliedSchema =
-      Option(schema).getOrElse(JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, 1.0)))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+      Option(schema).getOrElse(
+        JsonRDD.nullTypeToStringType(
+          JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
+    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
     applySchema(rowRDD, appliedSchema)
   }
 
@@ -206,8 +209,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
-    val appliedSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+    val appliedSchema =
+      JsonRDD.nullTypeToStringType(
+        JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
+    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
     applySchema(rowRDD, appliedSchema)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index c006c43..f8171c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -148,8 +148,12 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    * It goes through the entire dataset once to determine the schema.
    */
   def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
-    val appliedScalaSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))
-    val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+    val appliedScalaSchema =
+      JsonRDD.nullTypeToStringType(
+        JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord))
+    val scalaRowRDD =
+      JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
     val logicalPlan =
       LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
     new JavaSchemaRDD(sqlContext, logicalPlan)
@@ -162,10 +166,14 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    */
   @Experimental
   def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = {
+    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
     val appliedScalaSchema =
       Option(asScalaDataType(schema)).getOrElse(
-        JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))).asInstanceOf[SStructType]
-    val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+        JsonRDD.nullTypeToStringType(
+          JsonRDD.inferSchema(
+            json.rdd, 1.0, columnNameOfCorruptJsonRecord))).asInstanceOf[SStructType]
+    val scalaRowRDD = JsonRDD.jsonStringToRow(
+      json.rdd, appliedScalaSchema, columnNameOfCorruptJsonRecord)
     val logicalPlan =
       LogicalRDD(appliedScalaSchema.toAttributes, scalaRowRDD)(sqlContext)
     new JavaSchemaRDD(sqlContext, logicalPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index fbc2965..61ee960 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -22,6 +22,7 @@ import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
 import scala.math.BigDecimal
 import java.sql.Timestamp
 
+import com.fasterxml.jackson.core.JsonProcessingException
 import com.fasterxml.jackson.databind.ObjectMapper
 
 import org.apache.spark.rdd.RDD
@@ -35,16 +36,19 @@ private[sql] object JsonRDD extends Logging {
 
   private[sql] def jsonStringToRow(
       json: RDD[String],
-      schema: StructType): RDD[Row] = {
-    parseJson(json).map(parsed => asRow(parsed, schema))
+      schema: StructType,
+      columnNameOfCorruptRecords: String): RDD[Row] = {
+    parseJson(json, columnNameOfCorruptRecords).map(parsed => asRow(parsed, schema))
   }
 
   private[sql] def inferSchema(
       json: RDD[String],
-      samplingRatio: Double = 1.0): StructType = {
+      samplingRatio: Double = 1.0,
+      columnNameOfCorruptRecords: String): StructType = {
     require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
     val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
-    val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
+    val allKeys =
+      parseJson(schemaData, columnNameOfCorruptRecords).map(allKeysWithValueTypes).reduce(_ ++ _)
     createSchema(allKeys)
   }
 
@@ -274,7 +278,9 @@ private[sql] object JsonRDD extends Logging {
     case atom => atom
   }
 
-  private def parseJson(json: RDD[String]): RDD[Map[String, Any]] = {
+  private def parseJson(
+      json: RDD[String],
+      columnNameOfCorruptRecords: String): RDD[Map[String, Any]] = {
     // According to [Jackson-72: https://jira.codehaus.org/browse/JACKSON-72],
     // ObjectMapper will not return BigDecimal when
     // "DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS" is disabled
@@ -289,12 +295,16 @@ private[sql] object JsonRDD extends Logging {
       // For example: for {"key": 1, "key":2}, we will get "key"->2.
       val mapper = new ObjectMapper()
       iter.flatMap { record =>
-        val parsed = mapper.readValue(record, classOf[Object]) match {
-          case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil
-          case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]]
-        }
+        try {
+          val parsed = mapper.readValue(record, classOf[Object]) match {
+            case map: java.util.Map[_, _] => scalafy(map).asInstanceOf[Map[String, Any]] :: Nil
+            case list: java.util.List[_] => scalafy(list).asInstanceOf[Seq[Map[String, Any]]]
+          }
 
-        parsed
+          parsed
+        } catch {
+          case e: JsonProcessingException => Map(columnNameOfCorruptRecords -> record) :: Nil
+        }
       }
     })
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 3cfcb2b..7bb08f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -21,6 +21,8 @@ import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.json.JsonRDD.{enforceCorrectType, compatibleType}
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 
 import java.sql.Timestamp
@@ -644,7 +646,65 @@ class JsonSuite extends QueryTest {
       ("str_a_1", null, null) ::
       ("str_a_2", null, null) ::
       (null, "str_b_3", null) ::
-      ("str_a_4", "str_b_4", "str_c_4") ::Nil
+      ("str_a_4", "str_b_4", "str_c_4") :: Nil
     )
   }
+
+  test("Corrupt records") {
+    // Test if we can query corrupt records.
+    val oldColumnNameOfCorruptRecord = TestSQLContext.columnNameOfCorruptRecord
+    TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
+
+    val jsonSchemaRDD = jsonRDD(corruptRecords)
+    jsonSchemaRDD.registerTempTable("jsonTable")
+
+    val schema = StructType(
+      StructField("_unparsed", StringType, true) ::
+      StructField("a", StringType, true) ::
+      StructField("b", StringType, true) ::
+      StructField("c", StringType, true) :: Nil)
+
+    assert(schema === jsonSchemaRDD.schema)
+
+    // In HiveContext, backticks should be used to access columns starting with a underscore.
+    checkAnswer(
+      sql(
+        """
+          |SELECT a, b, c, _unparsed
+          |FROM jsonTable
+        """.stripMargin),
+      (null, null, null, "{") ::
+      (null, null, null, "") ::
+      (null, null, null, """{"a":1, b:2}""") ::
+      (null, null, null, """{"a":{, b:3}""") ::
+      ("str_a_4", "str_b_4", "str_c_4", null) ::
+      (null, null, null, "]") :: Nil
+    )
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT a, b, c
+          |FROM jsonTable
+          |WHERE _unparsed IS NULL
+        """.stripMargin),
+      ("str_a_4", "str_b_4", "str_c_4") :: Nil
+    )
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT _unparsed
+          |FROM jsonTable
+          |WHERE _unparsed IS NOT NULL
+        """.stripMargin),
+      Seq("{") ::
+      Seq("") ::
+      Seq("""{"a":1, b:2}""") ::
+      Seq("""{"a":{, b:3}""") ::
+      Seq("]") :: Nil
+    )
+
+    TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1c7f0ab3/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index fc833b8..eaca9f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -143,4 +143,13 @@ object TestJsonData {
       """[{"a":"str_a_2"}, {"b":"str_b_3"}]""" ::
       """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
       """[]""" :: Nil)
+
+  val corruptRecords =
+    TestSQLContext.sparkContext.parallelize(
+      """{""" ::
+      """""" ::
+      """{"a":1, b:2}""" ::
+      """{"a":{, b:3}""" ::
+      """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
+      """]""" :: Nil)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org