You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/12/02 02:29:30 UTC

spark git commit: [SPARK-26161][SQL] Ignore empty files in load

Repository: spark
Updated Branches:
  refs/heads/master 17fdca7c1 -> 3e46e3ccd


[SPARK-26161][SQL] Ignore empty files in load

## What changes were proposed in this pull request?

In the PR, I propose filtering out all empty files inside of `FileSourceScanExec` and exclude them from file splits. It should reduce overhead of opening and reading files without any data, and as consequence datasources will not produce empty partitions for such files.

## How was this patch tested?

Added a test which creates an empty and non-empty files. If empty files are ignored in load, Text datasource in the `wholetext` mode must create only one partition for non-empty file.

Closes #23130 from MaxGekk/ignore-empty-files.

Authored-by: Maxim Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e46e3cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e46e3cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e46e3cc

Branch: refs/heads/master
Commit: 3e46e3ccd58d0a2d445dff58a52ab1966ce133e8
Parents: 17fdca7
Author: Maxim Gekk <ma...@gmail.com>
Authored: Sun Dec 2 10:29:25 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Dec 2 10:29:25 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/DataSourceScanExec.scala       |  4 ++--
 .../sql/execution/datasources/json/JsonSuite.scala     |  3 +--
 .../org/apache/spark/sql/sources/SaveLoadSuite.scala   | 13 +++++++++++++
 3 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3e46e3cc/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 4faa27c..b29d5c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -368,7 +368,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map { f =>
+        p.files.filter(_.getLen > 0).map { f =>
           val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
           PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
         }
@@ -418,7 +418,7 @@ case class FileSourceScanExec(
       s"open cost is considered as scanning $openCostInBytes bytes.")
 
     val splitFiles = selectedPartitions.flatMap { partition =>
-      partition.files.flatMap { file =>
+      partition.files.filter(_.getLen > 0).flatMap { file =>
         val blockLocations = getBlockLocations(file)
         if (fsRelation.fileFormat.isSplitable(
             fsRelation.sparkSession, fsRelation.options, file.getPath)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3e46e3cc/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index ee5176e..9d23161 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1842,7 +1842,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
-        .repartition(1)
         .write
         .text(path)
 
@@ -1910,7 +1909,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           F.count($"dummy").as("valid"),
           F.count($"_corrupt_record").as("corrupt"),
           F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 4, 6)) // null row for empty file
+      checkAnswer(counts, Row(1, 4, 6))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3e46e3cc/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 12779b4..048e4b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.sources
 
 import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
 
 import org.scalatest.BeforeAndAfter
 
@@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
       assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
     }
   }
+
+  test("skip empty files in non bucketed read") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+      Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+      val readback = spark.read.option("wholetext", true).text(path)
+
+      assert(readback.rdd.getNumPartitions === 1)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org