Posted to commits@spark.apache.org by li...@apache.org on 2018/02/14 18:58:15 UTC
spark git commit: Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"
Repository: spark
Updated Branches:
refs/heads/master 140f87533 -> 400a1d9e2
Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"
This reverts commit 8c21170decfb9ca4d3233e1ea13bd1b6e3199ed9.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/400a1d9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/400a1d9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/400a1d9e
Branch: refs/heads/master
Commit: 400a1d9e25c1196f0be87323bd89fb3af0660166
Parents: 140f875
Author: gatorsmile <ga...@gmail.com>
Authored: Wed Feb 14 10:57:12 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Feb 14 10:57:12 2018 -0800
----------------------------------------------------------------------
.../sql/execution/DataSourceScanExec.scala | 29 ++++++--------------
.../datasources/FileSourceStrategySuite.scala | 15 ++++++----
2 files changed, 17 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/400a1d9e/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index ba1157d..08ff33a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -444,29 +444,16 @@ case class FileSourceScanExec(
       currentSize = 0
     }
-    def addFile(file: PartitionedFile): Unit = {
-      currentFiles += file
-      currentSize += file.length + openCostInBytes
-    }
-
-    var frontIndex = 0
-    var backIndex = splitFiles.length - 1
-
-    while (frontIndex <= backIndex) {
-      addFile(splitFiles(frontIndex))
-      frontIndex += 1
-      while (frontIndex <= backIndex &&
-          currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
-        addFile(splitFiles(frontIndex))
-        frontIndex += 1
-      }
-      while (backIndex > frontIndex &&
-          currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
-        addFile(splitFiles(backIndex))
-        backIndex -= 1
+    // Assign files to partitions using "Next Fit Decreasing"
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
       }
-      closePartition()
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
     }
+    closePartition()
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
   }
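
As a reference for the behavior this revert restores, the packing loop in the
hunk above can be written as a small standalone helper. This is only a sketch
of the "Next Fit Decreasing" assignment: FileSlice and the pack signature are
illustrative stand-ins, not Spark's real PartitionedFile or FileSourceScanExec
APIs.

// Minimal sketch of the restored packing loop: walk the files (assumed to be
// sorted by length, largest first) and start a new partition whenever adding
// the next file would push the current one past maxSplitBytes.
object NextFitPackingSketch {
  final case class FileSlice(name: String, length: Long)

  def pack(
      splitFiles: Seq[FileSlice],
      maxSplitBytes: Long,
      openCostInBytes: Long): Seq[Seq[FileSlice]] = {
    val partitions = Seq.newBuilder[Seq[FileSlice]]
    var currentFiles = Vector.empty[FileSlice]
    var currentSize = 0L

    // Close the current partition and move to the next, mirroring closePartition().
    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) partitions += currentFiles
      currentFiles = Vector.empty
      currentSize = 0L
    }

    splitFiles.foreach { file =>
      if (currentSize + file.length > maxSplitBytes) {
        closePartition()
      }
      // Every file also pays the configured open cost toward the partition size.
      currentSize += file.length + openCostInBytes
      currentFiles = currentFiles :+ file
    }
    closePartition()
    partitions.result()
  }

  // Example: pack(Seq(FileSlice("a", 3), FileSlice("b", 2), FileSlice("c", 1)),
  //   maxSplitBytes = 4, openCostInBytes = 1) groups the files as [[a], [b, c]].
}

Because each file also charges openCostInBytes, long runs of tiny files are
still grouped into a bounded number of partitions instead of one partition per
file.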
http://git-wip-us.apache.org/repos/asf/spark/blob/400a1d9e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index bfccc93..c1d61b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -141,17 +141,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
       SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
-        assert(partitions.size == 3, "when checking partitions")
-        assert(partitions(0).files.size == 2, "when checking partition 1")
+        // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
+        assert(partitions.size == 4, "when checking partitions")
+        assert(partitions(0).files.size == 1, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
         assert(partitions(2).files.size == 2, "when checking partition 3")
+        assert(partitions(3).files.size == 1, "when checking partition 4")
-        // First partition reads (file1, file6)
+        // First partition reads (file1)
         assert(partitions(0).files(0).start == 0)
         assert(partitions(0).files(0).length == 2)
-        assert(partitions(0).files(1).start == 0)
-        assert(partitions(0).files(1).length == 1)
         // Second partition reads (file2, file3)
         assert(partitions(1).files(0).start == 0)
@@ -164,6 +163,10 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         assert(partitions(2).files(0).length == 1)
         assert(partitions(2).files(1).start == 0)
         assert(partitions(2).files(1).length == 1)
+
+        // Final partition reads (file6)
+        assert(partitions(3).files(0).start == 0)
+        assert(partitions(3).files(0).length == 1)
       }
       checkPartitionSchema(StructType(Nil))
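
For contrast with the new expectations above, the logic being reverted (from
SPARK-23249) filled a partition from the front of the length-sorted file list
and then topped it up with small files from the back, which is why the old
expectation let file1 and file6 share the first partition. A rough standalone
sketch of that two-pointer variant, again with an illustrative FileSlice type
rather than Spark's real PartitionedFile, is:

// Sketch of the reverted two-pointer merging: take files from the front, then
// pull small files from the back of the length-sorted list while they still fit.
object BothEndsPackingSketch {
  final case class FileSlice(name: String, length: Long)

  def pack(
      splitFiles: IndexedSeq[FileSlice],
      maxSplitBytes: Long,
      openCostInBytes: Long): Seq[Seq[FileSlice]] = {
    val partitions = Seq.newBuilder[Seq[FileSlice]]
    var currentFiles = Vector.empty[FileSlice]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (currentFiles.nonEmpty) partitions += currentFiles
      currentFiles = Vector.empty
      currentSize = 0L
    }

    def addFile(file: FileSlice): Unit = {
      currentFiles = currentFiles :+ file
      currentSize += file.length + openCostInBytes
    }

    var frontIndex = 0
    var backIndex = splitFiles.length - 1

    while (frontIndex <= backIndex) {
      // Always start with the next (largest remaining) file from the front.
      addFile(splitFiles(frontIndex))
      frontIndex += 1
      // Keep taking front files while they fit under maxSplitBytes.
      while (frontIndex <= backIndex &&
          currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
        addFile(splitFiles(frontIndex))
        frontIndex += 1
      }
      // Then top up with the smallest files from the back while they fit.
      while (backIndex > frontIndex &&
          currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
        addFile(splitFiles(backIndex))
        backIndex -= 1
      }
      closePartition()
    }
    partitions.result()
  }
}

Both sketches assume splitFiles is already sorted by length in decreasing order,
as it is in FileSourceScanExec; only the grouping strategy differs.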