Posted to commits@spark.apache.org by we...@apache.org on 2019/03/27 05:33:56 UTC
[spark] branch master updated: [SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6bcd480 [SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
6bcd480 is described below
commit 6bcd4805d2eda3293a1f601b05e9d1f36b26b241
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Tue Mar 26 22:33:34 2019 -0700
[SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
## What changes were proposed in this pull request?
In data source V2, the method `PartitionReader.next()` has side effects: when it is called, the current reader proceeds to the next record. This can throw a RuntimeException or an IOException, and the File source V2 framework should handle these exceptions.
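The fix wraps the advancing call in a try/catch guarded by the existing `ignoreCorruptFiles` flag (see the `FilePartitionReader` hunk below). A minimal sketch of that pattern, where `SimpleReader` and `safeNext` are hypothetical stand-ins for illustration rather than Spark's actual classes:

```scala
import java.io.IOException

// Hypothetical stand-in for Spark's PartitionReader[T]; only the
// side-effecting next() matters for this sketch.
trait SimpleReader[T] {
  def next(): Boolean // advances to the next record; may throw
  def get(): T
}

// The guarded advance the patch introduces: if next() throws a
// RuntimeException or an IOException while ignoreCorruptFiles is
// enabled, log and report "no more records" instead of failing the task.
def safeNext[T](reader: SimpleReader[T], ignoreCorruptFiles: Boolean): Boolean =
  try {
    reader.next()
  } catch {
    case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
      Console.err.println(s"Skipped the rest of the corrupted file: $reader ($e)")
      false
  }
```

When `ignoreCorruptFiles` is false the exception propagates unchanged, so a corrupt file still fails the query, mirroring the behavior in the hunk below.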
## How was this patch tested?
Unit test.
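The new test in `CSVSuite` (in the diff below) writes a deliberately truncated gzip file and reads it with the flag off and on. For context, a hedged sketch of the same switch from user code; the file path is a placeholder and `spark` is assumed to be an existing SparkSession:

```scala
// With the flag off (the default), reading a corrupt gzip-compressed CSV
// fails with a SparkException caused by an EOFException.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")

// With the flag on, the corrupted remainder of the file is skipped and the
// read returns whatever rows could be decoded (possibly none).
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val rows = spark.read.csv("/path/to/maybe-corrupt.csv.gz").collect()
```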
Closes #24225 from gengliangwang/corruptFile.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../datasources/v2/FilePartitionReader.scala | 13 +++++++-
.../sql/execution/datasources/csv/CSVSuite.scala | 37 +++++++++++++++++++++-
2 files changed, 48 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index d76d69d..7ecd516 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -58,7 +58,18 @@ class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]])
         return false
       }
     }
-    if (currentReader.next()) {
+
+    // In PartitionReader.next(), the current reader proceeds to next record.
+    // It might throw RuntimeException/IOException and Spark should handle these exceptions.
+    val hasNext = try {
+      currentReader.next()
+    } catch {
+      case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+        logWarning(
+          s"Skipped the rest of the content in the corrupted file: $currentReader", e)
+        false
+    }
+    if (hasNext) {
       true
     } else {
       close()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b2b0f39..6584a30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -17,12 +17,13 @@

 package org.apache.spark.sql.execution.datasources.csv

-import java.io.File
+import java.io.{ByteArrayOutputStream, EOFException, File, FileOutputStream}
 import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
 import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
+import java.util.zip.GZIPOutputStream

 import scala.collection.JavaConverters._
 import scala.util.Properties
@@ -1171,6 +1172,40 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
   }

+  test("Enabling/disabling ignoreCorruptFiles") {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.csv(inputFile.toURI.toString).collect()
+        }
+        assert(e.getCause.isInstanceOf[EOFException])
+        assert(e.getCause.getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        assert(spark.read.csv(inputFile.toURI.toString).collect().isEmpty)
+      }
+    } finally {
+      inputFile.delete()
+    }
+  }
+
   test("SPARK-19610: Parse normal multi-line CSV files") {
     val primitiveFieldAndType = Seq(
       """"