You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/07/16 21:35:47 UTC
spark git commit: [SPARK-24805][SQL] Do not ignore avro files without
extensions by default
Repository: spark
Updated Branches:
refs/heads/master b0c95a1d6 -> ba437fc5c
[SPARK-24805][SQL] Do not ignore avro files without extensions by default
## What changes were proposed in this pull request?
In the PR, I propose to change default behaviour of AVRO datasource which currently ignores files without `.avro` extension in read by default. This PR sets the default value for `avro.mapred.ignore.inputs.without.extension` to `false` in the case if the parameter is not set by a user.
## How was this patch tested?
Added a test file without extension in AVRO format, and a new test for reading the file with and without a specified schema.
Author: Maxim Gekk <ma...@databricks.com>
Author: Maxim Gekk <ma...@gmail.com>
Closes #21769 from MaxGekk/avro-without-extension.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba437fc5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba437fc5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba437fc5
Branch: refs/heads/master
Commit: ba437fc5c73b95ee4c59327abf3161c58f64cb12
Parents: b0c95a1
Author: Maxim Gekk <ma...@databricks.com>
Authored: Mon Jul 16 14:35:44 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Mon Jul 16 14:35:44 2018 -0700
----------------------------------------------------------------------
.../apache/spark/sql/avro/AvroFileFormat.scala | 14 +++---
.../org/apache/spark/sql/avro/AvroSuite.scala | 45 +++++++++++++++++---
2 files changed, 47 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ba437fc5/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index fb93033..9eb2064 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -62,7 +62,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
// Schema evolution is not supported yet. Here we only pick a single random sample file to
// figure out the schema of the whole dataset.
val sampleFile =
- if (conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true)) {
+ if (AvroFileFormat.ignoreFilesWithoutExtensions(conf)) {
files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
throw new FileNotFoundException(
"No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
@@ -170,10 +170,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
// Doing input file filtering is improper because we may generate empty tasks that process no
// input files but stress the scheduler. We should probably add a more general input file
// filtering mechanism for `FileFormat` data sources. See SPARK-16317.
- if (
- conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true) &&
- !file.filePath.endsWith(".avro")
- ) {
+ if (AvroFileFormat.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) {
Iterator.empty
} else {
val reader = {
@@ -278,4 +275,11 @@ private[avro] object AvroFileFormat {
value.readFields(new DataInputStream(in))
}
}
+
+ def ignoreFilesWithoutExtensions(conf: Configuration): Boolean = {
+ // Files without .avro extensions are not ignored by default
+ val defaultValue = false
+
+ conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, defaultValue)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ba437fc5/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 9c6526b..446b421 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -18,7 +18,8 @@
package org.apache.spark.sql.avro
import java.io._
-import java.nio.file.Files
+import java.net.URL
+import java.nio.file.{Files, Path, Paths}
import java.sql.{Date, Timestamp}
import java.util.{TimeZone, UUID}
@@ -622,7 +623,12 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
intercept[FileNotFoundException] {
withTempPath { dir =>
FileUtils.touch(new File(dir, "test"))
- spark.read.avro(dir.toString)
+ val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
+ try {
+ hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+ spark.read.avro(dir.toString)
+ } finally {
+ hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty) }
}
}
@@ -684,12 +690,18 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
Files.createFile(new File(tempSaveDir, "non-avro").toPath)
- val newDf = spark
- .read
- .option(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
- .avro(tempSaveDir)
+ val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
+ val count = try {
+ hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+ val newDf = spark
+ .read
+ .avro(tempSaveDir)
+ newDf.count()
+ } finally {
+ hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+ }
- assert(newDf.count == 8)
+ assert(count == 8)
}
}
@@ -805,4 +817,23 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
assert(readDf.collect().sameElements(writeDf.collect()))
}
}
+
+ test("SPARK-24805: do not ignore files without .avro extension by default") {
+ withTempDir { dir =>
+ Files.copy(
+ Paths.get(new URL(episodesAvro).toURI),
+ Paths.get(dir.getCanonicalPath, "episodes"))
+
+ val fileWithoutExtension = s"${dir.getCanonicalPath}/episodes"
+ val df1 = spark.read.avro(fileWithoutExtension)
+ assert(df1.count == 8)
+
+ val schema = new StructType()
+ .add("title", StringType)
+ .add("air_date", StringType)
+ .add("doctor", IntegerType)
+ val df2 = spark.read.schema(schema).avro(fileWithoutExtension)
+ assert(df2.count == 8)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org