Posted to commits@spark.apache.org by gu...@apache.org on 2020/11/27 06:51:50 UTC
[spark] branch master updated: [SPARK-33566][CORE][SQL][SS][PYTHON]
Make unescapedQuoteHandling option configurable when read CSV
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 433ae90 [SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when read CSV
433ae90 is described below
commit 433ae9064f55b8adb27b561e1ff17c32f0bf3465
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Fri Nov 27 15:47:39 2020 +0900
[SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when read CSV
### What changes were proposed in this pull request?
There are some differences between Spark CSV, opencsv and commons-csv; the typical case is described in SPARK-33566: when a value contains both unescaped quotes and an unescaped qualifier, the parsing results differ.
The reason for the difference is that Spark uses `STOP_AT_DELIMITER` as the default `UnescapedQuoteHandling` when building its `CsvParser`, and this is not configurable.
opencsv and commons-csv, on the other hand, use a parsing mechanism similar to `STOP_AT_CLOSING_QUOTE` by default.
So this PR makes the `unescapedQuoteHandling` option configurable, allowing users to obtain the same parsing result as opencsv and commons-csv.
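For example, after this change a user can opt into the opencsv/commons-csv-like behavior. A minimal usage sketch (the file path and the `spark` session are placeholders, not part of this commit):

```scala
// Hypothetical usage sketch: opt into opencsv/commons-csv-like parsing
// with the new option added by this commit.
// Assumes an existing SparkSession named `spark` and a CSV file at the path.
val df = spark.read
  .option("header", "true")
  .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
  .csv("/path/to/data.csv")
```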
### Why are the changes needed?
Makes the `unescapedQuoteHandling` option configurable when reading CSV, so that parsing is more flexible.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Pass the Jenkins or GitHub Actions checks
- Add a new test case similar to the one described in SPARK-33566 (sketched below)
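To make the tested behavior concrete, here is a small self-contained sketch of the SPARK-33566 case (mirroring the new test in `CSVSuite.scala` below; the temp path and the `spark` session are placeholders):

```scala
// Sketch of the ambiguous input: the quoted field "a,""b,c" contains both an
// unescaped quote and the delimiter. Assumes a SparkSession named `spark`.
import spark.implicits._

val dataPath = "/tmp/spark-33566-demo"  // placeholder path
Seq("""c1,c2""", """"a,""b,c","xyz"""").toDF().repartition(1).write.text(dataPath)

// With the default STOP_AT_DELIMITER this parses as Row("\"a,\"\"b", "c\"");
// with STOP_AT_CLOSING_QUOTE it parses as Row("a,\"\"b,c", "xyz"), matching
// opencsv and commons-csv.
spark.read
  .option("header", "true")
  .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
  .csv(dataPath)
  .show(false)
```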
Closes #30518 from LuciferYang/SPARK-33566.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
python/pyspark/sql/readwriter.py | 26 ++++++++++++++++++++--
python/pyspark/sql/readwriter.pyi | 1 +
python/pyspark/sql/streaming.py | 25 +++++++++++++++++++--
python/pyspark/sql/streaming.pyi | 1 +
.../apache/spark/sql/catalyst/csv/CSVOptions.scala | 8 ++++++-
.../org/apache/spark/sql/DataFrameReader.scala | 21 +++++++++++++++++
.../spark/sql/streaming/DataStreamReader.scala | 21 +++++++++++++++++
.../sql/execution/datasources/csv/CSVSuite.scala | 24 ++++++++++++++++++++
8 files changed, 122 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bb31e6a..d120daa 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -522,7 +522,8 @@ class DataFrameReader(OptionUtils):
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
- pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
+ pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None,
+ unescapedQuoteHandling=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -685,6 +686,26 @@ class DataFrameReader(OptionUtils):
modifiedAfter (batch only) : an optional timestamp to only include files with
modification times occurring after the specified time. The provided timestamp
must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+ unescapedQuoteHandling : str, optional
+ defines how the CsvParser will handle values with unescaped quotes. If None is
+ set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+ * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
+ the quote character and proceed parsing the value as a quoted value, until a closing
+ quote is found.
+ * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
+ as an unquoted value. This will make the parser accumulate all characters of the current
+ parsed value until the delimiter is found. If no delimiter is found in the value, the
+ parser will continue accumulating characters from the input until a delimiter or line
+ ending is found.
+ * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
+ as an unquoted value. This will make the parser accumulate all characters until the
+ delimiter or a line ending is found in the input.
+ * ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
+ for the given value will be skipped and the value set in nullValue will be produced
+ instead.
+ * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
+ will be thrown.
Examples
--------
@@ -708,7 +729,8 @@ class DataFrameReader(OptionUtils):
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
- modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
+ modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
+ unescapedQuoteHandling=unescapedQuoteHandling)
if isinstance(path, str):
path = [path]
if type(path) == list:
diff --git a/python/pyspark/sql/readwriter.pyi b/python/pyspark/sql/readwriter.pyi
index 64c5697..c3b9a42 100644
--- a/python/pyspark/sql/readwriter.pyi
+++ b/python/pyspark/sql/readwriter.pyi
@@ -113,6 +113,7 @@ class DataFrameReader(OptionUtils):
lineSep: Optional[str] = ...,
pathGlobFilter: Optional[Union[bool, str]] = ...,
recursiveFileLookup: Optional[Union[bool, str]] = ...,
+ unescapedQuoteHandling: Optional[str] = ...,
) -> DataFrame: ...
def orc(
self,
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e7b2fa1..365b5f3 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -761,7 +761,7 @@ class DataStreamReader(OptionUtils):
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
- pathGlobFilter=None, recursiveFileLookup=None):
+ pathGlobFilter=None, recursiveFileLookup=None, unescapedQuoteHandling=None):
r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -900,6 +900,26 @@ class DataStreamReader(OptionUtils):
recursiveFileLookup : str or bool, optional
recursively scan a directory for files. Using this option disables
`partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_. # noqa
+ unescapedQuoteHandling : str, optional
+ defines how the CsvParser will handle values with unescaped quotes. If None is
+ set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+ * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
+ the quote character and proceed parsing the value as a quoted value, until a closing
+ quote is found.
+ * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
+ as an unquoted value. This will make the parser accumulate all characters of the current
+ parsed value until the delimiter is found. If no delimiter is found in the value, the
+ parser will continue accumulating characters from the input until a delimiter or line
+ ending is found.
+ * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
+ as an unquoted value. This will make the parser accumulate all characters until the
+ delimiter or a line ending is found in the input.
+ * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, the content parsed
+ for the given value will be skipped and the value set in nullValue will be produced
+ instead.
+ * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
+ will be thrown.
.. versionadded:: 2.0.0
@@ -926,7 +946,8 @@ class DataStreamReader(OptionUtils):
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
emptyValue=emptyValue, locale=locale, lineSep=lineSep,
- pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
+ pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+ unescapedQuoteHandling=unescapedQuoteHandling)
if isinstance(path, str):
return self._df(self._jreader.csv(path))
else:
diff --git a/python/pyspark/sql/streaming.pyi b/python/pyspark/sql/streaming.pyi
index 56ce140..829610a 100644
--- a/python/pyspark/sql/streaming.pyi
+++ b/python/pyspark/sql/streaming.pyi
@@ -149,6 +149,7 @@ class DataStreamReader(OptionUtils):
lineSep: Optional[str] = ...,
pathGlobFilter: Optional[Union[bool, str]] = ...,
recursiveFileLookup: Optional[Union[bool, str]] = ...,
+ unescapedQuoteHandling: Optional[str] = ...,
) -> DataFrame: ...
class DataStreamWriter:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index f2191fc..ec40599 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -213,6 +213,12 @@ class CSVOptions(
}
val lineSeparatorInWrite: Option[String] = lineSeparator
+ /**
+ * The handling method to be used when unescaped quotes are found in the input.
+ */
+ val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
+ .getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
+
def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
@@ -258,7 +264,7 @@ class CSVOptions(
settings.setNullValue(nullValue)
settings.setEmptyValue(emptyValueInRead)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
- settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
+ settings.setUnescapedQuoteHandling(unescapedQuoteHandling)
settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
lineSeparatorInRead.foreach { _ =>
settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b26bc64..8f96f0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -727,6 +727,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
+ * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+ * will handle values with unescaped quotes.
+ * <ul>
+ * <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+ * the quote character and proceed parsing the value as a quoted value, until a closing
+ * quote is found.</li>
+ * <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+ * as an unquoted value. This will make the parser accumulate all characters of the current
+ * parsed value until the delimiter is found. If no
+ * delimiter is found in the value, the parser will continue accumulating characters from
+ * the input until a delimiter or line ending is found.</li>
+ * <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+ * as an unquoted value. This will make the parser accumulate all characters until the
+ * delimiter or a line ending is found in the input.</li>
+ * <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
+ * for the given value will be skipped and the value set in nullValue will be produced
+ * instead.</li>
+ * <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+ * will be thrown.</li>
+ * </ul>
+ * </li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes. Note that Spark tries
* to parse only required columns in CSV under column pruning. Therefore, corrupt records
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 9bc4acd..7f4ef8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -396,6 +396,27 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
* for any given value being read. By default, it is -1 meaning unlimited length</li>
+ * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+ * will handle values with unescaped quotes.
+ * <ul>
+ * <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+ * the quote character and proceed parsing the value as a quoted value, until a closing
+ * quote is found.</li>
+ * <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+ * as an unquoted value. This will make the parser accumulate all characters of the current
+ * parsed value until the delimiter is found. If no delimiter is found in the value, the
+ * parser will continue accumulating characters from the input until a delimiter or line
+ * ending is found.</li>
+ * <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+ * as an unquoted value. This will make the parser accumulate all characters until the
+ * delimiter or a line ending is found in the input.</li>
+ * <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
+ * for the given value will be skipped and the value set in nullValue will be produced
+ * instead.</li>
+ * <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+ * will be thrown.</li>
+ * </ul>
+ * </li>
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes.
* <ul>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index a236814..30f0e45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2428,6 +2428,30 @@ abstract class CSVSuite
assert(readback.collect sameElements Array(Row("0"), Row("1"), Row("2")))
}
}
+
+ test("SPARK-33566: configure UnescapedQuoteHandling to parse " +
+ "unescaped quotes and unescaped delimiter data correctly") {
+ withTempPath { path =>
+ val dataPath = path.getCanonicalPath
+ val row1 = Row("""a,""b,c""", "xyz")
+ val row2 = Row("""a,b,c""", """x""yz""")
+ // Generate the test data using `,` as the delimiter and `"` as the quote character; neither is escaped.
+ Seq(
+ """c1,c2""",
+ s""""${row1.getString(0)}","${row1.getString(1)}"""",
+ s""""${row2.getString(0)}","${row2.getString(1)}"""")
+ .toDF().repartition(1).write.text(dataPath)
+ // Without setting unescapedQuoteHandling to STOP_AT_CLOSING_QUOTE,
+ // the result will be Row(""""a,""b""", """c""""), Row("""a,b,c""", """"x""yz"""")
+ val result = spark.read
+ .option("inferSchema", "true")
+ .option("header", "true")
+ .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
+ .csv(dataPath).collect()
+ val expectedResults = Array(row1, row2)
+ assert(result.sameElements(expectedResults))
+ }
+ }
}
class CSVv1Suite extends CSVSuite {