You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/04/27 03:52:53 UTC

[spark] branch branch-2.4 updated: [SPARK-25595][2.4] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 218c114  [SPARK-25595][2.4] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled
218c114 is described below

commit 218c11427796bfb97ce0eb324639cdd311739e21
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Mon Apr 27 12:51:25 2020 +0900

    [SPARK-25595][2.4] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled
    
    ## What changes were proposed in this pull request?
    
    With the flag `IGNORE_CORRUPT_FILES` enabled, schema inference should ignore corrupt Avro files, consistent with the Parquet and ORC data sources.
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #28334 from gengliangwang/SPARK-25595-2.4.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../org/apache/spark/sql/avro/AvroFileFormat.scala | 78 ++++++++++++++--------
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 43 ++++++++++++
 2 files changed, 93 insertions(+), 28 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 6df23c9..e60fa88 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -32,14 +32,14 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 private[avro] class AvroFileFormat extends FileFormat
   with DataSourceRegister with Logging with Serializable {
@@ -59,36 +59,13 @@ private[avro] class AvroFileFormat extends FileFormat
     val conf = spark.sessionState.newHadoopConf()
     val parsedOptions = new AvroOptions(options, conf)
 
-    // Schema evolution is not supported yet. Here we only pick a single random sample file to
-    // figure out the schema of the whole dataset.
-    val sampleFile =
-      if (parsedOptions.ignoreExtension) {
-        files.headOption.getOrElse {
-          throw new FileNotFoundException("Files for schema inferring have been not found.")
-        }
-      } else {
-        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
-          throw new FileNotFoundException(
-            "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
-        }
-      }
-
     // User can specify an optional avro json schema.
     val avroSchema = parsedOptions.schema
       .map(new Schema.Parser().parse)
       .getOrElse {
-        val in = new FsInput(sampleFile.getPath, conf)
-        try {
-          val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
-          try {
-            reader.getSchema
-          } finally {
-            reader.close()
-          }
-        } finally {
-          in.close()
-        }
-      }
+        inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
+          spark.sessionState.conf.ignoreCorruptFiles)
+    }
 
     SchemaConverters.toSqlType(avroSchema).dataType match {
       case t: StructType => Some(t)
@@ -100,6 +77,51 @@ private[avro] class AvroFileFormat extends FileFormat
     }
   }
 
+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean,
+      ignoreCorruptFiles: Boolean): Schema = {
+    // Schema evolution is not supported yet. Here we only pick the first readable sample file to
+    // figure out the schema of the whole dataset.
+    val avroReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
+        None
+      } else {
+        Utils.tryWithResource {
+          new FsInput(path, conf)
+        } { in =>
+          try {
+            Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
+          } catch {
+            case e: IOException =>
+              if (ignoreCorruptFiles) {
+                logWarning(s"Skipped the footer in the corrupted file: $path", e)
+                None
+              } else {
+                throw new SparkException(s"Could not read file: $path", e)
+              }
+          }
+        }
+      }
+    }.collectFirst {
+      case Some(reader) => reader
+    }
+
+    avroReader match {
+      case Some(reader) =>
+        try {
+          reader.getSchema
+        } finally {
+          reader.close()
+        }
+      case None =>
+        throw new FileNotFoundException(
+          "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
+    }
+  }
+
   override def shortName(): String = "avro"
 
   override def isSplitable(
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index d7317fc..12ef883 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   import testImplicits._
@@ -368,6 +369,48 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
 
+  private def createDummyCorruptFile(dir: File): Unit = {
+    Utils.tryWithResource {
+      FileUtils.forceMkdir(dir)
+      val corruptFile = new File(dir, "corrupt.avro")
+      new BufferedWriter(new FileWriter(corruptFile))
+    } { writer =>
+      writer.write("corrupt")
+    }
+  }
+
+  test("Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[FileNotFoundException] {
+          spark.read.format("avro").load(dir.getAbsolutePath).schema
+        }.getMessage
+        assert(message.contains("No Avro files found."))
+
+        val srcFile = new File("src/test/resources/episodes.avro")
+        val destFile = new File(dir, "episodes.avro")
+        FileUtils.copyFile(srcFile, destFile)
+
+        val result = spark.read.format("avro").load(srcFile.getAbsolutePath).collect()
+        checkAnswer(spark.read.format("avro").load(dir.getAbsolutePath), result)
+      }
+    }
+  }
+
+  test("Throws IOException on reading corrupt Avro file if flag IGNORE_CORRUPT_FILES disabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[org.apache.spark.SparkException] {
+          spark.read.format("avro").load(dir.getAbsolutePath)
+        }.getMessage
+
+        assert(message.contains("Could not read file"))
+      }
+    }
+  }
+
   test("Date field type") {
     withTempPath { dir =>
       val schema = StructType(Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org