You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/01/15 04:07:06 UTC

spark git commit: [SPARK-23049][SQL] `spark.sql.files.ignoreCorruptFiles` should work for ORC files

Repository: spark
Updated Branches:
  refs/heads/master b98ffa4d6 -> 9a96bfc8b


[SPARK-23049][SQL] `spark.sql.files.ignoreCorruptFiles` should work for ORC files

## What changes were proposed in this pull request?

When `spark.sql.files.ignoreCorruptFiles=true`, we should ignore corrupted ORC files.

## How was this patch tested?

Passes Jenkins with a newly added test case.

Author: Dongjoon Hyun <do...@apache.org>

Closes #20240 from dongjoon-hyun/SPARK-23049.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a96bfc8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a96bfc8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a96bfc8

Branch: refs/heads/master
Commit: 9a96bfc8bf021cb4b6c62fac6ce1bcf87affcd43
Parents: b98ffa4
Author: Dongjoon Hyun <do...@apache.org>
Authored: Mon Jan 15 12:06:56 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jan 15 12:06:56 2018 +0800

----------------------------------------------------------------------
 .../execution/datasources/orc/OrcUtils.scala    | 29 ++++++++----
 .../datasources/orc/OrcQuerySuite.scala         | 47 ++++++++++++++++++++
 .../datasources/parquet/ParquetQuerySuite.scala | 23 ++++++++--
 .../spark/sql/hive/orc/OrcFileFormat.scala      |  8 +++-
 .../spark/sql/hive/orc/OrcFileOperator.scala    | 28 +++++++++---
 5 files changed, 117 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9a96bfc8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 13a2399..460194b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.orc.{OrcFile, Reader, TypeDescription}
 
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
@@ -50,23 +51,35 @@ object OrcUtils extends Logging {
     paths
   }
 
-  def readSchema(file: Path, conf: Configuration): Option[TypeDescription] = {
+  def readSchema(file: Path, conf: Configuration, ignoreCorruptFiles: Boolean)
+      : Option[TypeDescription] = {
     val fs = file.getFileSystem(conf)
     val readerOptions = OrcFile.readerOptions(conf).filesystem(fs)
-    val reader = OrcFile.createReader(file, readerOptions)
-    val schema = reader.getSchema
-    if (schema.getFieldNames.size == 0) {
-      None
-    } else {
-      Some(schema)
+    try {
+      val reader = OrcFile.createReader(file, readerOptions)
+      val schema = reader.getSchema
+      if (schema.getFieldNames.size == 0) {
+        None
+      } else {
+        Some(schema)
+      }
+    } catch {
+      case e: org.apache.orc.FileFormatException =>
+        if (ignoreCorruptFiles) {
+          logWarning(s"Skipped the footer in the corrupted file: $file", e)
+          None
+        } else {
+          throw new SparkException(s"Could not read footer for file: $file", e)
+        }
     }
   }
 
   def readSchema(sparkSession: SparkSession, files: Seq[FileStatus])
       : Option[StructType] = {
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     val conf = sparkSession.sessionState.newHadoopConf()
     // TODO: We need to support merge schema. Please see SPARK-11412.
-    files.map(_.getPath).flatMap(readSchema(_, conf)).headOption.map { schema =>
+    files.map(_.getPath).flatMap(readSchema(_, conf, ignoreCorruptFiles)).headOption.map { schema =>
       logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
       CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9a96bfc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index e00e057..f58c331 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -31,6 +31,7 @@ import org.apache.orc.OrcConf.COMPRESS
 import org.apache.orc.mapred.OrcStruct
 import org.apache.orc.mapreduce.OrcInputFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
@@ -531,6 +532,52 @@ abstract class OrcQueryTest extends OrcTest {
     val df = spark.read.orc(path1.getCanonicalPath, path2.getCanonicalPath)
     assert(df.count() == 20)
   }
+
+  test("Enabling/disabling ignoreCorruptFiles") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.orc(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.orc(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(df, Seq(Row(0), Row(1)))
+      }
+    }
+
+    def testIgnoreCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.orc(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.orc(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.schema("a long").orc(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(df, Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      testIgnoreCorruptFiles()
+      testIgnoreCorruptFilesWithoutSchemaInfer()
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      val m1 = intercept[SparkException] {
+        testIgnoreCorruptFiles()
+      }.getMessage
+      assert(m1.contains("Could not read footer for file"))
+      val m2 = intercept[SparkException] {
+        testIgnoreCorruptFilesWithoutSchemaInfer()
+      }.getMessage
+      assert(m2.contains("Malformed ORC file"))
+    }
+  }
 }
 
 class OrcQuerySuite extends OrcQueryTest with SharedSQLContext {

http://git-wip-us.apache.org/repos/asf/spark/blob/9a96bfc8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 4c8c9ef..6ad88ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -320,14 +320,27 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
           new Path(basePath, "first").toString,
           new Path(basePath, "second").toString,
           new Path(basePath, "third").toString)
-        checkAnswer(
-          df,
-          Seq(Row(0), Row(1)))
+        checkAnswer(df, Seq(Row(0), Row(1)))
+      }
+    }
+
+    def testIgnoreCorruptFilesWithoutSchemaInfer(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.schema("a long").parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(df, Seq(Row(0), Row(1)))
       }
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
       testIgnoreCorruptFiles()
+      testIgnoreCorruptFilesWithoutSchemaInfer()
     }
 
     withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
@@ -335,6 +348,10 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
         testIgnoreCorruptFiles()
       }
       assert(exception.getMessage().contains("is not a Parquet file"))
+      val exception2 = intercept[SparkException] {
+        testIgnoreCorruptFilesWithoutSchemaInfer()
+      }
+      assert(exception2.getMessage().contains("is not a Parquet file"))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9a96bfc8/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 95741c7..237ed9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -59,9 +59,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
     OrcFileOperator.readSchema(
       files.map(_.getPath.toString),
-      Some(sparkSession.sessionState.newHadoopConf())
+      Some(sparkSession.sessionState.newHadoopConf()),
+      ignoreCorruptFiles
     )
   }
 
@@ -129,6 +131,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
 
     (file: PartitionedFile) => {
       val conf = broadcastedHadoopConf.value.value
@@ -138,7 +141,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
       // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file
       // using the given physical schema. Instead, we simply return an empty iterator.
-      val isEmptyFile = OrcFileOperator.readSchema(Seq(filePath.toString), Some(conf)).isEmpty
+      val isEmptyFile =
+        OrcFileOperator.readSchema(Seq(filePath.toString), Some(conf), ignoreCorruptFiles).isEmpty
       if (isEmptyFile) {
         Iterator.empty
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/9a96bfc8/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 5a3fcd7..80e44ca 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.hive.orc
 
+import java.io.IOException
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -46,7 +49,10 @@ private[hive] object OrcFileOperator extends Logging {
    *       create the result reader from that file.  If no such file is found, it returns `None`.
    * @todo Needs to consider all files when schema evolution is taken into account.
    */
-  def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
+  def getFileReader(basePath: String,
+      config: Option[Configuration] = None,
+      ignoreCorruptFiles: Boolean = false)
+      : Option[Reader] = {
     def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
       reader.getObjectInspector match {
         case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
@@ -65,16 +71,28 @@ private[hive] object OrcFileOperator extends Logging {
     }
 
     listOrcFiles(basePath, conf).iterator.map { path =>
-      path -> OrcFile.createReader(fs, path)
+      val reader = try {
+        Some(OrcFile.createReader(fs, path))
+      } catch {
+        case e: IOException =>
+          if (ignoreCorruptFiles) {
+            logWarning(s"Skipped the footer in the corrupted file: $path", e)
+            None
+          } else {
+            throw new SparkException(s"Could not read footer for file: $path", e)
+          }
+      }
+      path -> reader
     }.collectFirst {
-      case (path, reader) if isWithNonEmptySchema(path, reader) => reader
+      case (path, Some(reader)) if isWithNonEmptySchema(path, reader) => reader
     }
   }
 
-  def readSchema(paths: Seq[String], conf: Option[Configuration]): Option[StructType] = {
+  def readSchema(paths: Seq[String], conf: Option[Configuration], ignoreCorruptFiles: Boolean)
+      : Option[StructType] = {
     // Take the first file where we can open a valid reader if we can find one.  Otherwise just
     // return None to indicate we can't infer the schema.
-    paths.flatMap(getFileReader(_, conf)).headOption.map { reader =>
+    paths.flatMap(getFileReader(_, conf, ignoreCorruptFiles)).headOption.map { reader =>
       val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
       val schema = readerInspector.getTypeName
       logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org