Posted to commits@spark.apache.org by we...@apache.org on 2018/01/15 04:07:06 UTC
spark git commit: [SPARK-23049][SQL] `spark.sql.files.ignoreCorruptFiles` should work for ORC files
Repository: spark
Updated Branches:
refs/heads/master b98ffa4d6 -> 9a96bfc8b
[SPARK-23049][SQL] `spark.sql.files.ignoreCorruptFiles` should work for ORC files
## What changes were proposed in this pull request?
When `spark.sql.files.ignoreCorruptFiles=true`, Spark should skip corrupted ORC files instead of failing the query, as it already does for Parquet.
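
A minimal usage sketch of the intended behavior (the paths here are hypothetical, and `/tmp/orc-data` is assumed to contain valid ORC files alongside one file whose footer cannot be read):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Default (false): reading a directory that contains a file with an
// unreadable ORC footer fails the query with a SparkException.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")
// spark.read.orc("/tmp/orc-data").count()   // throws SparkException

// With the flag enabled, the corrupted file is skipped with a warning
// and the query returns only the rows from the readable files.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.read.orc("/tmp/orc-data").count()
```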
## How was this patch tested?
Passes Jenkins with newly added test cases in `OrcQuerySuite` and `ParquetQuerySuite`.
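
The corruption scenario in the new tests is simulated by writing JSON output into a path that is then read back as ORC. A rough sketch, condensed from the test case in the diff below and assuming the Spark SQL test-suite helpers (`withTempDir`, `withSQLConf`, `checkAnswer`) and an in-scope `spark` session:

```scala
withTempDir { dir =>
  val basePath = dir.getCanonicalPath
  // Two valid ORC directories, plus one JSON directory that is
  // deliberately misread as ORC to stand in for a corrupted file.
  spark.range(1).toDF("a").write.orc(new Path(basePath, "first").toString)
  spark.range(1, 2).toDF("a").write.orc(new Path(basePath, "second").toString)
  spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)

  withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
    // The JSON "ORC" file is skipped; only the two valid rows survive.
    val df = spark.read.orc(
      new Path(basePath, "first").toString,
      new Path(basePath, "second").toString,
      new Path(basePath, "third").toString)
    checkAnswer(df, Seq(Row(0), Row(1)))
  }
}
```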
Author: Dongjoon Hyun <do...@apache.org>
Closes #20240 from dongjoon-hyun/SPARK-23049.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a96bfc8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a96bfc8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a96bfc8
Branch: refs/heads/master
Commit: 9a96bfc8bf021cb4b6c62fac6ce1bcf87affcd43
Parents: b98ffa4
Author: Dongjoon Hyun <do...@apache.org>
Authored: Mon Jan 15 12:06:56 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jan 15 12:06:56 2018 +0800
----------------------------------------------------------------------
.../execution/datasources/orc/OrcUtils.scala | 29 ++++++++----
.../datasources/orc/OrcQuerySuite.scala | 47 ++++++++++++++++++++
.../datasources/parquet/ParquetQuerySuite.scala | 23 ++++++++--
.../spark/sql/hive/orc/OrcFileFormat.scala | 8 +++-
.../spark/sql/hive/orc/OrcFileOperator.scala | 28 +++++++++---
5 files changed, 117 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9a96bfc8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 13a2399..460194b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.orc.{OrcFile, Reader, TypeDescription}
+import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
@@ -50,23 +51,35 @@ object OrcUtils extends Logging {
paths
}
- def readSchema(file: Path, conf: Configuration): Option[TypeDescription] = {
+ def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean)
+ : Option[TypeDescription] = {
val fs = file.getFileSystem(conf)
val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
- val reader = OrcFile.createReader(file, readerOptions)
- val schema = reader.getSchema
- if (schema.getFieldNames.size == 0) {
- None
- } else {
- Some(schema)
+ try {
+ val reader = OrcFile.createReader(file, readerOptions)
+ val schema = reader.getSchema
+ if (schema.getFieldNames.size == 0) {
+ None
+ } else {
+ Some(schema)
+ }
+ } catch {
+ case e: org.apache.orc.FileFormatException =>
+ if (ignoreCorruptFiles) {
+ logWarning(s"Skipped the footer in the corrupted file: $file", e)
+ None
+ } else {
+ throw new SparkException(s"Could not read footer for file: $file", e)
+ }
}
}
def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
: Option[StructType] = {
+ val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
val conf = sparkSession.sessionState.newHadoopConf()
// TODO: We need to support merge schema. Please see SPARK-11412.
- files.map(_.getPath).flatMap(readSchema(_, conf)).headOption.map { schema =>
+ files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9a96bfc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index e00e057..f58c331 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -31,6 +31,7 @@ import org.apache.orc.OrcConf.COMPRESS
import org.apache.orc.mapred.OrcStruct
import org.apache.orc.mapreduce.OrcInputFormat
+import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
@@ -531,6 +532,52 @@ abstract class OrcQueryTest extends OrcTest {
val df = spark.read.orc(path1.getCanonicalPath, path2.getCanonicalPath)
assert(df.count() == 20)
}
+
+ test("Enabling/disabling ignoreCorruptFiles") {
+ def testIgnoreCorruptFiles(): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+ spark.range(1).toDF("a").write.orc(new Path(basePath, "first").toString)
+ spark.range(1, 2).toDF("a").write.orc(new Path(basePath, "second").toString)
+ spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+ val df = spark.read.orc(
+ new Path(basePath, "first").toString,
+ new Path(basePath, "second").toString,
+ new Path(basePath, "third").toString)
+ checkAnswer(df, Seq(Row(0), Row(1)))
+ }
+ }
+
+ def testIgnoreCorruptFilesWithoutSchemaInfer(): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+ spark.range(1).toDF("a").write.orc(new Path(basePath, "first").toString)
+ spark.range(1, 2).toDF("a").write.orc(new Path(basePath, "second").toString)
+ spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+ val df = spark.read.schema("a long").orc(
+ new Path(basePath, "first").toString,
+ new Path(basePath, "second").toString,
+ new Path(basePath, "third").toString)
+ checkAnswer(df, Seq(Row(0), Row(1)))
+ }
+ }
+
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+ testIgnoreCorruptFiles()
+ testIgnoreCorruptFilesWithoutSchemaInfer()
+ }
+
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+ val m1 = intercept[SparkException] {
+ testIgnoreCorruptFiles()
+ }.getMessage
+ assert(m1.contains("Could not read footer for file"))
+ val m2 = intercept[SparkException] {
+ testIgnoreCorruptFilesWithoutSchemaInfer()
+ }.getMessage
+ assert(m2.contains("Malformed ORC file"))
+ }
+ }
}
class OrcQuerySuite extends OrcQueryTest with SharedSQLContext {
http://git-wip-us.apache.org/repos/asf/spark/blob/9a96bfc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 4c8c9ef..6ad88ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -320,14 +320,27 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
new Path(basePath, "first").toString,
new Path(basePath, "second").toString,
new Path(basePath, "third").toString)
- checkAnswer(
- df,
- Seq(Row(0), Row(1)))
+ checkAnswer(df, Seq(Row(0), Row(1)))
+ }
+ }
+
+ def testIgnoreCorruptFilesWithoutSchemaInfer(): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+ spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+ spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+ spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+ val df = spark.read.schema("a long").parquet(
+ new Path(basePath, "first").toString,
+ new Path(basePath, "second").toString,
+ new Path(basePath, "third").toString)
+ checkAnswer(df, Seq(Row(0), Row(1)))
}
}
withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
testIgnoreCorruptFiles()
+ testIgnoreCorruptFilesWithoutSchemaInfer()
}
withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
@@ -335,6 +348,10 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
testIgnoreCorruptFiles()
}
assert(exception.getMessage().contains("is not a Parquet file"))
+ val exception2 = intercept[SparkException] {
+ testIgnoreCorruptFilesWithoutSchemaInfer()
+ }
+ assert(exception2.getMessage().contains("is not a Parquet file"))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/9a96bfc8/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 95741c7..237ed9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -59,9 +59,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
+ val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
OrcFileOperator.readSchema(
files.map(_.getPath.toString),
- Some(sparkSession.sessionState.newHadoopConf())
+ Some(sparkSession.sessionState.newHadoopConf()),
+ ignoreCorruptFiles
)
}
@@ -129,6 +131,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+ val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
(file: PartitionedFile) => {
val conf = broadcastedHadoopConf.value.value
@@ -138,7 +141,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
// SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
// case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
// using the given physical schema. Instead, we simply return an empty iterator.
- val isEmptyFile = OrcFileOperator.readSchema(Seq(filePath.toString), Some(conf)).isEmpty
+ val isEmptyFile =
+ OrcFileOperator.readSchema(Seq(filePath.toString), Some(conf), ignoreCorruptFiles).isEmpty
if (isEmptyFile) {
Iterator.empty
} else {
http://git-wip-us.apache.org/repos/asf/spark/blob/9a96bfc8/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 5a3fcd7..80e44ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -17,11 +17,14 @@
package org.apache.spark.sql.hive.orc
+import java.io.IOException
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
+import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -46,7 +49,10 @@ private[hive] object OrcFileOperator extends Logging {
* create the result reader from that file. If no such file is found, it returns `None`.
* @todo Needs to consider all files when schema evolution is taken into account.
*/
- def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
+ def getFileReader(basePath: String,
+ config: Option[Configuration] = None,
+ ignoreCorruptFiles: Boolean = false)
+ : Option[Reader] = {
def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
reader.getObjectInspector match {
case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
@@ -65,16 +71,28 @@ private[hive] object OrcFileOperator extends Logging {
}
listOrcFiles(basePath, conf).iterator.map { path =>
- path -> OrcFile.createReader(fs, path)
+ val reader = try {
+ Some(OrcFile.createReader(fs, path))
+ } catch {
+ case e: IOException =>
+ if (ignoreCorruptFiles) {
+ logWarning(s"Skipped the footer in the corrupted file: $path", e)
+ None
+ } else {
+ throw new SparkException(s"Could not read footer for file: $path", e)
+ }
+ }
+ path -> reader
}.collectFirst {
- case (path, reader) if isWithNonEmptySchema(path, reader) => reader
+ case (path, Some(reader)) if isWithNonEmptySchema(path, reader) => reader
}
}
- def readSchema(paths: Seq[String], conf: Option[Configuration]): Option[StructType] = {
+ def readSchema(paths: Seq[String], conf: Option[Configuration], ignoreCorruptFiles: Boolean)
+ : Option[StructType] = {
// Take the first file where we can open a valid reader if we can find one. Otherwise just
// return None to indicate we can't infer the schema.
- paths.flatMap(getFileReader(_, conf)).headOption.map { reader =>
+ paths.flatMap(getFileReader(_, conf, ignoreCorruptFiles)).headOption.map { reader =>
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")