You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/04/04 23:41:08 UTC

spark git commit: [SPARK-14259] [SQL] Merging small files together based on the cost of opening

Repository: spark
Updated Branches:
  refs/heads/master cc70f1741 -> 400b2f863


[SPARK-14259] [SQL] Merging small files together based on the cost of opening

## What changes were proposed in this pull request?

This PR basically redoes the work of #12068, but with a different model that should work better when the small files have different sizes.

## How was this patch tested?

Updated existing tests.

Ran a query on thousands of small partitioned files locally, with all default settings (the cost to open a file should be overestimated). The durations of the tasks become smaller and smaller, which is good (the last few tasks will be the shortest).

Author: Davies Liu <da...@databricks.com>

Closes #12095 from davies/file_cost.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/400b2f86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/400b2f86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/400b2f86

Branch: refs/heads/master
Commit: 400b2f863ffaa01a34a8dae1541c61526fef908b
Parents: cc70f17
Author: Davies Liu <da...@databricks.com>
Authored: Mon Apr 4 14:41:03 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Mon Apr 4 14:41:03 2016 -0700

----------------------------------------------------------------------
 .../execution/datasources/FileSourceStrategy.scala    | 13 +++++--------
 .../scala/org/apache/spark/sql/internal/SQLConf.scala | 13 ++++++++-----
 .../datasources/FileSourceStrategySuite.scala         | 14 ++++++++------
 3 files changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/400b2f86/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index a143ac6..618d5a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -131,9 +131,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 
         case _ =>
           val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
-          val maxFileNumInPartition = files.sqlContext.conf.filesMaxNumInPartition
+          val openCostInBytes = files.sqlContext.conf.filesOpenCostInBytes
           logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
-            s"max #files: $maxFileNumInPartition")
+            s"open cost is considered as scanning $openCostInBytes bytes.")
 
           val splitFiles = selectedPartitions.flatMap { partition =>
             partition.files.flatMap { file =>
@@ -151,7 +151,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
 
           /** Add the given file to the current partition. */
           def addFile(file: PartitionedFile): Unit = {
-            currentSize += file.length
+            currentSize += file.length + openCostInBytes
             currentFiles.append(file)
           }
 
@@ -171,13 +171,10 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
           // Assign files to partitions using "First Fit Decreasing" (FFD)
           // TODO: consider adding a slop factor here?
           splitFiles.foreach { file =>
-            if (currentSize + file.length > maxSplitBytes ||
-                currentFiles.length >= maxFileNumInPartition) {
+            if (currentSize + file.length > maxSplitBytes) {
               closePartition()
-              addFile(file)
-            } else {
-              addFile(file)
             }
+            addFile(file)
           }
           closePartition()
           partitions

http://git-wip-us.apache.org/repos/asf/spark/blob/400b2f86/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6cc72fb..a7c0be6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -510,10 +510,13 @@ object SQLConf {
     doc = "The maximum number of bytes to pack into a single partition when reading files.",
     isPublic = true)
 
-  val FILES_MAX_NUM_IN_PARTITION = longConf("spark.sql.files.maxNumInPartition",
-    defaultValue = Some(32),
-    doc = "The maximum number of files to pack into a single partition when reading files.",
-    isPublic = true)
+  val FILES_OPEN_COST_IN_BYTES = longConf("spark.sql.files.openCostInBytes",
+    defaultValue = Some(4 * 1024 * 1024),
+    doc = "The estimated cost to open a file, measured by the number of bytes could be scanned in" +
+      " the same time. This is used when putting multiple files into a partition. It's better to" +
+      " over estimated, then the partitions with small files will be faster than partitions with" +
+      " bigger files (which is scheduled first).",
+    isPublic = false)
 
   val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
     defaultValue = Some(true),
@@ -572,7 +575,7 @@ class SQLConf extends Serializable with CatalystConf with Logging {
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
-  def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION)
+  def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/400b2f86/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 717a3a8..4446a68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -76,7 +76,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
           "file2" -> 5,
           "file3" -> 5))
 
-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "11",
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // 5 byte files should be laid out [(5, 5), (5)]
         assert(partitions.size == 2, "when checking partitions")
@@ -98,11 +99,12 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       createTable(
         files = Seq(
           "file1" -> 15,
-          "file2" -> 4))
+          "file2" -> 3))
 
-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(0-5), (5-10, 4)]
+        // Files should be laid out [(0-10), (10-15, 4)]
         assert(partitions.size == 2, "when checking partitions")
         assert(partitions(0).files.size == 1, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
@@ -132,8 +134,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
           "file5" -> 1,
           "file6" -> 1))
 
-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "3",
-        SQLConf.FILES_MAX_NUM_IN_PARTITION.key -> "2") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
+        SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
         assert(partitions.size == 4, "when checking partitions")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org