You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/18 22:20:55 UTC

[incubator-iceberg] branch master updated: Account for file open cost in split planning (#130)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 11b5230  Account for file open cost in split planning (#130)
11b5230 is described below

commit 11b523097f086beadce891d9ce7349432923594a
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Mar 18 22:20:49 2019 +0000

    Account for file open cost in split planning (#130)
    
    This avoids straggler tasks with lots of small files by applying a file open cost that acts as a minimum file weight when combining files into tasks. Tasks with a large number of small files can be stragglers because of the latency to open each file.
---
 .../java/com/netflix/iceberg/BaseTableScan.java    |   7 +-
 .../java/com/netflix/iceberg/TableProperties.java  |   3 +
 .../com/netflix/iceberg/TestSplitPlanning.java     | 126 +++++++++++++++++++++
 3 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
index 7fe5964..f8c4215 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -50,6 +50,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Function;
 
 import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
 
@@ -207,10 +208,14 @@ class BaseTableScan implements TableScan {
         TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
     int lookback = ops.current().propertyAsInt(
         TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT);
+    long openFileCost = ops.current().propertyAsLong(
+      TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(), openFileCost);
 
     return CloseableIterable.transform(
         CloseableIterable.wrap(splitFiles(splitSize), splits ->
-            new BinPacking.PackingIterable<>(splits, splitSize, lookback, FileScanTask::length)),
+            new BinPacking.PackingIterable<>(splits, splitSize, lookback, weightFunc)),
         BaseCombinedScanTask::new);
   }
 
diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java
index 925900a..26128ee 100644
--- a/core/src/main/java/com/netflix/iceberg/TableProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java
@@ -62,6 +62,9 @@ public class TableProperties {
   public static final String SPLIT_LOOKBACK = "read.split.planning-lookback";
   public static final int SPLIT_LOOKBACK_DEFAULT = 10;
 
+  public static final String SPLIT_OPEN_FILE_COST = "read.split.open-file-cost";
+  public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB
+
   public static final String OBJECT_STORE_ENABLED = "write.object-storage.enabled";
   public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
 
diff --git a/core/src/test/java/com/netflix/iceberg/TestSplitPlanning.java b/core/src/test/java/com/netflix/iceberg/TestSplitPlanning.java
new file mode 100644
index 0000000..11df14d
--- /dev/null
+++ b/core/src/test/java/com/netflix/iceberg/TestSplitPlanning.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.hadoop.HadoopTables;
+import com.netflix.iceberg.types.Types;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+
+public class TestSplitPlanning {
+
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private Table table = null;
+
+  @Before
+  public void setupTable() throws IOException {
+    File tableDir = temp.newFolder();
+    String tableLocation = tableDir.toURI().toString();
+    table = TABLES.create(SCHEMA, tableLocation);
+    table.updateProperties()
+        .set(TableProperties.SPLIT_SIZE, String.valueOf(128 * 1024 * 1024))
+        .set(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(4 * 1024 * 1024))
+        .set(TableProperties.SPLIT_LOOKBACK, String.valueOf(Integer.MAX_VALUE))
+        .commit();
+  }
+
+  @Test
+  public void testBasicSplitPlanning() {
+    List<DataFile> files128MB = newFiles(4, 128 * 1024 * 1024);
+    appendFiles(files128MB);
+    // we expect 4 bins since split size is 128MB and we have 4 files 128MB each
+    Assert.assertEquals(4, Iterables.size(table.newScan().planTasks()));
+    List<DataFile> files32MB = newFiles(16, 32 * 1024 * 1024);
+    appendFiles(files32MB);
+    // we expect 8 bins after we add 16 files 32MB each as they will form additional 4 bins
+    Assert.assertEquals(8, Iterables.size(table.newScan().planTasks()));
+  }
+
+  @Test
+  public void testSplitPlanningWithSmallFiles() {
+    List<DataFile> files60MB = newFiles(50, 60 * 1024 * 1024);
+    List<DataFile> files5KB = newFiles(370, 5 * 1024);
+    Iterable<DataFile> files = Iterables.concat(files60MB, files5KB);
+    appendFiles(files);
+    // 50 files of size 60MB will form 25 bins (2 files per bin) as split size is 128MB;
+    // each of those bins will have 8MB left, so without "read.split.open-file-cost"
+    // all 370 files of size 5KB (about 1.8MB in total) would end up in one of them.
+    // as "read.split.open-file-cost" is 4MB, every 5KB file weighs 4MB and each of the
+    // original 25 bins will get at most 2 such files, so 50 of 370 files will be packed into
+    // the existing bins and the remaining 320 files (32 per bin) will form 10 more: 35 in total
+    Assert.assertEquals(35, Iterables.size(table.newScan().planTasks()));
+  }
+
+  @Test
+  public void testSplitPlanningWithNoMinWeight() {
+    table.updateProperties()
+        .set(TableProperties.SPLIT_OPEN_FILE_COST, "0")
+        .commit();
+    List<DataFile> files60MB = newFiles(2, 60 * 1024 * 1024);
+    List<DataFile> files5KB = newFiles(100, 5 * 1024);
+    Iterable<DataFile> files = Iterables.concat(files60MB, files5KB);
+    appendFiles(files);
+    // all small files will be packed into one bin as "read.split.open-file-cost" is set to 0
+    Assert.assertEquals(1, Iterables.size(table.newScan().planTasks()));
+  }
+
+  private void appendFiles(Iterable<DataFile> files) {
+    AppendFiles appendFiles = table.newAppend();
+    files.forEach(appendFiles::appendFile);
+    appendFiles.commit();
+  }
+
+  private List<DataFile> newFiles(int numFiles, long sizeInBytes) {
+    List<DataFile> files = Lists.newArrayList();
+    for (int fileNum = 0; fileNum < numFiles; fileNum++) {
+      files.add(newFile(sizeInBytes));
+    }
+    return files;
+  }
+
+  private DataFile newFile(long sizeInBytes) {
+    String fileName = UUID.randomUUID().toString();
+    return DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath(FileFormat.PARQUET.addExtension(fileName))
+        .withFileSizeInBytes(sizeInBytes)
+        .withRecordCount(2)
+        .build();
+  }
+}