Posted to commits@spark.apache.org by gu...@apache.org on 2020/08/25 15:32:34 UTC

[spark] branch branch-3.0 updated: [SPARK-32614][SQL] Don't apply comment processing if 'comment' unset for CSV

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 68ff809  [SPARK-32614][SQL] Don't apply comment processing if 'comment' unset for CSV
68ff809 is described below

commit 68ff809060c312fee1fbe8d46657e6c7aec62dba
Author: Sean Owen <sr...@gmail.com>
AuthorDate: Wed Aug 26 00:25:58 2020 +0900

    [SPARK-32614][SQL] Don't apply comment processing if 'comment' unset for CSV
    
    Spark's CSV source can optionally ignore lines starting with a comment char. Some code paths check whether the option is set (i.e. not the default of `\0`) before applying comment logic, but many do not, including the one that passes the option to Univocity. This means that rows beginning with a null char were treated as comments even when comment processing was 'disabled'.
    
    This change avoids dropping rows that start with a null char when comment processing is not requested or intended. See the JIRA for an example.
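
    For illustration, a minimal sketch of the symptom before this fix (spark-shell style; not the JIRA repro verbatim):

        import spark.implicits._
        // With no 'comment' option set, a row beginning with '\u0000' was
        // silently dropped as if it were a comment line.
        val input = spark.createDataset(Seq("\u0000foo", "bar", "baz"))
        spark.read.csv(input).count()  // expected 3, but fewer before the fix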
    
    No user-facing change beyond the effect of the bug fix.
    
    Tested with existing tests plus a new test case.
    
    Closes #29516 from srowen/SPARK-32614.
    
    Authored-by: Sean Owen <sr...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit a9d4e60a90d4d6765642e6bf7810da117af6437b)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 dev/deps/spark-deps-hadoop-2.7-hive-1.2            |  2 +-
 dev/deps/spark-deps-hadoop-2.7-hive-2.3            |  2 +-
 dev/deps/spark-deps-hadoop-3.2-hive-2.3            |  2 +-
 pom.xml                                            |  2 +-
 .../spark/sql/catalyst/csv/CSVExprUtils.scala      | 11 ++++++++---
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 10 ++++++++--
 .../sql/execution/datasources/csv/CSVSuite.scala   | 22 +++++++++++++++++-----
 7 files changed, 37 insertions(+), 14 deletions(-)
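
The univocity-parsers bump to 2.9.0 below accompanies the use of CsvParserSettings.setCommentProcessingEnabled in CSVOptions. A minimal sketch of that parser-level switch, independent of Spark (assuming univocity-parsers 2.9.0, as pinned in this patch):

    import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

    val settings = new CsvParserSettings()
    // Turn off comment handling entirely: no character, including '\0',
    // is treated as a comment prefix.
    settings.setCommentProcessingEnabled(false)
    val parser = new CsvParser(settings)
    val row = parser.parseLine("\u0000foo,1")  // parsed as data, not dropped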

diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 b/dev/deps/spark-deps-hadoop-2.7-hive-1.2
index 82d5a06..e32ea64 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-1.2
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-1.2
@@ -200,7 +200,7 @@ stream/2.9.6//stream-2.9.6.jar
 stringtemplate/3.2.1//stringtemplate-3.2.1.jar
 super-csv/2.2.0//super-csv-2.2.0.jar
 threeten-extra/1.5.0//threeten-extra-1.5.0.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
 xercesImpl/2.12.0//xercesImpl-2.12.0.jar
 xml-apis/1.4.01//xml-apis-1.4.01.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
index 17c787e..168d619 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -213,7 +213,7 @@ stream/2.9.6//stream-2.9.6.jar
 super-csv/2.2.0//super-csv-2.2.0.jar
 threeten-extra/1.5.0//threeten-extra-1.5.0.jar
 transaction-api/1.1//transaction-api-1.1.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
 velocity/1.5//velocity-1.5.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
 xercesImpl/2.12.0//xercesImpl-2.12.0.jar
diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
index b5a10b5..d730b4a 100644
--- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -229,7 +229,7 @@ super-csv/2.2.0//super-csv-2.2.0.jar
 threeten-extra/1.5.0//threeten-extra-1.5.0.jar
 token-provider/1.0.1//token-provider-1.0.1.jar
 transaction-api/1.1//transaction-api-1.1.jar
-univocity-parsers/2.8.3//univocity-parsers-2.8.3.jar
+univocity-parsers/2.9.0//univocity-parsers-2.9.0.jar
 velocity/1.5//velocity-1.5.jar
 woodstox-core/5.0.3//woodstox-core-5.0.3.jar
 xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar
diff --git a/pom.xml b/pom.xml
index 1bf5de0..32a1308 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2287,7 +2287,7 @@
       <dependency>
         <groupId>com.univocity</groupId>
         <artifactId>univocity-parsers</artifactId>
-        <version>2.8.3</version>
+        <version>2.9.0</version>
       </dependency>
       <dependency>
         <groupId>org.apache.hive</groupId>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
index 3e83c1d..efe41883 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVExprUtils.scala
@@ -25,8 +25,13 @@ object CSVExprUtils {
    * This is currently being used in CSV reading path and CSV schema inference.
    */
   def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
-    iter.filter { line =>
-      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
+    if (options.isCommentSet) {
+      val commentPrefix = options.comment.toString
+      iter.filter { line =>
+        line.trim.nonEmpty && !line.startsWith(commentPrefix)
+      }
+    } else {
+      iter.filter(_.trim.nonEmpty)
     }
   }
 
@@ -34,7 +39,7 @@ object CSVExprUtils {
     if (options.isCommentSet) {
       val commentPrefix = options.comment.toString
       iter.dropWhile { line =>
-        line.trim.isEmpty || line.trim.startsWith(commentPrefix)
+        line.trim.isEmpty || line.startsWith(commentPrefix)
       }
     } else {
       iter.dropWhile(_.trim.isEmpty)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index 9d09cab..f2191fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -220,7 +220,9 @@ class CSVOptions(
     format.setQuote(quote)
     format.setQuoteEscape(escape)
     charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
-    format.setComment(comment)
+    if (isCommentSet) {
+      format.setComment(comment)
+    }
     lineSeparatorInWrite.foreach(format.setLineSeparator)
 
     writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
@@ -242,7 +244,11 @@ class CSVOptions(
     format.setQuoteEscape(escape)
     lineSeparator.foreach(format.setLineSeparator)
     charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
-    format.setComment(comment)
+    if (isCommentSet) {
+      format.setComment(comment)
+    } else {
+      settings.setCommentProcessingEnabled(false)
+    }
 
     settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
     settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index d54146a..9ba2cab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1902,25 +1902,26 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
 
   test("SPARK-25387: bad input should not cause NPE") {
     val schema = StructType(StructField("a", IntegerType) :: Nil)
-    val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+    val input = spark.createDataset(Seq("\u0001\u0000\u0001234"))
 
     checkAnswer(spark.read.schema(schema).csv(input), Row(null))
     checkAnswer(spark.read.option("multiLine", true).schema(schema).csv(input), Row(null))
-    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+    assert(spark.read.schema(schema).csv(input).collect().toSet == Set(Row(null)))
   }
 
   test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
     val schema = StructType(
       StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil)
-    val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+    val input = spark.createDataset(Seq("\u0001\u0000\u0001234"))
 
     checkAnswer(
       spark.read
         .option("columnNameOfCorruptRecord", "_corrupt_record")
         .schema(schema)
         .csv(input),
-      Row(null, null))
-    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+      Row(null, "\u0001\u0000\u0001234"))
+    assert(spark.read.schema(schema).csv(input).collect().toSet ==
+      Set(Row(null, "\u0001\u0000\u0001234")))
   }
 
   test("field names of inferred schema shouldn't compare to the first row") {
@@ -2353,6 +2354,17 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
       checkAnswer(csv, Row(null))
     }
   }
+
+  test("SPARK-32614: don't treat rows starting with null char as comment") {
+    withTempPath { path =>
+      Seq("\u0000foo", "bar", "baz").toDS.write.text(path.getCanonicalPath)
+      val df = spark.read.format("csv")
+        .option("header", "false")
+        .option("inferSchema", "true")
+        .load(path.getCanonicalPath)
+      assert(df.count() == 3)
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
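
For reference, after this change the reader applies comment filtering only when 'comment' is explicitly set. A minimal usage sketch (the path is hypothetical):

    // Opt in: lines starting with '#' are skipped as comments.
    val withComments = spark.read
      .option("comment", "#")
      .csv("/path/to/data.csv")

    // Default: no comment processing at all, so rows beginning with
    // '\u0000' are kept.
    val noComments = spark.read.csv("/path/to/data.csv")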


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org