Posted to commits@spark.apache.org by we...@apache.org on 2019/03/27 17:09:40 UTC

[spark] branch master updated: [SPARK-27291][SQL] PartitioningAwareFileIndex: Filter out empty files on listing files

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 49b0411  [SPARK-27291][SQL] PartitioningAwareFileIndex: Filter out empty files on listing files
49b0411 is described below

commit 49b0411549dad82d0ae8daf93a9ae2624f206791
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Wed Mar 27 10:08:38 2019 -0700

    [SPARK-27291][SQL] PartitioningAwareFileIndex: Filter out empty files on listing files
    
    ## What changes were proposed in this pull request?
    
    In https://github.com/apache/spark/pull/23130, all empty files were excluded from the target file splits in `FileSourceScanExec`.
    File source V2 should keep the same behavior.
    
    This PR proposes filtering out empty files when listing files in `PartitioningAwareFileIndex`, so that the upper layers don't need to handle them.
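    
    For illustration, a minimal standalone sketch of the listing-time check (the real change adds an `isNonEmptyFile` helper inside `PartitioningAwareFileIndex.listFiles`, shown in the diff below; the `nonEmptyFiles` helper here is hypothetical and skips the `isDataPath` metadata check):
    
    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path}
    
    // Simplified stand-in: list a directory and keep only regular,
    // non-empty files, as the patched index does at listing time.
    def nonEmptyFiles(dir: String): Seq[FileStatus] = {
      val path = new Path(dir)
      val fs = path.getFileSystem(new Configuration())
      fs.listStatus(path).toSeq.filter(f => f.isFile && f.getLen > 0)
    }
    ```
    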
    ## How was this patch tested?
    
    Unit test
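    
    As a rough usage sketch of the behavior the updated test exercises (assuming an active `SparkSession` named `spark`; the directory path is hypothetical):
    
    ```scala
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Paths}
    
    val path = "/tmp/skip-empty-demo"  // hypothetical location
    Files.createDirectories(Paths.get(path))
    Files.write(Paths.get(path, "empty"), Array.empty[Byte])
    Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
    
    // Since empty files are dropped when the index lists files, only the
    // non-empty file produces a partition.
    val readBack = spark.read.option("wholetext", true).text(path)
    assert(readBack.rdd.getNumPartitions == 1)
    ```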
    
    Closes #24227 from gengliangwang/ignoreEmptyFile.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/execution/DataSourceScanExec.scala  |  4 ++--
 .../datasources/PartitioningAwareFileIndex.scala         |  7 +++++--
 .../org/apache/spark/sql/sources/SaveLoadSuite.scala     | 16 +++++++++-------
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 92f7d66..33adfce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -382,7 +382,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.filter(_.getLen > 0).map { f =>
+        p.files.map { f =>
           PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
         }
       }.groupBy { f =>
@@ -426,7 +426,7 @@ case class FileSourceScanExec(
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
     val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.filter(_.getLen > 0).flatMap { file =>
+      partition.files.flatMap { file =>
         // getPath() is very expensive so we only want to call it once in this block:
         val filePath = file.getPath
         val isSplitable = relation.fileFormat.isSplitable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index f5ae095..29b304a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -58,15 +58,18 @@ abstract class PartitioningAwareFileIndex(
 
   override def listFiles(
       partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    def isNonEmptyFile(f: FileStatus): Boolean = {
+      isDataPath(f.getPath) && f.getLen > 0
+    }
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
+      PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
     } else {
       prunePartitions(partitionFilters, partitionSpec()).map {
         case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
               // Directory has children files in it, return them
-              existingDir.filter(f => isDataPath(f.getPath))
+              existingDir.filter(isNonEmptyFile)
 
             case None =>
               // Directory does not exist, or has no children files
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 048e4b8..7680f61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -146,13 +146,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
   }
 
   test("skip empty files in non bucketed read") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
-      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
-      val readback = spark.read.option("wholetext", true).text(path)
-
-      assert(readback.rdd.getNumPartitions === 1)
+    Seq("csv", "text").foreach { format =>
+      withTempDir { dir =>
+        val path = dir.getCanonicalPath
+        Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+        Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+        val readBack = spark.read.option("wholetext", true).format(format).load(path)
+
+        assert(readBack.rdd.getNumPartitions === 1)
+      }
     }
   }
 }

