You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/02/14 18:58:15 UTC

spark git commit: Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"

Repository: spark
Updated Branches:
  refs/heads/master 140f87533 -> 400a1d9e2


Revert "[SPARK-23249][SQL] Improved block merging logic for partitions"

This reverts commit 8c21170decfb9ca4d3233e1ea13bd1b6e3199ed9.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/400a1d9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/400a1d9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/400a1d9e

Branch: refs/heads/master
Commit: 400a1d9e25c1196f0be87323bd89fb3af0660166
Parents: 140f875
Author: gatorsmile <ga...@gmail.com>
Authored: Wed Feb 14 10:57:12 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Feb 14 10:57:12 2018 -0800

----------------------------------------------------------------------
 .../sql/execution/DataSourceScanExec.scala      | 29 ++++++--------------
 .../datasources/FileSourceStrategySuite.scala   | 15 ++++++----
 2 files changed, 17 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/400a1d9e/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index ba1157d..08ff33a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -444,29 +444,16 @@ case class FileSourceScanExec(
       currentSize = 0
     }
 
-    def addFile(file: PartitionedFile): Unit = {
-        currentFiles += file
-        currentSize += file.length + openCostInBytes
-    }
-
-    var frontIndex = 0
-    var backIndex = splitFiles.length - 1
-
-    while (frontIndex <= backIndex) {
-      addFile(splitFiles(frontIndex))
-      frontIndex += 1
-      while (frontIndex <= backIndex &&
-             currentSize + splitFiles(frontIndex).length <= maxSplitBytes) {
-        addFile(splitFiles(frontIndex))
-        frontIndex += 1
-      }
-      while (backIndex > frontIndex &&
-             currentSize + splitFiles(backIndex).length <= maxSplitBytes) {
-        addFile(splitFiles(backIndex))
-        backIndex -= 1
+    // Assign files to partitions using "Next Fit Decreasing"
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
       }
-      closePartition()
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
     }
+    closePartition()
 
     new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/400a1d9e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index bfccc93..c1d61b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -141,17 +141,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
         SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(file1, file6), (file2, file3), (file4, file5)]
-        assert(partitions.size == 3, "when checking partitions")
-        assert(partitions(0).files.size == 2, "when checking partition 1")
+        // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
+        assert(partitions.size == 4, "when checking partitions")
+        assert(partitions(0).files.size == 1, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
         assert(partitions(2).files.size == 2, "when checking partition 3")
+        assert(partitions(3).files.size == 1, "when checking partition 4")
 
-        // First partition reads (file1, file6)
+        // First partition reads (file1)
         assert(partitions(0).files(0).start == 0)
         assert(partitions(0).files(0).length == 2)
-        assert(partitions(0).files(1).start == 0)
-        assert(partitions(0).files(1).length == 1)
 
         // Second partition reads (file2, file3)
         assert(partitions(1).files(0).start == 0)
@@ -164,6 +163,10 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         assert(partitions(2).files(0).length == 1)
         assert(partitions(2).files(1).start == 0)
         assert(partitions(2).files(1).length == 1)
+
+        // Final partition reads (file6)
+        assert(partitions(3).files(0).start == 0)
+        assert(partitions(3).files(0).length == 1)
       }
 
       checkPartitionSchema(StructType(Nil))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org