Posted to commits@spark.apache.org by we...@apache.org on 2018/12/02 02:29:30 UTC
spark git commit: [SPARK-26161][SQL] Ignore empty files in load
Repository: spark
Updated Branches:
refs/heads/master 17fdca7c1 -> 3e46e3ccd
[SPARK-26161][SQL] Ignore empty files in load
## What changes were proposed in this pull request?
In this PR, I propose filtering out all empty files inside `FileSourceScanExec` and excluding them from file splits. This should reduce the overhead of opening and reading files without any data, and as a consequence, data sources will not produce empty partitions for such files.
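For illustration, a minimal sketch of the filtering idea (the helper `nonEmptyFiles` is hypothetical; `FileStatus.getLen` is the actual Hadoop API used in the diff below):

```scala
import org.apache.hadoop.fs.FileStatus

// Sketch only: drop zero-length files before planning file splits,
// so no read task or partition is created for a file with no data.
def nonEmptyFiles(files: Seq[FileStatus]): Seq[FileStatus] =
  files.filter(_.getLen > 0)
```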
## How was this patch tested?
Added a test which creates an empty file and a non-empty file. If empty files are ignored in load, the Text datasource in `wholetext` mode must create only one partition, for the non-empty file.
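A hedged usage sketch of the tested behavior (assuming a `SparkSession` named `spark` and a directory `/tmp/data` containing one empty and one non-empty file):

```scala
// With empty files ignored in load, only the non-empty file
// contributes a partition in wholetext mode.
val readback = spark.read.option("wholetext", true).text("/tmp/data")
assert(readback.rdd.getNumPartitions == 1)
```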
Closes #23130 from MaxGekk/ignore-empty-files.
Authored-by: Maxim Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e46e3cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e46e3cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e46e3cc
Branch: refs/heads/master
Commit: 3e46e3ccd58d0a2d445dff58a52ab1966ce133e8
Parents: 17fdca7
Author: Maxim Gekk <ma...@gmail.com>
Authored: Sun Dec 2 10:29:25 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun Dec 2 10:29:25 2018 +0800
----------------------------------------------------------------------
.../spark/sql/execution/DataSourceScanExec.scala | 4 ++--
.../sql/execution/datasources/json/JsonSuite.scala | 3 +--
.../org/apache/spark/sql/sources/SaveLoadSuite.scala | 13 +++++++++++++
3 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3e46e3cc/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 4faa27c..b29d5c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -368,7 +368,7 @@ case class FileSourceScanExec(
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val filesGroupedToBuckets =
selectedPartitions.flatMap { p =>
- p.files.map { f =>
+ p.files.filter(_.getLen > 0).map { f =>
val hosts = getBlockHosts(getBlockLocations(f), 0, f.getLen)
PartitionedFile(p.values, f.getPath.toUri.toString, 0, f.getLen, hosts)
}
@@ -418,7 +418,7 @@ case class FileSourceScanExec(
s"open cost is considered as scanning $openCostInBytes bytes.")
val splitFiles = selectedPartitions.flatMap { partition =>
- partition.files.flatMap { file =>
+ partition.files.filter(_.getLen > 0).flatMap { file =>
val blockLocations = getBlockLocations(file)
if (fsRelation.fileFormat.isSplitable(
fsRelation.sparkSession, fsRelation.options, file.getPath)) {
http://git-wip-us.apache.org/repos/asf/spark/blob/3e46e3cc/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index ee5176e..9d23161 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1842,7 +1842,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val path = dir.getCanonicalPath
primitiveFieldAndType
.toDF("value")
- .repartition(1)
.write
.text(path)
@@ -1910,7 +1909,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
F.count($"dummy").as("valid"),
F.count($"_corrupt_record").as("corrupt"),
F.count("*").as("count"))
- checkAnswer(counts, Row(1, 4, 6)) // null row for empty file
+ checkAnswer(counts, Row(1, 4, 6))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3e46e3cc/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 12779b4..048e4b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.sources
import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
import org.scalatest.BeforeAndAfter
@@ -142,4 +144,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
assert(e.contains(s"Partition column `$unknown` not found in schema $schemaCatalog"))
}
}
+
+ test("skip empty files in non bucketed read") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+ Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+ val readback = spark.read.option("wholetext", true).text(path)
+
+ assert(readback.rdd.getNumPartitions === 1)
+ }
+ }
}