You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/11/27 06:51:50 UTC

[spark] branch master updated: [SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when read CSV

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 433ae90  [SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when read CSV
433ae90 is described below

commit 433ae9064f55b8adb27b561e1ff17c32f0bf3465
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Fri Nov 27 15:47:39 2020 +0900

    [SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when read CSV
    
    ### What changes were proposed in this pull request?
    There are some differences between Spark CSV, opencsv and commons-csv, the typical case are described in SPARK-33566, When there are both unescaped quotes and unescaped qualifier in value,  the results of parsing are different.
    
    The reason for the difference is Spark use `STOP_AT_DELIMITER` as default `UnescapedQuoteHandling` to build `CsvParser` and it not configurable.
    
    On the other hand, opencsv and commons-csv use the parsing mechanism similar to `STOP_AT_CLOSING_QUOTE ` by default.
    
    So this pr make `unescapedQuoteHandling` option configurable to get the same parsing result as opencsv and commons-csv.
    
    ### Why are the changes needed?
    Make unescapedQuoteHandling option configurable when read CSV to make parsing more flexible。
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    - Pass the Jenkins or GitHub Action
    
    - Add a new case similar to that described in SPARK-33566
    
    Closes #30518 from LuciferYang/SPARK-33566.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 python/pyspark/sql/readwriter.py                   | 26 ++++++++++++++++++++--
 python/pyspark/sql/readwriter.pyi                  |  1 +
 python/pyspark/sql/streaming.py                    | 25 +++++++++++++++++++--
 python/pyspark/sql/streaming.pyi                   |  1 +
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala |  8 ++++++-
 .../org/apache/spark/sql/DataFrameReader.scala     | 21 +++++++++++++++++
 .../spark/sql/streaming/DataStreamReader.scala     | 21 +++++++++++++++++
 .../sql/execution/datasources/csv/CSVSuite.scala   | 24 ++++++++++++++++++++
 8 files changed, 122 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bb31e6a..d120daa 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -522,7 +522,8 @@ class DataFrameReader(OptionUtils):
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
             samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
-            pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
+            pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None,
+            unescapedQuoteHandling=None):
         r"""Loads a CSV file and returns the result as a  :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -685,6 +686,26 @@ class DataFrameReader(OptionUtils):
         modifiedAfter (batch only) : an optional timestamp to only include files with
             modification times occurring after the specified time. The provided timestamp
             must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+        unescapedQuoteHandling : str, optional
+            defines how the CsvParser will handle values with unescaped quotes. If None is
+            set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+            * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
+              the quote character and proceed parsing the value as a quoted value, until a closing
+              quote is found.
+            * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters of the current
+              parsed value until the delimiter is found. If no delimiter is found in the value, the
+              parser will continue accumulating characters from the input until a delimiter or line
+              ending is found.
+            * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters until the
+              delimiter or a line ending is found in the input.
+            * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, the content parsed
+              for the given value will be skipped and the value set in nullValue will be produced
+              instead.
+            * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
+              will be thrown.
 
         Examples
         --------
@@ -708,7 +729,8 @@ class DataFrameReader(OptionUtils):
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
             enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
             pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
-            modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
+            modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
+            unescapedQuoteHandling=unescapedQuoteHandling)
         if isinstance(path, str):
             path = [path]
         if type(path) == list:
diff --git a/python/pyspark/sql/readwriter.pyi b/python/pyspark/sql/readwriter.pyi
index 64c5697..c3b9a42 100644
--- a/python/pyspark/sql/readwriter.pyi
+++ b/python/pyspark/sql/readwriter.pyi
@@ -113,6 +113,7 @@ class DataFrameReader(OptionUtils):
         lineSep: Optional[str] = ...,
         pathGlobFilter: Optional[Union[bool, str]] = ...,
         recursiveFileLookup: Optional[Union[bool, str]] = ...,
+        unescapedQuoteHandling: Optional[str] = ...,
     ) -> DataFrame: ...
     def orc(
         self,
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e7b2fa1..365b5f3 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -761,7 +761,7 @@ class DataStreamReader(OptionUtils):
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
             enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
-            pathGlobFilter=None, recursiveFileLookup=None):
+            pathGlobFilter=None, recursiveFileLookup=None, unescapedQuoteHandling=None):
         r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
 
         This function will go through the input once to determine the input schema if
@@ -900,6 +900,26 @@ class DataStreamReader(OptionUtils):
         recursiveFileLookup : str or bool, optional
             recursively scan a directory for files. Using this option disables
             `partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_.  # noqa
+        unescapedQuoteHandling : str, optional
+            defines how the CsvParser will handle values with unescaped quotes. If None is
+            set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+            * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
+              the quote character and proceed parsing the value as a quoted value, until a closing
+              quote is found.
+            * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters of the current
+              parsed value until the delimiter is found. If no delimiter is found in the value, the
+              parser will continue accumulating characters from the input until a delimiter or line
+              ending is found.
+            * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters until the
+              delimiter or a line ending is found in the input.
+            * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, the content parsed
+              for the given value will be skipped and the value set in nullValue will be produced
+              instead.
+            * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
+              will be thrown.
 
         .. versionadded:: 2.0.0
 
@@ -926,7 +946,8 @@ class DataStreamReader(OptionUtils):
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
             emptyValue=emptyValue, locale=locale, lineSep=lineSep,
-            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
+            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+            unescapedQuoteHandling=unescapedQuoteHandling)
         if isinstance(path, str):
             return self._df(self._jreader.csv(path))
         else:
diff --git a/python/pyspark/sql/streaming.pyi b/python/pyspark/sql/streaming.pyi
index 56ce140..829610a 100644
--- a/python/pyspark/sql/streaming.pyi
+++ b/python/pyspark/sql/streaming.pyi
@@ -149,6 +149,7 @@ class DataStreamReader(OptionUtils):
         lineSep: Optional[str] = ...,
         pathGlobFilter: Optional[Union[bool, str]] = ...,
         recursiveFileLookup: Optional[Union[bool, str]] = ...,
+        unescapedQuoteHandling: Optional[str] = ...,
     ) -> DataFrame: ...
 
 class DataStreamWriter:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index f2191fc..ec40599 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -213,6 +213,12 @@ class CSVOptions(
   }
   val lineSeparatorInWrite: Option[String] = lineSeparator
 
+  /**
+   * The handling method to be used when unescaped quotes are found in the input.
+   */
+  val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
+    .getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
+
   def asWriterSettings: CsvWriterSettings = {
     val writerSettings = new CsvWriterSettings()
     val format = writerSettings.getFormat
@@ -258,7 +264,7 @@ class CSVOptions(
     settings.setNullValue(nullValue)
     settings.setEmptyValue(emptyValueInRead)
     settings.setMaxCharsPerColumn(maxCharsPerColumn)
-    settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
+    settings.setUnescapedQuoteHandling(unescapedQuoteHandling)
     settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
     lineSeparatorInRead.foreach { _ =>
       settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b26bc64..8f96f0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -727,6 +727,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
+   * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+   * will handle values with unescaped quotes.
+   *   <ul>
+   *     <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+   *     the quote character and proceed parsing the value as a quoted value, until a closing
+   *     quote is found.</li>
+   *     <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters of the current
+   *     parsed value until the delimiter is found. If no
+   *     delimiter is found in the value, the parser will continue accumulating characters from
+   *     the input until a delimiter or line ending is found.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters until the
+   *     delimiter or a line ending is found in the input.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, the content parsed
+   *     for the given value will be skipped and the value set in nullValue will be produced
+   *     instead.</li>
+   *     <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+   *     will be thrown.</li>
+   *   </ul>
+   * </li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing. It supports the following case-insensitive modes. Note that Spark tries
    *    to parse only required columns in CSV under column pruning. Therefore, corrupt records
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 9bc4acd..7f4ef8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -396,6 +396,27 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
+   * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+   * will handle values with unescaped quotes.
+   *   <ul>
+   *     <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+   *     the quote character and proceed parsing the value as a quoted value, until a closing
+   *     quote is found.</li>
+   *     <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters of the current
+   *     parsed value until the delimiter is found. If no delimiter is found in the value, the
+   *     parser will continue accumulating characters from the input until a delimiter or line
+   *     ending is found.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters until the
+   *     delimiter or a line ending is found in the input.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, the content parsed
+   *     for the given value will be skipped and the value set in nullValue will be produced
+   *     instead.</li>
+   *     <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+   *     will be thrown.</li>
+   *   </ul>
+   * </li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    *    during parsing. It supports the following case-insensitive modes.
    *   <ul>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index a236814..30f0e45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2428,6 +2428,30 @@ abstract class CSVSuite
       assert(readback.collect sameElements Array(Row("0"), Row("1"), Row("2")))
     }
   }
+
+  test("SPARK-33566: configure UnescapedQuoteHandling to parse " +
+    "unescaped quotes and unescaped delimiter data correctly") {
+    withTempPath { path =>
+      val dataPath = path.getCanonicalPath
+      val row1 = Row("""a,""b,c""", "xyz")
+      val row2 = Row("""a,b,c""", """x""yz""")
+      // Generate the test data, use `,` as delimiter and `"` as quotes, but they didn't escape.
+      Seq(
+        """c1,c2""",
+        s""""${row1.getString(0)}","${row1.getString(1)}"""",
+        s""""${row2.getString(0)}","${row2.getString(1)}"""")
+        .toDF().repartition(1).write.text(dataPath)
+      // Without configure UnescapedQuoteHandling to STOP_AT_CLOSING_QUOTE,
+      // the result will be Row(""""a,""b""", """c""""), Row("""a,b,c""", """"x""yz"""")
+      val result = spark.read
+        .option("inferSchema", "true")
+        .option("header", "true")
+        .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
+        .csv(dataPath).collect()
+      val exceptResults = Array(row1, row2)
+      assert(result.sameElements(exceptResults))
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org