Posted to commits@spark.apache.org by mr...@apache.org on 2016/10/13 07:33:29 UTC

spark git commit: [SPARK-17850][CORE] Add a flag to ignore corrupt files (branch 1.6)

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d3890deb7 -> 585c5657f


[SPARK-17850][CORE] Add a flag to ignore corrupt files (branch 1.6)

## What changes were proposed in this pull request?

This is the backport for branch-1.6. It only adds the Spark conf `spark.files.ignoreCorruptFiles`, because SQL uses `HadoopRDD` directly in 1.6. `spark.files.ignoreCorruptFiles` defaults to `true`.
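
A minimal sketch of how the flag could be toggled from user code, with a placeholder app name, master URL, and input path (none of these are part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch (not part of the patch): disable the flag so that reading a
// truncated gzip file fails the job instead of being silently skipped.
// "hdfs:///path/to/data.gz" is a placeholder path.
val conf = new SparkConf()
  .setAppName("ignore-corrupt-files-example")
  .setMaster("local[*]")
  .set("spark.files.ignoreCorruptFiles", "false") // defaults to "true" in this patch

val sc = new SparkContext(conf)
try {
  // With the flag off, a corrupt .gz input surfaces as a SparkException whose
  // cause is an EOFException; with the default "true", the EOFException is
  // swallowed and the partition simply ends early.
  sc.textFile("hdfs:///path/to/data.gz").count()
} finally {
  sc.stop()
}
```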

## How was this patch tested?

The test added to `FileSuite`, which reads a corrupt gzip file through both `HadoopRDD` and `NewHadoopRDD` with the flag on and off.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #15454 from zsxwing/SPARK-17850-1.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/585c5657
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/585c5657
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/585c5657

Branch: refs/heads/branch-1.6
Commit: 585c5657f9452b7a1f4f6c9c0a9d933ebb4ed7b0
Parents: d3890de
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Thu Oct 13 00:33:00 2016 -0700
Committer: Mridul Muralidharan <mm...@HW11853.local>
Committed: Thu Oct 13 00:33:00 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  6 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     | 10 +++-
 .../test/scala/org/apache/spark/FileSuite.scala | 62 +++++++++++++++++++-
 3 files changed, 74 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/585c5657/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f37c95b..463dd5b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -139,6 +139,9 @@ class HadoopRDD[K, V](
 
   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
+  private val ignoreCorruptFiles =
+    sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
@@ -245,8 +248,7 @@ class HadoopRDD[K, V](
         try {
           finished = !reader.next(key, value)
         } catch {
-          case eof: EOFException =>
-            finished = true
+          case _: EOFException if ignoreCorruptFiles => finished = true
         }
         if (!finished) {
           inputMetrics.incRecordsRead(1)

http://git-wip-us.apache.org/repos/asf/spark/blob/585c5657/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 46fe1ba..5b5ddd5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.rdd
 
+import java.io.EOFException
 import java.text.SimpleDateFormat
 import java.util.Date
 
@@ -84,6 +85,9 @@ class NewHadoopRDD[K, V](
 
   private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
 
+  private val ignoreCorruptFiles =
+    sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)
+
   def getConf: Configuration = {
     val conf: Configuration = confBroadcast.value.value
     if (shouldCloneJobConf) {
@@ -171,7 +175,11 @@ class NewHadoopRDD[K, V](
 
       override def hasNext: Boolean = {
         if (!finished && !havePair) {
-          finished = !reader.nextKeyValue
+          try {
+            finished = !reader.nextKeyValue
+          } catch {
+            case _: EOFException if ignoreCorruptFiles => finished = true
+          }
           if (finished) {
             // Close and release the reader here; close() will also be called when the task
             // completes, but for tasks that read from many files, it helps to release the

http://git-wip-us.apache.org/repos/asf/spark/blob/585c5657/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index fdb00aa..7e87092 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark
 
-import java.io.{File, FileWriter}
+import java.io._
+import java.util.zip.GZIPOutputStream
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.input.PortableDataStream
@@ -540,4 +541,63 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
         }.collect()
     assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
   }
+
+  test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
+    val inputFile = File.createTempFile("input-", ".gz")
+    try {
+      // Create a corrupt gzip file
+      val byteOutput = new ByteArrayOutputStream()
+      val gzip = new GZIPOutputStream(byteOutput)
+      try {
+        gzip.write(Array[Byte](1, 2, 3, 4))
+      } finally {
+        gzip.close()
+      }
+      val bytes = byteOutput.toByteArray
+      val o = new FileOutputStream(inputFile)
+      try {
+        // It's corrupt since we only write half of bytes into the file.
+        o.write(bytes.take(bytes.length / 2))
+      } finally {
+        o.close()
+      }
+
+      // Spark job should ignore corrupt files by default
+      sc = new SparkContext("local", "test")
+      // Test HadoopRDD
+      assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
+      // Test NewHadoopRDD
+      assert {
+        sc.newAPIHadoopFile(
+          inputFile.toURI.toString,
+          classOf[NewTextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).collect().isEmpty
+      }
+      sc.stop()
+
+      // Reading a corrupt gzip file should throw EOFException
+      val conf = new SparkConf().set("spark.files.ignoreCorruptFiles", "false")
+      sc = new SparkContext("local", "test", conf)
+      // Test HadoopRDD
+      var e = intercept[SparkException] {
+        sc.textFile(inputFile.toURI.toString).collect()
+      }
+      assert(e.getCause.isInstanceOf[EOFException])
+      assert(e.getCause.getMessage === "Unexpected end of input stream")
+      // Test NewHadoopRDD
+      e = intercept[SparkException] {
+        sc.newAPIHadoopFile(
+          inputFile.toURI.toString,
+          classOf[NewTextInputFormat],
+          classOf[LongWritable],
+          classOf[Text]).collect()
+      }
+      assert(e.getCause.isInstanceOf[EOFException])
+      assert(e.getCause.getMessage === "Unexpected end of input stream")
+    } finally {
+      inputFile.delete()
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org