You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2016/10/12 20:52:53 UTC
spark git commit: [SPARK-17850][CORE] Add a flag to ignore corrupt files
Repository: spark
Updated Branches:
refs/heads/master eb69335cd -> 47776e7c0
[SPARK-17850][CORE] Add a flag to ignore corrupt files
## What changes were proposed in this pull request?
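Add a flag to ignore corrupt files. For Spark core, the configuration is `spark.files.ignoreCorruptFiles`; for Spark SQL, it is `spark.sql.files.ignoreCorruptFiles`. Both default to `false`. When enabled, an `IOException` raised while reading a file ends reading of that file instead of failing the job, and any records already read from it are still returned. Note that `HadoopRDD` previously swallowed `EOFException` unconditionally; with this change, such exceptions propagate unless `spark.files.ignoreCorruptFiles` is enabled.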
## How was this patch tested?
By the unit tests added to `FileSuite` (core) and `FileSourceStrategySuite` (SQL).
Author: Shixiong Zhu <sh...@databricks.com>
Closes #15422 from zsxwing/SPARK-17850.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47776e7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47776e7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47776e7c
Branch: refs/heads/master
Commit: 47776e7c0c68590fe446cef910900b1aaead06f9
Parents: eb69335
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Oct 12 13:51:53 2016 -0700
Committer: Mridul Muralidharan <mm...@HW11853.local>
Committed: Wed Oct 12 13:51:53 2016 -0700
----------------------------------------------------------------------
.../apache/spark/internal/config/package.scala | 5 ++
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 8 ++-
.../org/apache/spark/rdd/NewHadoopRDD.scala | 10 +++-
.../test/scala/org/apache/spark/FileSuite.scala | 62 +++++++++++++++++++-
.../sql/execution/datasources/FileScanRDD.scala | 30 +++++++++-
.../org/apache/spark/sql/internal/SQLConf.scala | 8 +++
.../datasources/FileSourceStrategySuite.scala | 37 +++++++++++-
7 files changed, 153 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5a71015..517fc3e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -156,4 +156,9 @@ package object config {
.doc("Port to use for the block managed on the driver.")
.fallbackConf(BLOCK_MANAGER_PORT)
+ private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
+ .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
+ "encountering corrupt files and contents that have been read will still be returned.")
+ .booleanConf
+ .createWithDefault(false)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 4640b5d..e1cf393 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.io.EOFException
+import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date
@@ -43,6 +43,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
import org.apache.spark.scheduler.{HDFSCacheTaskLocation, HostTaskLocation}
import org.apache.spark.storage.StorageLevel
@@ -139,6 +140,8 @@ class HadoopRDD[K, V](
private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+ private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
+
// Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
protected def getJobConf(): JobConf = {
val conf: Configuration = broadcastedConf.value.value
@@ -253,8 +256,7 @@ class HadoopRDD[K, V](
try {
finished = !reader.next(key, value)
} catch {
- case eof: EOFException =>
- finished = true
+ case e: IOException if ignoreCorruptFiles => finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 1c7aec9..baf31fb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -17,6 +17,7 @@
package org.apache.spark.rdd
+import java.io.IOException
import java.text.SimpleDateFormat
import java.util.Date
@@ -33,6 +34,7 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -85,6 +87,8 @@ class NewHadoopRDD[K, V](
private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+ private val ignoreCorruptFiles = sparkContext.conf.get(IGNORE_CORRUPT_FILES)
+
def getConf: Configuration = {
val conf: Configuration = confBroadcast.value.value
if (shouldCloneJobConf) {
@@ -179,7 +183,11 @@ class NewHadoopRDD[K, V](
override def hasNext: Boolean = {
if (!finished && !havePair) {
- finished = !reader.nextKeyValue
+ try {
+ finished = !reader.nextKeyValue
+ } catch {
+ case e: IOException if ignoreCorruptFiles => finished = true
+ }
if (finished) {
// Close and release the reader here; close() will also be called when the task
// completes, but for tasks that read from many files, it helps to release the
http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 993834f..cc52bb1 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark
-import java.io.{File, FileWriter}
+import java.io._
+import java.util.zip.GZIPOutputStream
import scala.io.Source
@@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.input.PortableDataStream
+import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -541,4 +543,62 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
}.collect()
assert(inputPaths.toSet === Set(s"$outDir/part-00000", s"$outDir/part-00001"))
}
+
+ test("spark.files.ignoreCorruptFiles should work both HadoopRDD and NewHadoopRDD") {
+ val inputFile = File.createTempFile("input-", ".gz")
+ try {
+ // Create a corrupt gzip file
+ val byteOutput = new ByteArrayOutputStream()
+ val gzip = new GZIPOutputStream(byteOutput)
+ try {
+ gzip.write(Array[Byte](1, 2, 3, 4))
+ } finally {
+ gzip.close()
+ }
+ val bytes = byteOutput.toByteArray
+ val o = new FileOutputStream(inputFile)
+ try {
+ // It's corrupt since we only write half of bytes into the file.
+ o.write(bytes.take(bytes.length / 2))
+ } finally {
+ o.close()
+ }
+
+ // Reading a corrupt gzip file should throw EOFException
+ sc = new SparkContext("local", "test")
+ // Test HadoopRDD
+ var e = intercept[SparkException] {
+ sc.textFile(inputFile.toURI.toString).collect()
+ }
+ assert(e.getCause.isInstanceOf[EOFException])
+ assert(e.getCause.getMessage === "Unexpected end of input stream")
+ // Test NewHadoopRDD
+ e = intercept[SparkException] {
+ sc.newAPIHadoopFile(
+ inputFile.toURI.toString,
+ classOf[NewTextInputFormat],
+ classOf[LongWritable],
+ classOf[Text]).collect()
+ }
+ assert(e.getCause.isInstanceOf[EOFException])
+ assert(e.getCause.getMessage === "Unexpected end of input stream")
+ sc.stop()
+
+ val conf = new SparkConf().set(IGNORE_CORRUPT_FILES, true)
+ sc = new SparkContext("local", "test", conf)
+ // Test HadoopRDD
+ assert(sc.textFile(inputFile.toURI.toString).collect().isEmpty)
+ // Test NewHadoopRDD
+ assert {
+ sc.newAPIHadoopFile(
+ inputFile.toURI.toString,
+ classOf[NewTextInputFormat],
+ classOf[LongWritable],
+ classOf[Text]).collect().isEmpty
+ }
+ } finally {
+ inputFile.delete()
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index c66da3a..8994457 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.datasources
+import java.io.IOException
+
import scala.collection.mutable
import org.apache.spark.{Partition => RDDPartition, TaskContext}
@@ -25,6 +27,7 @@ import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.ColumnarBatch
+import org.apache.spark.util.NextIterator
/**
* A part (i.e. "block") of a single file that should be read, along with partition column values
@@ -62,6 +65,8 @@ class FileScanRDD(
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
+ private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
private val inputMetrics = context.taskMetrics().inputMetrics
@@ -119,7 +124,30 @@ class FileScanRDD(
InputFileNameHolder.setInputFileName(currentFile.filePath)
try {
- currentIterator = readFunction(currentFile)
+ if (ignoreCorruptFiles) {
+ currentIterator = new NextIterator[Object] {
+ private val internalIter = readFunction(currentFile)
+
+ override def getNext(): AnyRef = {
+ try {
+ if (internalIter.hasNext) {
+ internalIter.next()
+ } else {
+ finished = true
+ null
+ }
+ } catch {
+ case e: IOException =>
+ finished = true
+ null
+ }
+ }
+
+ override def close(): Unit = {}
+ }
+ } else {
+ currentIterator = readFunction(currentFile)
+ }
} catch {
case e: java.io.FileNotFoundException =>
throw new java.io.FileNotFoundException(
http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8cbfc4c..9e7c1ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -576,6 +576,12 @@ object SQLConf {
.doubleConf
.createWithDefault(0.05)
+ val IGNORE_CORRUPT_FILES = SQLConfigBuilder("spark.sql.files.ignoreCorruptFiles")
+ .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
+ "encountering corrupt files and contents that have been read will still be returned.")
+ .booleanConf
+ .createWithDefault(false)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -743,6 +749,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def warehousePath: String = new Path(getConf(WAREHOUSE_PATH)).toString
+ def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
+
override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
http://git-wip-us.apache.org/repos/asf/spark/blob/47776e7c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 45411fa..c5deb31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.execution.datasources
-import java.io.File
+import java.io._
import java.util.concurrent.atomic.AtomicInteger
+import java.util.zip.GZIPOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, Path, RawLocalFileSystem}
@@ -441,6 +442,40 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}
+ test("spark.files.ignoreCorruptFiles should work in SQL") {
+ val inputFile = File.createTempFile("input-", ".gz")
+ try {
+ // Create a corrupt gzip file
+ val byteOutput = new ByteArrayOutputStream()
+ val gzip = new GZIPOutputStream(byteOutput)
+ try {
+ gzip.write(Array[Byte](1, 2, 3, 4))
+ } finally {
+ gzip.close()
+ }
+ val bytes = byteOutput.toByteArray
+ val o = new FileOutputStream(inputFile)
+ try {
+ // It's corrupt since we only write half of bytes into the file.
+ o.write(bytes.take(bytes.length / 2))
+ } finally {
+ o.close()
+ }
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+ val e = intercept[SparkException] {
+ spark.read.text(inputFile.toURI.toString).collect()
+ }
+ assert(e.getCause.isInstanceOf[EOFException])
+ assert(e.getCause.getMessage === "Unexpected end of input stream")
+ }
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+ assert(spark.read.text(inputFile.toURI.toString).collect().isEmpty)
+ }
+ } finally {
+ inputFile.delete()
+ }
+ }
+
// Helpers for checking the arguments passed to the FileFormat.
protected val checkPartitionSchema =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org