You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/07/16 21:35:47 UTC

spark git commit: [SPARK-24805][SQL] Do not ignore avro files without extensions by default

Repository: spark
Updated Branches:
  refs/heads/master b0c95a1d6 -> ba437fc5c


[SPARK-24805][SQL] Do not ignore avro files without extensions by default

## What changes were proposed in this pull request?

In the PR, I propose to change default behaviour of AVRO datasource which currently ignores files without `.avro` extension in read by default. This PR sets the default value for `avro.mapred.ignore.inputs.without.extension` to `false` in the case if the parameter is not set by an user.

## How was this patch tested?

Added a test file without extension in AVRO format, and new test for reading the file with and wihout specified schema.

Author: Maxim Gekk <ma...@databricks.com>
Author: Maxim Gekk <ma...@gmail.com>

Closes #21769 from MaxGekk/avro-without-extension.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba437fc5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba437fc5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba437fc5

Branch: refs/heads/master
Commit: ba437fc5c73b95ee4c59327abf3161c58f64cb12
Parents: b0c95a1
Author: Maxim Gekk <ma...@databricks.com>
Authored: Mon Jul 16 14:35:44 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Mon Jul 16 14:35:44 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/avro/AvroFileFormat.scala  | 14 +++---
 .../org/apache/spark/sql/avro/AvroSuite.scala   | 45 +++++++++++++++++---
 2 files changed, 47 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba437fc5/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index fb93033..9eb2064 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -62,7 +62,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
     // Schema evolution is not supported yet. Here we only pick a single random sample file to
     // figure out the schema of the whole dataset.
     val sampleFile =
-      if (conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true)) {
+      if (AvroFileFormat.ignoreFilesWithoutExtensions(conf)) {
         files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
           throw new FileNotFoundException(
             "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
@@ -170,10 +170,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       // Doing input file filtering is improper because we may generate empty tasks that process no
       // input files but stress the scheduler. We should probably add a more general input file
       // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
-      if (
-        conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true) &&
-        !file.filePath.endsWith(".avro")
-      ) {
+      if (AvroFileFormat.ignoreFilesWithoutExtensions(conf) && !file.filePath.endsWith(".avro")) {
         Iterator.empty
       } else {
         val reader = {
@@ -278,4 +275,11 @@ private[avro] object AvroFileFormat {
       value.readFields(new DataInputStream(in))
     }
   }
+
+  def ignoreFilesWithoutExtensions(conf: Configuration): Boolean = {
+    // Files without .avro extensions are not ignored by default
+    val defaultValue = false
+
+    conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, defaultValue)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba437fc5/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 9c6526b..446b421 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.avro
 
 import java.io._
-import java.nio.file.Files
+import java.net.URL
+import java.nio.file.{Files, Path, Paths}
 import java.sql.{Date, Timestamp}
 import java.util.{TimeZone, UUID}
 
@@ -622,7 +623,12 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     intercept[FileNotFoundException] {
       withTempPath { dir =>
         FileUtils.touch(new File(dir, "test"))
-        spark.read.avro(dir.toString)
+        val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
+        try {
+          hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+          spark.read.avro(dir.toString)
+        } finally {
+          hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)        }
       }
     }
 
@@ -684,12 +690,18 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
       Files.createFile(new File(tempSaveDir, "non-avro").toPath)
 
-      val newDf = spark
-        .read
-        .option(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
-        .avro(tempSaveDir)
+      val hadoopConf = spark.sqlContext.sparkContext.hadoopConfiguration
+      val count = try {
+        hadoopConf.set(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+        val newDf = spark
+          .read
+          .avro(tempSaveDir)
+        newDf.count()
+      } finally {
+        hadoopConf.unset(AvroFileFormat.IgnoreFilesWithoutExtensionProperty)
+      }
 
-      assert(newDf.count == 8)
+      assert(count == 8)
     }
   }
 
@@ -805,4 +817,23 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       assert(readDf.collect().sameElements(writeDf.collect()))
     }
   }
+
+  test("SPARK-24805: do not ignore files without .avro extension by default") {
+    withTempDir { dir =>
+      Files.copy(
+        Paths.get(new URL(episodesAvro).toURI),
+        Paths.get(dir.getCanonicalPath, "episodes"))
+
+      val fileWithoutExtension = s"${dir.getCanonicalPath}/episodes"
+      val df1 = spark.read.avro(fileWithoutExtension)
+      assert(df1.count == 8)
+
+      val schema = new StructType()
+        .add("title", StringType)
+        .add("air_date", StringType)
+        .add("doctor", IntegerType)
+      val df2 = spark.read.schema(schema).avro(fileWithoutExtension)
+      assert(df2.count == 8)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org