You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/01/31 17:14:09 UTC

spark git commit: [SPARK-23249][SQL] Improved block merging logic for partitions

Repository: spark
Updated Branches:
  refs/heads/master 48dd6a4c7 -> 8c21170de


[SPARK-23249][SQL] Improved block merging logic for partitions

## What changes were proposed in this pull request?

Change DataSourceScanExec so that when grouping blocks together into partitions, also checks the end of the sorted list of splits to more efficiently fill out partitions.

## How was this patch tested?

Updated old test to reflect the new logic, which causes the # of partitions to drop from 4 -> 3
Also, a current test exists to test large non-splittable files at https://github.com/glentakahashi/spark/blob/c575977a5952bf50b605be8079c9be1e30f3bd36/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala#L346

## Rationale

The current bin-packing method of next-fit descending for blocks into partitions is sub-optimal in a lot of cases and will result in extra partitions, un-even distribution of block-counts across partitions, and un-even distribution of partition sizes.

As an example, 128 files ranging from 1MB, 2MB,...127MB,128MB. will result in 82 partitions with the current algorithm, but only 64 using this algorithm. Also in this example, the max # of blocks per partition in NFD is 13, while in this algorithm is is 2.

More generally, running a simulation of 1000 runs using 128MB blocksize, between 1-1000 normally distributed file sizes between 1-500Mb, you can see an improvement of approx 5% reduction of partition counts, and a large reduction in standard deviation of blocks per partition.

This algorithm also runs in O(n) time as NFD does, and in every case is strictly better results than NFD.

Overall, the more even distribution of blocks across partitions and therefore reduced partition counts should result in a small but significant performance increase across the board

Author: Glen Takahashi <gt...@palantir.com>

Closes #20372 from glentakahashi/feature/improved-block-merging.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c21170d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c21170d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c21170d

Branch: refs/heads/master
Commit: 8c21170decfb9ca4d3233e1ea13bd1b6e3199ed9
Parents: 48dd6a4
Author: Glen Takahashi <gt...@palantir.com>
Authored: Thu Feb 1 01:14:01 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 1 01:14:01 2018 +0800

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 29 ++++++++++++++------
 .../datasources/FileSourceStrategySuite.scala   | 15 ++++------
 2 files changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8c21170d/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index aa66ee7..f7732e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -445,16 +445,29 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    // Assign files to partitions using "Next Fit Decreasing"
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
-        closePartition()
+    def addFile(file: PartitionedFile): Unit = {
+        currentFiles += file
+        currentSize += file.length + openCostInBytes
+    }
+
+    var frontIndex = 0
+    var backIndex = splitFiles.length - 1
+
+    while (frontIndex <= backIndex) {
+      addFile(splitFiles(frontIndex))
+      frontIndex += 1
+      while (frontIndex <= backIndex &&
+             currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(frontIndex))
+        frontIndex += 1
+      }
+      while (backIndex > frontIndex &&
+             currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
+        addFile(splitFiles(backIndex))
+        backIndex -= 1
       }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
-      currentFiles += file
+      closePartition()
     }
-    closePartition()
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8c21170d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c1d61b8..bfccc93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -141,16 +141,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
         SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
-        assert(partitions.size == 4, "when checking partitions")
-        assert(partitions(0).files.size == 1, "when checking partition 1")
+        // Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
+        assert(partitions.size == 3, "when checking partitions")
+        assert(partitions(0).files.size == 2, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
         assert(partitions(2).files.size == 2, "when checking partition 3")
-        assert(partitions(3).files.size == 1, "when checking partition 4")
 
-        // First partition reads (file1)
+        // First partition reads (file1, file6)
         assert(partitions(0).files(0).start == 0)
         assert(partitions(0).files(0).length == 2)
+        assert(partitions(0).files(1).start == 0)
+        assert(partitions(0).files(1).length == 1)
 
         // Second partition reads (file2, file3)
         assert(partitions(1).files(0).start == 0)
@@ -163,10 +164,6 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         assert(partitions(2).files(0).length == 1)
         assert(partitions(2).files(1).start == 0)
         assert(partitions(2).files(1).length == 1)
-
-        // Final partition reads (file6)
-        assert(partitions(3).files(0).start == 0)
-        assert(partitions(3).files(0).length == 1)
       }
 
       checkPartitionSchema(StructType(Nil))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org