You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2016/10/13 07:33:29 UTC
spark git commit: [SPARK-17850][CORE] Add a flag to ignore corrupt files (branch 1.6)
Repository: spark
Updated Branches:
refs/heads/branch-1.6 d3890deb7 -> 585c5657f
[SPARK-17850][CORE] Add a flag to ignore corrupt files (branch 1.6)
## What changes were proposed in this pull request?
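This is the version of the patch for branch 1.6. It only adds the Spark conf `spark.files.ignoreCorruptFiles`, because Spark SQL reads files through HadoopRDD directly in 1.6. `spark.files.ignoreCorruptFiles` is `true` by default. When it is enabled, an `EOFException` thrown while reading a file (for example, a truncated gzip file) ends that file's input instead of failing the task.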
## How was this patch tested?
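By the test added to `FileSuite`, which reads a truncated gzip file through both HadoopRDD and NewHadoopRDD with the flag enabled and disabled.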
Author: Shixiong Zhu <sh...@databricks.com>
Closes #15454 from zsxwing/SPARK-17850-1.6.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/585c5657
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/585c5657
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/585c5657
Branch: refs/heads/branch-1.6
Commit: 585c5657f9452b7a1f4f6c9c0a9d933ebb4ed7b0
Parents: d3890de
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Thu Oct 13 00:33:00 2016 -0700
Committer: Mridul Muralidharan <mm...@HW11853.local>
Committed: Thu Oct 13 00:33:00 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 6 +-
.../org/apache/spark/rdd/NewHadoopRDD.scala | 10 +++-
.../test/scala/org/apache/spark/FileSuite.scala | 62 +++++++++++++++++++-
3 files changed, 74 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/585c5657/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f37c95b..463dd5b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -139,6 +139,9 @@ class HadoopRDD[K, V](
private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+ private val ignoreCorruptFiles =
+ sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)
+
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
@@ -245,8 +248,7 @@ class HadoopRDD[K, V](
try {
finished = !reader.next(key, value)
} catch {
- case eof: EOFException =>
- finished = true
+ case _: EOFException if ignoreCorruptFiles => finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
http://git-wip-us.apache.org/repos/asf/spark/blob/585c5657/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 46fe1ba..5b5ddd5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,6 +17,7 @@
package org.apache.spark.rdd
+import java.io.EOFException
import java.text.SimpleDateFormat
import java.util.Date
@@ -84,6 +85,9 @@ class NewHadoopRDD[K, V](
private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+ private val ignoreCorruptFiles =
+ sparkContext.conf.getBoolean("spark.files.ignoreCorruptFiles", true)
+
def getConf: Configuration = {
val conf: Configuration = confBroadcast.value.value
if (shouldCloneJobConf) {
@@ -171,7 +175,11 @@ class NewHadoopRDD[K, V](
override def hasNext: Boolean = {
if (!finished && !havePair) {
- finished = !reader.nextKeyValue
+ try {
+ finished = !reader.nextKeyValue
+ } catch {
+ case _: EOFException if ignoreCorruptFiles => finished = true
+ }
if (finished) {
// Close and release the reader here; close() will also be called when the task
// completes, but for tasks that read from many files, it helps to release the
http://git-wip-us.apache.org/repos/asf/spark/blob/585c5657/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index fdb00aa..7e87092 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark
-import java.io.{File, FileWriter}
+import java.io._
+import java.util.zip.GZIPOutputStream
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.PortableDataStream
@@ -540,4 +541,63 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}.collect()
assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
}
+
+ test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
+ val inputFile = File.createTempFile("input-", ".gz")
+ try {
+ // Create a corrupt gzip file: gzip 4 bytes in memory, then write only half of the compressed output
+ val byteOutput = new ByteArrayOutputStream()
+ val gzip = new GZIPOutputStream(byteOutput)
+ try {
+ gzip.write(Array[Byte](1, 2, 3, 4))
+ } finally {
+ gzip.close()
+ }
+ val bytes = byteOutput.toByteArray
+ val o = new FileOutputStream(inputFile)
+ try {
+ // It's corrupt (truncated) since we only write the first half of the bytes into the file.
+ o.write(bytes.take(bytes.length / 2))
+ } finally {
+ o.close()
+ }
+
+ // By default (spark.files.ignoreCorruptFiles = true) both RDD types should return no records instead of failing
+ sc = new SparkContext("local", "test")
+ // Test HadoopRDD (via sc.textFile)
+ assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
+ // Test NewHadoopRDD (via sc.newAPIHadoopFile)
+ assert {
+ sc.newAPIHadoopFile(
+ inputFile.toURI.toString,
+ classOf[NewTextInputFormat],
+ classOf[LongWritable],
+ classOf[Text]).collect().isEmpty
+ }
+ sc.stop()
+
+ // With the flag disabled, reading the corrupt gzip file should fail with an EOFException as the cause
+ val conf = new SparkConf().set("spark.files.ignoreCorruptFiles", "false")
+ sc = new SparkContext("local", "test", conf)
+ // Test HadoopRDD
+ var e = intercept[SparkException] {
+ sc.textFile(inputFile.toURI.toString).collect()
+ }
+ assert(e.getCause.isInstanceOf[EOFException])
+ assert(e.getCause.getMessage === "Unexpected end of input stream")
+ // Test NewHadoopRDD
+ e = intercept[SparkException] {
+ sc.newAPIHadoopFile(
+ inputFile.toURI.toString,
+ classOf[NewTextInputFormat],
+ classOf[LongWritable],
+ classOf[Text]).collect()
+ }
+ assert(e.getCause.isInstanceOf[EOFException])
+ assert(e.getCause.getMessage === "Unexpected end of input stream")
+ } finally {
+ inputFile.delete()
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org