Posted to commits@spark.apache.org by we...@apache.org on 2019/03/27 17:09:40 UTC
[spark] branch master updated: [SPARK-27291][SQL] PartitioningAwareFileIndex: Filter out empty files on listing files
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 49b0411 [SPARK-27291][SQL] PartitioningAwareFileIndex: Filter out empty files on listing files
49b0411 is described below
commit 49b0411549dad82d0ae8daf93a9ae2624f206791
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Wed Mar 27 10:08:38 2019 -0700
[SPARK-27291][SQL] PartitioningAwareFileIndex: Filter out empty files on listing files
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/23130, all empty files are excluded from target file splits in `FileSourceScanExec`. File source V2 should keep the same behavior.
This PR proposes filtering out empty files while listing files in `PartitioningAwareFileIndex`, so that the upper layers don't need to handle them.
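For context, a minimal standalone sketch of the listing-time filter; the helper name `listNonEmptyDataFiles` and the `fs`/`dir` parameters are illustrative, not Spark's actual internals:

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Illustrative only: skip zero-length files at listing time so that
// downstream planning (split computation, bucketing) never sees them.
def listNonEmptyDataFiles(fs: FileSystem, dir: Path): Seq[FileStatus] =
  fs.listStatus(dir).toSeq.filter(f => f.isFile && f.getLen > 0)
```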
## How was this patch tested?
Unit test
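Concretely, the behavior the new test pins down can be sketched as follows, assuming an active `spark` session (e.g. spark-shell) and a scratch directory; the suite below does the same thing under `withTempDir`:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// One empty and one non-empty file: the empty file must not
// contribute a partition (i.e. a file split) to the read.
Files.createDirectories(Paths.get("/tmp/demo"))
Files.write(Paths.get("/tmp/demo", "empty"), Array.empty[Byte])
Files.write(Paths.get("/tmp/demo", "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
val readBack = spark.read.option("wholetext", true).text("/tmp/demo")
assert(readBack.rdd.getNumPartitions == 1)
```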
Closes #24227 from gengliangwang/ignoreEmptyFile.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/execution/DataSourceScanExec.scala | 4 ++--
.../datasources/PartitioningAwareFileIndex.scala | 7 +++++--
.../org/apache/spark/sql/sources/SaveLoadSuite.scala | 16 +++++++++-------
3 files changed, 16 insertions(+), 11 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 92f7d66..33adfce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -382,7 +382,7 @@ case class FileSourceScanExec(
logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
val filesGroupedToBuckets =
selectedPartitions.flatMap { p =>
- p.files.filter(_.getLen > 0).map { f =>
+ p.files.map { f =>
PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values)
}
}.groupBy { f =>
@@ -426,7 +426,7 @@ case class FileSourceScanExec(
s"open cost is considered as scanning $openCostInBytes bytes.")
val splitFiles = selectedPartitions.flatMap { partition =>
- partition.files.filter(_.getLen > 0).flatMap { file =>
+ partition.files.flatMap { file =>
// getPath() is very expensive so we only want to call it once in this block:
val filePath = file.getPath
val isSplitable = relation.fileFormat.isSplitable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index f5ae095..29b304a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -58,15 +58,18 @@ abstract class PartitioningAwareFileIndex(
override def listFiles(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+ def isNonEmptyFile(f: FileStatus): Boolean = {
+ isDataPath(f.getPath) && f.getLen > 0
+ }
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
- PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
+ PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
} else {
prunePartitions(partitionFilters, partitionSpec()).map {
case PartitionPath(values, path) =>
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
- existingDir.filter(f => isDataPath(f.getPath))
+ existingDir.filter(isNonEmptyFile)
case None =>
// Directory does not exist, or has no children files
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 048e4b8..7680f61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -146,13 +146,15 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
}
test("skip empty files in non bucketed read") {
- withTempDir { dir =>
- val path = dir.getCanonicalPath
- Files.write(Paths.get(path, "empty"), Array.empty[Byte])
- Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
- val readback = spark.read.option("wholetext", true).text(path)
-
- assert(readback.rdd.getNumPartitions === 1)
+ Seq("csv", "text").foreach { format =>
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ Files.write(Paths.get(path, "empty"), Array.empty[Byte])
+ Files.write(Paths.get(path, "notEmpty"), "a".getBytes(StandardCharsets.UTF_8))
+ val readBack = spark.read.option("wholetext", true).format(format).load(path)
+
+ assert(readBack.rdd.getNumPartitions === 1)
+ }
}
}
}