Posted to commits@spark.apache.org by gu...@apache.org on 2020/04/27 03:52:53 UTC
[spark] branch branch-2.4 updated: [SPARK-25595][2.4] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 218c114 [SPARK-25595][2.4] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled
218c114 is described below
commit 218c11427796bfb97ce0eb324639cdd311739e21
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Mon Apr 27 12:51:25 2020 +0900
[SPARK-25595][2.4] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled
## What changes were proposed in this pull request?
With the flag `IGNORE_CORRUPT_FILES` enabled, schema inference should ignore corrupt Avro files, which is consistent with the Parquet and ORC data sources.
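For illustration, a minimal sketch of the behavior this enables, run in a `spark-shell` session where `spark` is the active `SparkSession` and the spark-avro module is on the classpath; the directory `/tmp/avro-data` is a hypothetical path assumed to contain both readable and corrupt `.avro` files:

```scala
// IGNORE_CORRUPT_FILES is backed by the SQL conf "spark.sql.files.ignoreCorruptFiles".
// With it enabled, schema inference skips files whose Avro header cannot be read
// instead of failing the whole read.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
val df = spark.read.format("avro").load("/tmp/avro-data")
df.printSchema()

// With the flag disabled (the default), the same load can fail with a SparkException
// when a corrupt file is hit during schema inference.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")
```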
## How was this patch tested?
Unit tests in `AvroSuite`.
Closes #28334 from gengliangwang/SPARK-25595-2.4.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../org/apache/spark/sql/avro/AvroFileFormat.scala | 78 ++++++++++++++--------
.../org/apache/spark/sql/avro/AvroSuite.scala | 43 ++++++++++++
2 files changed, 93 insertions(+), 28 deletions(-)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 6df23c9..e60fa88 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -32,14 +32,14 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
private[avro] class AvroFileFormat extends FileFormat
with DataSourceRegister with Logging with Serializable {
@@ -59,36 +59,13 @@ private[avro] class AvroFileFormat extends FileFormat
     val conf = spark.sessionState.newHadoopConf()
     val parsedOptions = new AvroOptions(options, conf)
-    // Schema evolution is not supported yet. Here we only pick a single random sample file to
-    // figure out the schema of the whole dataset.
-    val sampleFile =
-      if (parsedOptions.ignoreExtension) {
-        files.headOption.getOrElse {
-          throw new FileNotFoundException("Files for schema inferring have been not found.")
-        }
-      } else {
-        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
-          throw new FileNotFoundException(
-            "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
-        }
-      }
-
     // User can specify an optional avro json schema.
     val avroSchema = parsedOptions.schema
       .map(new Schema.Parser().parse)
       .getOrElse {
-        val in = new FsInput(sampleFile.getPath, conf)
-        try {
-          val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
-          try {
-            reader.getSchema
-          } finally {
-            reader.close()
-          }
-        } finally {
-          in.close()
-        }
-      }
+        inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
+          spark.sessionState.conf.ignoreCorruptFiles)
+      }
     SchemaConverters.toSqlType(avroSchema).dataType match {
       case t: StructType => Some(t)
@@ -100,6 +77,51 @@ private[avro] class AvroFileFormat extends FileFormat
     }
   }
+  private def inferAvroSchemaFromFiles(
+      files: Seq[FileStatus],
+      conf: Configuration,
+      ignoreExtension: Boolean,
+      ignoreCorruptFiles: Boolean): Schema = {
+    // Schema evolution is not supported yet. Here we only pick first random readable sample file to
+    // figure out the schema of the whole dataset.
+    val avroReader = files.iterator.map { f =>
+      val path = f.getPath
+      if (!ignoreExtension && !path.getName.endsWith(".avro")) {
+        None
+      } else {
+        Utils.tryWithResource {
+          new FsInput(path, conf)
+        } { in =>
+          try {
+            Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
+          } catch {
+            case e: IOException =>
+              if (ignoreCorruptFiles) {
+                logWarning(s"Skipped the footer in the corrupted file: $path", e)
+                None
+              } else {
+                throw new SparkException(s"Could not read file: $path", e)
+              }
+          }
+        }
+      }
+    }.collectFirst {
+      case Some(reader) => reader
+    }
+
+    avroReader match {
+      case Some(reader) =>
+        try {
+          reader.getSchema
+        } finally {
+          reader.close()
+        }
+      case None =>
+        throw new FileNotFoundException(
+          "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
+    }
+  }
+
   override def shortName(): String = "avro"

   override def isSplitable(
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index d7317fc..12ef883 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
import testImplicits._
@@ -368,6 +369,48 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     }
   }
+  private def createDummyCorruptFile(dir: File): Unit = {
+    Utils.tryWithResource {
+      FileUtils.forceMkdir(dir)
+      val corruptFile = new File(dir, "corrupt.avro")
+      new BufferedWriter(new FileWriter(corruptFile))
+    } { writer =>
+      writer.write("corrupt")
+    }
+  }
+
+  test("Ignore corrupt Avro file if flag IGNORE_CORRUPT_FILES enabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[FileNotFoundException] {
+          spark.read.format("avro").load(dir.getAbsolutePath).schema
+        }.getMessage
+        assert(message.contains("No Avro files found."))
+
+        val srcFile = new File("src/test/resources/episodes.avro")
+        val destFile = new File(dir, "episodes.avro")
+        FileUtils.copyFile(srcFile, destFile)
+
+        val result = spark.read.format("avro").load(srcFile.getAbsolutePath).collect()
+        checkAnswer(spark.read.format("avro").load(dir.getAbsolutePath), result)
+      }
+    }
+  }
+
+  test("Throws IOException on reading corrupt Avro file if flag IGNORE_CORRUPT_FILES disabled") {
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      withTempPath { dir =>
+        createDummyCorruptFile(dir)
+        val message = intercept[org.apache.spark.SparkException] {
+          spark.read.format("avro").load(dir.getAbsolutePath)
+        }.getMessage
+
+        assert(message.contains("Could not read file"))
+      }
+    }
+  }
+
   test("Date field type") {
     withTempPath { dir =>
       val schema = StructType(Seq(