Posted to commits@spark.apache.org by gu...@apache.org on 2018/02/28 02:01:02 UTC

spark git commit: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior in document

Repository: spark
Updated Branches:
  refs/heads/master 23ac3aaba -> b14993e1f


[SPARK-23448][SQL] Clarify JSON and CSV parser behavior in document

## What changes were proposed in this pull request?

Clarify JSON and CSV reader behavior in the documentation.

JSON doesn't support partial results for corrupted records.
CSV supports partial results only for records with more or fewer tokens than the schema.
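
As a rough illustration of the two rules above, here is a toy model in plain Python. It is not Spark's implementation: the function names, the `CORRUPT_COL` constant, and the `(name, converter)` schema shape are all hypothetical, and the choice to leave the corrupt-record column empty on a token-count mismatch simply follows the wording "not a corrupted record to CSV" in the updated docstrings.

```python
import json

# Hypothetical name standing in for the column configured by
# `columnNameOfCorruptRecord`; chosen for this sketch only.
CORRUPT_COL = "_corrupt_record"


def parse_json_permissive(line, fields):
    """Toy model: a malformed JSON record yields no partial result."""
    try:
        obj = json.loads(line)
        if not isinstance(obj, dict):
            raise ValueError("expected a JSON object")
    except ValueError:
        # Every data field becomes None; the raw text goes to the corrupt column.
        row = {name: None for name in fields}
        row[CORRUPT_COL] = line
        return row
    row = {name: obj.get(name) for name in fields}
    row[CORRUPT_COL] = None
    return row


def parse_csv_permissive(line, schema):
    """Toy model; `schema` is a list of (name, converter) pairs."""
    n = len(schema)
    tokens = line.split(",")
    # Fewer tokens: pad the missing fields with None.
    # More tokens: drop the extras.
    tokens = tokens[:n] + [None] * (n - len(tokens))
    try:
        row = {name: (None if tok is None else conv(tok))
               for (name, conv), tok in zip(schema, tokens)}
        row[CORRUPT_COL] = None
    except ValueError:
        # A token could not be converted: no partial result is kept.
        row = {name: None for name, _ in schema}
        row[CORRUPT_COL] = line
    return row
```

With a hypothetical schema `[("id", int), ("name", str), ("age", int)]`, the line `1,alice` keeps `id` and `name` and leaves `age` empty, while `x,alice,30` loses every field because `x` cannot be converted to an integer.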

## How was this patch tested?

Pass existing tests.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #20666 from viirya/SPARK-23448-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b14993e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b14993e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b14993e1

Branch: refs/heads/master
Commit: b14993e1fcb68e1c946a671c6048605ab4afdf58
Parents: 23ac3aa
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Wed Feb 28 11:00:54 2018 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Wed Feb 28 11:00:54 2018 +0900

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                | 30 +++++++++++---------
 python/pyspark/sql/streaming.py                 | 30 +++++++++++---------
 .../spark/sql/catalyst/json/JacksonParser.scala |  3 ++
 .../org/apache/spark/sql/DataFrameReader.scala  | 22 +++++++-------
 .../datasources/csv/UnivocityParser.scala       |  5 ++++
 .../spark/sql/streaming/DataStreamReader.scala  | 22 +++++++-------
 6 files changed, 64 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 49af1bc..9d05ac7 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -209,13 +209,13 @@ class DataFrameReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                 record, and puts the malformed string into a field configured by \
-                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
-                 schema. If a schema does not have the field, it drops corrupt records during \
-                 parsing. When inferring a schema, it implicitly adds a \
-                 ``columnNameOfCorruptRecord`` field in an output schema.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, an user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
+                  field in an output schema.
                 *  ``DROPMALFORMED`` : ignores the whole corrupted records.
                 *  ``FAILFAST`` : throws an exception when it meets corrupted records.
 
@@ -393,13 +393,15 @@ class DataFrameReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                  record, and puts the malformed string into a field configured by \
-                  ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                  a string type field named ``columnNameOfCorruptRecord`` in an \
-                  user-defined schema. If a schema does not have the field, it drops corrupt \
-                  records during parsing. When a length of parsed CSV tokens is shorter than \
-                  an expected length of a schema, it sets `null` for extra fields.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, an user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  A record with less/more tokens than schema is not a corrupted record to CSV. \
+                  When it meets a record having fewer tokens than the length of the schema, \
+                  sets ``null`` to extra fields. When the record has more tokens than the \
+                  length of the schema, it drops extra tokens.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e2a97ac..cc622de 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -442,13 +442,13 @@ class DataStreamReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                 record, and puts the malformed string into a field configured by \
-                 ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                 a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
-                 schema. If a schema does not have the field, it drops corrupt records during \
-                 parsing. When inferring a schema, it implicitly adds a \
-                 ``columnNameOfCorruptRecord`` field in an output schema.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, an user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
+                  field in an output schema.
                 *  ``DROPMALFORMED`` : ignores the whole corrupted records.
                 *  ``FAILFAST`` : throws an exception when it meets corrupted records.
 
@@ -621,13 +621,15 @@ class DataStreamReader(OptionUtils):
         :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                      set, it uses the default value, ``PERMISSIVE``.
 
-                * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
-                  record, and puts the malformed string into a field configured by \
-                  ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
-                  a string type field named ``columnNameOfCorruptRecord`` in an \
-                  user-defined schema. If a schema does not have the field, it drops corrupt \
-                  records during parsing. When a length of parsed CSV tokens is shorter than \
-                  an expected length of a schema, it sets `null` for extra fields.
+                * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  fields to ``null``. To keep corrupt records, an user can set a string type \
+                  field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
+                  schema does not have the field, it drops corrupt records during parsing. \
+                  A record with less/more tokens than schema is not a corrupted record to CSV. \
+                  When it meets a record having fewer tokens than the length of the schema, \
+                  sets ``null`` to extra fields. When the record has more tokens than the \
+                  length of the schema, it drops extra tokens.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index bd144c9..7f69569 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -357,6 +357,9 @@ class JacksonParser(
       }
     } catch {
       case e @ (_: RuntimeException | _: JsonProcessingException) =>
+        // JSON parser currently doesn't support partial results for corrupted records.
+        // For such records, all fields other than the field configured by
+        // `columnNameOfCorruptRecord` are set to `null`.
         throw BadRecordException(() => recordLiteral(record), () => None, e)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 4274f12..0139913 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -345,12 +345,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
-   *     field in an output schema.</li>
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     keep corrupt records, an user can set a string type field named
+   *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
+   *     field, it drops corrupt records during parsing. When inferring a schema, it implicitly
+   *     adds a `columnNameOfCorruptRecord` field in an output schema.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
@@ -550,12 +550,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
    *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
    *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When a length of parsed CSV tokens is shorter than an expected length
-   *     of a schema, it sets `null` for extra fields.</li>
+   *     during parsing. A record with less/more tokens than schema is not a corrupted record to
+   *     CSV. When it meets a record having fewer tokens than the length of the schema, sets
+   *     `null` to extra fields. When the record has more tokens than the length of the schema,
+   *     it drops extra tokens.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>

http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 7d6d7e7..3d6cc30 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -203,6 +203,8 @@ class UnivocityParser(
           case _: BadRecordException => None
         }
       }
+      // For records with less or more tokens than the schema, tries to return partial results
+      // if possible.
       throw BadRecordException(
         () => getCurrentInput,
         () => getPartialResult(),
@@ -218,6 +220,9 @@ class UnivocityParser(
         row
       } catch {
         case NonFatal(e) =>
+          // For corrupted records with the number of tokens same as the schema,
+          // CSV reader doesn't support partial results. All fields other than the field
+          // configured by `columnNameOfCorruptRecord` are set to `null`.
           throw BadRecordException(() => getCurrentInput, () => None, e)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b14993e1/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index f238516..61e22fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -236,12 +236,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
-   *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
-   *     field in an output schema.</li>
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   *     keep corrupt records, an user can set a string type field named
+   *     `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
+   *     field, it drops corrupt records during parsing. When inferring a schema, it implicitly
+   *     adds a `columnNameOfCorruptRecord` field in an output schema.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>
@@ -316,12 +316,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
-   *     <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
-   *     the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
+   *     <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
+   *     field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
    *     corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
    *     in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   *     during parsing. When a length of parsed CSV tokens is shorter than an expected length
-   *     of a schema, it sets `null` for extra fields.</li>
+   *     during parsing. A record with less/more tokens than schema is not a corrupted record to
+   *     CSV. When it meets a record having fewer tokens than the length of the schema, sets
+   *     `null` to extra fields. When the record has more tokens than the length of the schema,
+   *     it drops extra tokens.</li>
    *     <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    *     <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    *   </ul>

