You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/03/27 05:33:56 UTC

[spark] branch master updated: [SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bcd480  [SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
6bcd480 is described below

commit 6bcd4805d2eda3293a1f601b05e9d1f36b26b241
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Tue Mar 26 22:33:34 2019 -0700

    [SPARK-27286][SQL] Handles exceptions on proceeding to next record in FilePartitionReader
    
    ## What changes were proposed in this pull request?
    
    In data source V2, the method `PartitionReader.next()` has side effects. When the method is called, the current reader proceeds to the next record.
    This might throw RuntimeException/IOException and File source V2 framework should handle these exceptions.
    
    ## How was this patch tested?
    
    Unit test.
    
    Closes #24225 from gengliangwang/corruptFile.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../datasources/v2/FilePartitionReader.scala       | 13 +++++++-
 .../sql/execution/datasources/csv/CSVSuite.scala   | 37 +++++++++++++++++++++-
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index d76d69d..7ecd516 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -58,7 +58,18 @@ class FilePartitionReader[T](readers: Iterator[PartitionedFileReader[T]])
         return false
       }
     }
-    if (currentReader.next()) {
+
+    // In PartitionReader.next(), the current reader proceeds to next record.
+    // It might throw RuntimeException/IOException and Spark should handle these exceptions.
+    val hasNext = try {
+      currentReader.next()
+    } catch {
+      case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+        logWarning(
+          s"Skipped the rest of the content in the corrupted file: $currentReader", e)
+        false
+    }
+    if (hasNext) {
       true
     } else {
       close()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index b2b0f39..6584a30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
-import java.io.File
+import java.io.{ByteArrayOutputStream, EOFException, File, FileOutputStream}
 import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
 import java.nio.file.Files
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
+import java.util.zip.GZIPOutputStream
 
 import scala.collection.JavaConverters._
 import scala.util.Properties
@@ -1171,6 +1172,40 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
     }
   }
 
+  test("Enabling/disabling ignoreCorruptFiles") {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val e = intercept[SparkException] {
+          spark.read.csv(inputFile.toURI.toString).collect()
+        }
+        assert(e.getCause.isInstanceOf[EOFException])
+        assert(e.getCause.getMessage === "Unexpected end of input stream")
+      }
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        assert(spark.read.csv(inputFile.toURI.toString).collect().isEmpty)
+      }
+    } finally {
+      inputFile.delete()
+    }
+  }
+
   test("SPARK-19610: Parse normal multi-line CSV files") {
     val primitiveFieldAndType = Seq(
       """"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org