You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/08/25 15:32:34 UTC
[spark] branch branch-3.0 updated: [SPARK-32614][SQL] Don't apply comment processing if 'comment' unset for CSV
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 68ff809 [SPARK-32614][SQL] Don't apply comment processing if 'comment' unset for CSV
68ff809 is described below
commit 68ff809060c312fee1fbe8d46657e6c7aec62dba
Author: Sean Owen <sr...@gmail.com>
AuthorDate: Wed Aug 26 00:25:58 2020 +0900
[SPARK-32614][SQL] Don't apply comment processing if 'comment' unset for CSV
Spark's CSV source can optionally ignore lines starting with a comment char. Some code paths check to see if it's set before applying comment logic (i.e. not set to default of `\0`), but many do not, including the one that passes the option to Univocity. This means that rows beginning with a null char were being treated as comments even when 'disabled'.
This change is needed to avoid dropping rows that start with a null char when comment processing was not requested or intended. See the JIRA ticket for an example.
There are no user-facing changes beyond the effect of the bug fix itself.
This patch was verified by the existing tests plus a new test case.
Closes #29516 from srowen/SPARK-32614.
Authored-by: Sean Owen <sr...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit a9d4e60a90d4d6765642e6bf7810da117af6437b)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
dev/deps/spark-deps-hadoop-2.7-hive-1.2 | 2 +-
dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 2 +-
dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 2 +-
pom.xml | 2 +-
.../spark/sql/catalyst/csv/CSVExprUtils.scala | 11 ++++++++---
.../apache/spark/sql/catalyst/csv/CSVOptions.scala | 10 ++++++++--
.../sql/execution/datasources/csv/CSVSuite.scala | 22 +++++++++++++++++-----
7 files changed, 37 insertions(+), 14 deletions(-)
diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 b/dev/deps/spark-deps-hadoop-2.7-hive-1.2
index 82d5a06..e32ea64 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-1.2
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-1.2
@@ -200,7 +200,7 @@ stream/2.9.6//stream-2.9.6.jar
stringtemplate/3.2.1//stringtemplate-3.2.1.jar
super-csv/2.2.0//super-csv-2.2.0.jar
threeten-extra/1.5.0//threeten-extra-1.5.0.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
xercesImpl/2.12.0//xercesImpl-2.12.0.jar
xml-apis/1.4.01//xml-apis-1.4.01.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
index 17c787e..168d619 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -213,7 +213,7 @@ stream/2.9.6//stream-2.9.6.jar
super-csv/2.2.0//super-csv-2.2.0.jar
threeten-extra/1.5.0//threeten-extra-1.5.0.jar
transaction-api/1.1//transaction-api-1.1.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
velocity/1.5//velocity-1.5.jar
xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
xercesImpl/2.12.0//xercesImpl-2.12.0.jar
diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
index b5a10b5..d730b4a 100644
--- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -229,7 +229,7 @@ super-csv/2.2.0//super-csv-2.2.0.jar
threeten-extra/1.5.0//threeten-extra-1.5.0.jar
token-provider/1.0.1//token-provider-1.0.1.jar
transaction-api/1.1//transaction-api-1.1.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
velocity/1.5//velocity-1.5.jar
woodstox-core/5.0.3//woodstox-core-5.0.3.jar
xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
diff --git a/pom.xml b/pom.xml
index 1bf5de0..32a1308 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2287,7 +2287,7 @@
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
- <version>2.8.3</version>
+ <version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
index 3e83c1d..efe41883 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
@@ -25,8 +25,13 @@ object CSVExprUtils {
* This is currently being used in CSV reading path and CSV schema inference.
*/
def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
- iter.filter { line =>
- line.trim.nonEmpty && !line.startsWith(options.comment.toString)
+ if (options.isCommentSet) {
+ val commentPrefix = options.comment.toString
+ iter.filter { line =>
+ line.trim.nonEmpty && !line.startsWith(commentPrefix)
+ }
+ } else {
+ iter.filter(_.trim.nonEmpty)
}
}
@@ -34,7 +39,7 @@ object CSVExprUtils {
if (options.isCommentSet) {
val commentPrefix = options.comment.toString
iter.dropWhile { line =>
- line.trim.isEmpty || line.trim.startsWith(commentPrefix)
+ line.trim.isEmpty || line.startsWith(commentPrefix)
}
} else {
iter.dropWhile(_.trim.isEmpty)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 9d09cab..f2191fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -220,7 +220,9 @@ class CSVOptions(
format.setQuote(quote)
format.setQuoteEscape(escape)
charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
- format.setComment(comment)
+ if (isCommentSet) {
+ format.setComment(comment)
+ }
lineSeparatorInWrite.foreach(format.setLineSeparator)
writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
@@ -242,7 +244,11 @@ class CSVOptions(
format.setQuoteEscape(escape)
lineSeparator.foreach(format.setLineSeparator)
charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
- format.setComment(comment)
+ if (isCommentSet) {
+ format.setComment(comment)
+ } else {
+ settings.setCommentProcessingEnabled(false)
+ }
settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index d54146a..9ba2cab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1902,25 +1902,26 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
test("SPARK-25387: bad input should not cause NPE") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
- val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+ val input = spark.createDataset(Seq("\u0001\u0000\u0001234"))
checkAnswer(spark.read.schema(schema).csv(input), Row(null))
checkAnswer(spark.read.option("multiLine", true).schema(schema).csv(input), Row(null))
- assert(spark.read.csv(input).collect().toSet == Set(Row()))
+ assert(spark.read.schema(schema).csv(input).collect().toSet == Set(Row(null)))
}
test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
val schema = StructType(
StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil)
- val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+ val input = spark.createDataset(Seq("\u0001\u0000\u0001234"))
checkAnswer(
spark.read
.option("columnNameOfCorruptRecord", "_corrupt_record")
.schema(schema)
.csv(input),
- Row(null, null))
- assert(spark.read.csv(input).collect().toSet == Set(Row()))
+ Row(null, "\u0001\u0000\u0001234"))
+ assert(spark.read.schema(schema).csv(input).collect().toSet ==
+ Set(Row(null, "\u0001\u0000\u0001234")))
}
test("field names of inferred schema shouldn't compare to the first row") {
@@ -2353,6 +2354,17 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
checkAnswer(csv, Row(null))
}
}
+
+ test("SPARK-32614: don't treat rows starting with null char as comment") {
+ withTempPath { path =>
+ Seq("\u0000foo", "bar", "baz").toDS.write.text(path.getCanonicalPath)
+ val df = spark.read.format("csv")
+ .option("header", "false")
+ .option("inferSchema", "true")
+ .load(path.getCanonicalPath)
+ assert(df.count() == 3)
+ }
+ }
}
class CSVv1Suite extends CSVSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org