Posted to commits@spark.apache.org by we...@apache.org on 2016/03/22 13:30:58 UTC

spark git commit: [SPARK-13953][SQL] Specifying the field name for corrupted record via option at JSON datasource

Repository: spark
Updated Branches:
  refs/heads/master f2e855fba -> 4e09a0d5e


[SPARK-13953][SQL] Specifying the field name for corrupted record via option at JSON datasource

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13953

Currently, the JSON data source creates a new field in `PERMISSIVE` mode for storing the malformed string.
This field can be renamed via the `spark.sql.columnNameOfCorruptRecord` option, but that is a global configuration.

This PR makes that option applicable per read, so it can be specified via `option()`, as sketched below. When set, it overrides `spark.sql.columnNameOfCorruptRecord`.
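
As a minimal usage sketch (the path and field name below are illustrative, not taken from this patch):

```scala
// Hypothetical example: rename the corrupt-record field for this read only.
val df = sqlContext.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_malformed")
  .json("path/to/people.json")

// Malformed lines land in "_malformed"; the other columns are null for them.
df.select("_malformed").show()
```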

## How was this patch tested?

Unit tests were added, and `./dev/run_tests` was run for the coding style checks.

Author: hyukjinkwon <gu...@gmail.com>

Closes #11881 from HyukjinKwon/SPARK-13953.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e09a0d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e09a0d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e09a0d5

Branch: refs/heads/master
Commit: 4e09a0d5ea50d1cfc936bc87cf3372b4a0aa7dc2
Parents: f2e855f
Author: hyukjinkwon <gu...@gmail.com>
Authored: Tue Mar 22 20:30:48 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Mar 22 20:30:48 2016 +0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  5 ++++-
 .../org/apache/spark/sql/DataFrameReader.scala  | 20 +++++++++++++++----
 .../datasources/json/JSONOptions.scala          |  1 +
 .../datasources/json/JSONRelation.scala         | 10 ++++++++--
 .../execution/datasources/json/JsonSuite.scala  | 21 ++++++++++++++++++++
 5 files changed, 50 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4e09a0d5/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bae9e69..cca57a3 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -166,10 +166,13 @@ class DataFrameReader(object):
                 during parsing.
                 *  ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                   record and puts the malformed string into a new field configured by \
-                 ``spark.sql.columnNameOfCorruptRecord``. When a schema is set by user, it sets \
+                 ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
                  ``null`` for extra fields.
                 *  ``DROPMALFORMED`` : ignores the whole corrupted records.
                 *  ``FAILFAST`` : throws an exception when it meets corrupted records.
+            *  ``columnNameOfCorruptRecord`` (default ``_corrupt_record``): allows renaming the \
+                 new field having malformed string created by ``PERMISSIVE`` mode. \
+                 This overrides ``spark.sql.columnNameOfCorruptRecord``.
 
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes

http://git-wip-us.apache.org/repos/asf/spark/blob/4e09a0d5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0dc0d44..1d4693f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -293,11 +293,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * during parsing.<li>
    * <ul>
    *  <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
-   *  malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
+   *  malformed string into a new field configured by `columnNameOfCorruptRecord`. When
    *  a schema is set by user, it sets `null` for extra fields.</li>
    *  <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *  <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    * </ul>
+   * <li>`columnNameOfCorruptRecord` (default `_corrupt_record`): allows renaming the new field
+   * having malformed string created by `PERMISSIVE` mode. This overrides
+   * `spark.sql.columnNameOfCorruptRecord`.<li>
    *
    * @since 1.4.0
    */
@@ -326,11 +329,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * during parsing.<li>
    * <ul>
    *  <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
-   *  malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
+   *  malformed string into a new field configured by `columnNameOfCorruptRecord`. When
    *  a schema is set by user, it sets `null` for extra fields.</li>
    *  <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *  <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    * </ul>
+   * <li>`columnNameOfCorruptRecord` (default `_corrupt_record`): allows renaming the new field
+   * having malformed string created by `PERMISSIVE` mode. This overrides
+   * `spark.sql.columnNameOfCorruptRecord`.<li>
    *
    * @since 1.6.0
    */
@@ -360,8 +366,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
     val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+    val columnNameOfCorruptRecord =
+      parsedOptions.columnNameOfCorruptRecord
+        .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
     val schema = userSpecifiedSchema.getOrElse {
-      InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions)
+      InferSchema.infer(
+        jsonRDD,
+        columnNameOfCorruptRecord,
+        parsedOptions)
     }
 
     Dataset.newDataFrame(
@@ -371,7 +383,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
         JacksonParser.parse(
           jsonRDD,
           schema,
-          sqlContext.conf.columnNameOfCorruptRecord,
+          columnNameOfCorruptRecord,
           parsedOptions))(sqlContext))
   }
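
A hedged sketch of the resolution order this hunk introduces (per-read option first, then the session conf); the RDD contents and field names below are illustrative:

```scala
// Illustrative only: the per-read option wins over the session-wide conf.
val rdd = sqlContext.sparkContext.parallelize(
  """{"a": 1}""" :: """{"a":1, b:2}""" :: Nil)  // second record is malformed

sqlContext.setConf("spark.sql.columnNameOfCorruptRecord", "_conf_corrupt")

// Without the option, the conf name "_conf_corrupt" names the corrupt field.
val viaConf = sqlContext.read.json(rdd)

// With the option, "_opt_corrupt" overrides the conf for this read only.
val viaOption = sqlContext.read
  .option("columnNameOfCorruptRecord", "_opt_corrupt")
  .json(rdd)
```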
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4e09a0d5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 93c3d47..c0ad9ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -51,6 +51,7 @@ private[sql] class JSONOptions(
     parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
   val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
   private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
 
   // Parse mode flags
   if (!ParseModes.isValidMode(parseMode)) {
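
Since `parameters.get("columnNameOfCorruptRecord")` yields an `Option[String]`, the call sites fall back to the SQL conf with `getOrElse`. A self-contained sketch of that precedence (`confValue` stands in for `sqlContext.conf.columnNameOfCorruptRecord`):

```scala
// Standalone sketch of the Option-based fallback used at the call sites.
val parameters = Map("columnNameOfCorruptRecord" -> "_malformed")
val confValue = "_corrupt_record"  // stand-in for the session conf default

val columnNameOfCorruptRecord =
  parameters.get("columnNameOfCorruptRecord").getOrElse(confValue)
// => "_malformed"; with no option set, it falls back to "_corrupt_record"
```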

http://git-wip-us.apache.org/repos/asf/spark/blob/4e09a0d5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 4f8de25..3bf0af0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -51,6 +51,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       None
     } else {
       val parsedOptions: JSONOptions = new JSONOptions(options)
+      val columnNameOfCorruptRecord =
+        parsedOptions.columnNameOfCorruptRecord
+          .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
       val jsonFiles = files.filterNot { status =>
         val name = status.getPath.getName
         name.startsWith("_") || name.startsWith(".")
@@ -58,7 +61,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
 
       val jsonSchema = InferSchema.infer(
         createBaseRdd(sqlContext, jsonFiles),
-        sqlContext.conf.columnNameOfCorruptRecord,
+        columnNameOfCorruptRecord,
         parsedOptions)
       checkConstraints(jsonSchema)
 
@@ -102,10 +105,13 @@ class DefaultSource extends FileFormat with DataSourceRegister {
 
     val parsedOptions: JSONOptions = new JSONOptions(options)
     val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
+    val columnNameOfCorruptRecord =
+      parsedOptions.columnNameOfCorruptRecord
+        .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
     val rows = JacksonParser.parse(
       createBaseRdd(sqlContext, jsonFiles),
       requiredDataSchema,
-      sqlContext.conf.columnNameOfCorruptRecord,
+      columnNameOfCorruptRecord,
       parsedOptions)
 
     rows.mapPartitions { iterator =>

http://git-wip-us.apache.org/repos/asf/spark/blob/4e09a0d5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0a5699b..c108d81 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1067,6 +1067,27 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("SPARK-13953 Rename the corrupt record field via option") {
+    val jsonDF = sqlContext.read
+      .option("columnNameOfCorruptRecord", "_malformed")
+      .json(corruptRecords)
+    val schema = StructType(
+      StructField("_malformed", StringType, true) ::
+        StructField("a", StringType, true) ::
+        StructField("b", StringType, true) ::
+        StructField("c", StringType, true) :: Nil)
+
+    assert(schema === jsonDF.schema)
+    checkAnswer(
+      jsonDF.selectExpr("a", "b", "c", "_malformed"),
+      Row(null, null, null, "{") ::
+        Row(null, null, null, """{"a":1, b:2}""") ::
+        Row(null, null, null, """{"a":{, b:3}""") ::
+        Row("str_a_4", "str_b_4", "str_c_4", null) ::
+        Row(null, null, null, "]") :: Nil
+    )
+  }
+
   test("SPARK-4068: nulls in arrays") {
     val jsonDF = sqlContext.read.json(nullsInArrays)
     jsonDF.registerTempTable("jsonTable")
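
For context, `corruptRecords` comes from `TestJsonData`. A hypothetical reconstruction, inferred from the `checkAnswer` rows above rather than copied from the fixture (the real definition may differ):

```scala
import org.apache.spark.rdd.RDD

// Inferred from the expected rows in the test above; not the actual fixture.
def corruptRecords: RDD[String] =
  sqlContext.sparkContext.parallelize(
    """{""" ::
    """{"a":1, b:2}""" ::
    """{"a":{, b:3}""" ::
    """{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
    """]""" :: Nil)
```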

