You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/09/13 01:51:58 UTC
spark git commit: [SPARK-25387][SQL] Fix for NPE caused by bad CSV
input
Repository: spark
Updated Branches:
refs/heads/master ab25c9679 -> 083c94476
[SPARK-25387][SQL] Fix for NPE caused by bad CSV input
## What changes were proposed in this pull request?
The PR fixes an NPE in `UnivocityParser` caused by malformed CSV input. In some cases, the `uniVocity` parser can return `null` for bad input. In this PR, I propose to check the result of parsing and not propagate the NPE to upper layers.
## How was this patch tested?
I added a test which reproduces the issue and ran it with `CSVSuite`.
Closes #22374 from MaxGekk/npe-on-bad-csv.
Lead-authored-by: Maxim Gekk <ma...@gmail.com>
Co-authored-by: Maxim Gekk <ma...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/083c9447
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/083c9447
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/083c9447
Branch: refs/heads/master
Commit: 083c9447671719e0bd67312e3d572f6160c06a4a
Parents: ab25c96
Author: Maxim Gekk <ma...@gmail.com>
Authored: Thu Sep 13 09:51:49 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Sep 13 09:51:49 2018 +0800
----------------------------------------------------------------------
.../datasources/csv/CSVDataSource.scala | 36 +++++++++++---------
.../datasources/csv/UnivocityParser.scala | 7 +++-
.../execution/datasources/csv/CSVSuite.scala | 11 +++++-
3 files changed, 35 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/083c9447/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 2b86054..e840ff1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -240,23 +240,25 @@ object TextInputCSVDataSource extends CSVDataSource {
sparkSession: SparkSession,
csv: Dataset[String],
maybeFirstLine: Option[String],
- parsedOptions: CSVOptions): StructType = maybeFirstLine match {
- case Some(firstLine) =>
- val firstRow = new CsvParser(parsedOptions.asParserSettings).parseLine(firstLine)
- val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
- val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
- val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
- val tokenRDD = sampled.rdd.mapPartitions { iter =>
- val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
- val linesWithoutHeader =
- CSVUtils.filterHeaderLine(filteredLines, firstLine, parsedOptions)
- val parser = new CsvParser(parsedOptions.asParserSettings)
- linesWithoutHeader.map(parser.parseLine)
- }
- CSVInferSchema.infer(tokenRDD, header, parsedOptions)
- case None =>
- // If the first line could not be read, just return the empty schema.
- StructType(Nil)
+ parsedOptions: CSVOptions): StructType = {
+ val csvParser = new CsvParser(parsedOptions.asParserSettings)
+ maybeFirstLine.map(csvParser.parseLine(_)) match {
+ case Some(firstRow) if firstRow != null =>
+ val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+ val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
+ val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
+ val tokenRDD = sampled.rdd.mapPartitions { iter =>
+ val filteredLines = CSVUtils.filterCommentAndEmpty(iter, parsedOptions)
+ val linesWithoutHeader =
+ CSVUtils.filterHeaderLine(filteredLines, maybeFirstLine.get, parsedOptions)
+ val parser = new CsvParser(parsedOptions.asParserSettings)
+ linesWithoutHeader.map(parser.parseLine)
+ }
+ CSVInferSchema.infer(tokenRDD, header, parsedOptions)
+ case _ =>
+ // If the first line could not be read, just return the empty schema.
+ StructType(Nil)
+ }
}
private def createBaseDataset(
http://git-wip-us.apache.org/repos/asf/spark/blob/083c9447/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index e15af42..9088d43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -216,7 +216,12 @@ class UnivocityParser(
}
private def convert(tokens: Array[String]): InternalRow = {
- if (tokens.length != parsedSchema.length) {
+ if (tokens == null) {
+ throw BadRecordException(
+ () => getCurrentInput,
+ () => None,
+ new RuntimeException("Malformed CSV record"))
+ } else if (tokens.length != parsedSchema.length) {
// If the number of tokens doesn't match the schema, we should treat it as a malformed record.
// However, we still have chance to parse some of the tokens, by adding extra null tokens in
// the tail if the number is smaller, or by dropping extra tokens if the number is larger.
http://git-wip-us.apache.org/repos/asf/spark/blob/083c9447/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2b39a0b..f70df0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -34,7 +34,7 @@ import org.apache.log4j.{AppenderSkeleton, LogManager}
import org.apache.log4j.spi.LoggingEvent
import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, UDT}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -1811,4 +1811,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
checkCount(2)
countForMalformedCSV(0, Seq(""))
}
+
+ test("SPARK-25387: bad input should not cause NPE") {
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+
+ checkAnswer(spark.read.schema(schema).csv(input), Row(null))
+ checkAnswer(spark.read.option("multiLine", true).schema(schema).csv(input), Row(null))
+ assert(spark.read.csv(input).collect().toSet == Set(Row()))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org