Posted to commits@spark.apache.org by we...@apache.org on 2018/09/13 01:51:58 UTC

spark git commit: [SPARK-25387][SQL] Fix for NPE caused by bad CSV input

Repository: spark
Updated Branches:
  refs/heads/master ab25c9679 -> 083c94476


[SPARK-25387][SQL] Fix for NPE caused by bad CSV input

## What changes were proposed in this pull request?

The PR fixes an NPE in `UnivocityParser` caused by malformed CSV input. In some cases, the `uniVocity` parser can return `null` for bad input. In this PR, I propose to check the result of parsing and not propagate the NPE to upper layers.
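
As a minimal spark-shell sketch of the failure mode (adapted from the test added in this commit; the `\u0000` control characters are one input known to make the parser return `null` tokens):

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import spark.implicits._ // String encoder for createDataset (auto-imported in spark-shell)

val schema = StructType(StructField("a", IntegerType) :: Nil)
val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))

// Before this patch: NullPointerException from UnivocityParser.convert,
// because uniVocity returned null instead of a token array.
// After this patch: the record is treated as malformed and, under the
// default PERMISSIVE mode, comes back as Row(null).
spark.read.schema(schema).csv(input).show()
```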

## How was this patch tested?

I added a test which reproduces the issue, and verified it by running `CSVSuite`.

Closes #22374 from MaxGekk/npe-on-bad-csv.

Lead-authored-by: Maxim Gekk <ma...@gmail.com>
Co-authored-by: Maxim Gekk <ma...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/083c9447
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/083c9447
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/083c9447

Branch: refs/heads/master
Commit: 083c9447671719e0bd67312e3d572f6160c06a4a
Parents: ab25c96
Author: Maxim Gekk <ma...@gmail.com>
Authored: Thu Sep 13 09:51:49 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Sep 13 09:51:49 2018 +0800

----------------------------------------------------------------------
 .../datasources/csv/CSVDataSource.scala         | 36 +++++++++++---------
 .../datasources/csv/UnivocityParser.scala       |  7 +++-
 .../execution/datasources/csv/CSVSuite.scala    | 11 +++++-
 3 files changed, 35 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/083c9447/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 2b86054..e840ff1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -240,23 +240,25 @@ object TextInputCSVDataSource extends CSVDataSource {
       sparkSession: SparkSession,
       csv: Dataset[String],
       maybeFirstLine: Option[String],
-      parsedOptions: CSVOptions): StructType = maybeFirstLine match {
-    case Some(firstLine) =>
-      val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
-      val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
-      val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
-      val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
-      val tokenRDD = sampled.rdd.mapPartitions { iter =>
-        val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
-        val linesWithoutHeader =
-          CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
-        val parser = new CsvParser(parsedOptions.asParserSettings)
-        linesWithoutHeader.map(parser.parseLine)
-      }
-      CSVInferSchema.infer(tokenRDD, header, parsedOptions)
-    case None =>
-      // If the first line could not be read, just return the empty schema.
-      StructType(Nil)
+      parsedOptions: CSVOptions): StructType = {
+    val csvParser = new CsvParser(parsedOptions.asParserSettings)
+    maybeFirstLine.map(csvParser.parseLine(_)) match {
+      case Some(firstRow) if firstRow != null =>
+        val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+        val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+        val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
+        val tokenRDD = sampled.rdd.mapPartitions { iter =>
+          val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
+          val linesWithoutHeader =
+            CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions)
+          val parser = new CsvParser(parsedOptions.asParserSettings)
+          linesWithoutHeader.map(parser.parseLine)
+        }
+        CSVInferSchema.infer(tokenRDD, header, parsedOptions)
+      case _ =>
+        // If the first line could not be read, just return the empty schema.
+        StructType(Nil)
+    }
   }
 
   private def createBaseDataset(
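
The hunk above makes schema inference tolerate a `null` result from `parseLine` on the header line, falling back to the empty schema. That hinges on uniVocity returning `null` rather than throwing for some inputs; here is a standalone sketch of that behavior (the `\u0000\u0000\u0001234` input is the one from this commit's test, and whether a given input yields `null` can depend on the uniVocity version and parser settings):

```scala
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

val parser = new CsvParser(new CsvParserSettings())

// parseLine normally returns the tokens of one CSV line:
val ok = parser.parseLine("1,2,3") // Array("1", "2", "3")

// ...but for some malformed inputs it may return null instead:
val bad = parser.parseLine("\u0000\u0000\u0001234")
if (bad == null) {
  // Code that dereferences the result blindly (e.g. bad.length) would
  // throw an NPE here, which is why inferFromDataset now matches on
  // `Some(firstRow) if firstRow != null` before using the tokens.
}
```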

http://git-wip-us.apache.org/repos/asf/spark/blob/083c9447/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index e15af42..9088d43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -216,7 +216,12 @@ class UnivocityParser(
   }
 
   private def convert(tokens: Array[String]): InternalRow = {
-    if (tokens.length != parsedSchema.length) {
+    if (tokens == null) {
+      throw BadRecordException(
+        () => getCurrentInput,
+        () => None,
+        new RuntimeException("Malformed CSV record"))
+    } else if (tokens.length != parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
       // However, we still have chance to parse some of the tokens, by adding extra null tokens in
       // the tail if the number is smaller, or by dropping extra tokens if the number is larger.
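
How the new `BadRecordException` surfaces depends on the configured parse mode. A sketch of the expected behavior, reusing `schema` and `input` from the reproduction above (the PERMISSIVE case is exactly what the new test asserts; the DROPMALFORMED and FAILFAST lines are assumptions based on Spark's general handling of malformed records):

```scala
// PERMISSIVE (the default): the malformed record becomes a row of nulls.
spark.read.schema(schema).csv(input).collect() // Array(Row(null))

// DROPMALFORMED: the malformed record is silently dropped.
spark.read.option("mode", "DROPMALFORMED").schema(schema).csv(input).count() // 0

// FAILFAST: the read fails, surfacing the "Malformed CSV record"
// RuntimeException raised in convert() instead of an NPE.
spark.read.option("mode", "FAILFAST").schema(schema).csv(input).collect()
```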

http://git-wip-us.apache.org/repos/asf/spark/blob/083c9447/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2b39a0b..f70df0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -34,7 +34,7 @@ import org.apache.log4j.{AppenderSkeleton, LogManager}
 import org.apache.log4j.spi.LoggingEvent
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, UDT}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -1811,4 +1811,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     checkCount(2)
     countForMalformedCSV(0, Seq(""))
   }
+
+  test("SPARK-25387: bad input should not cause NPE") {
+    val schema = StructType(StructField("a", IntegerType) :: Nil)
+    val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+
+    checkAnswer(spark.read.schema(schema).csv(input), Row(null))
+    checkAnswer(spark.read.option("multiLine", true).schema(schema).csv(input), Row(null))
+    assert(spark.read.csv(input).collect().toSet == Set(Row()))
+  }
 }

