Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/18 22:20:55 UTC
[incubator-iceberg] branch master updated: Account for file open cost in split planning (#130)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 11b5230 Account for file open cost in split planning (#130)
11b5230 is described below
commit 11b523097f086beadce891d9ce7349432923594a
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Mar 18 22:20:49 2019 +0000
Account for file open cost in split planning (#130)
This fixes tasks with lots of small files by applying a file open cost that acts as a minimum file weight when combining files into tasks. Tasks with a large number of small files can be stragglers because of the latency to open each file.
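
In short, a minimal sketch of the idea, using the names introduced in the diff below (the 4MB value is the new default):

    // Each file weighs at least the open-file cost when bin packing, so a task
    // cannot absorb an unbounded number of tiny files: with the 4MB default
    // cost and a 128MB split size, a bin holds at most 32 small files.
    long openFileCost = ops.current().propertyAsLong(
        TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
    Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(), openFileCost);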
---
.../java/com/netflix/iceberg/BaseTableScan.java | 7 +-
.../java/com/netflix/iceberg/TableProperties.java | 3 +
.../com/netflix/iceberg/TestSplitPlanning.java | 126 +++++++++++++++++++++
3 files changed, 135 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
index 7fe5964..f8c4215 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -50,6 +50,7 @@ import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Function;
import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
@@ -207,10 +208,14 @@ class BaseTableScan implements TableScan {
TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT);
int lookback = ops.current().propertyAsInt(
TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT);
+ long openFileCost = ops.current().propertyAsLong(
+ TableProperties.SPLIT_OPEN_FILE_COST, TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+ Function<FileScanTask, Long> weightFunc = file -> Math.max(file.length(), openFileCost);
return CloseableIterable.transform(
CloseableIterable.wrap(splitFiles(splitSize), splits ->
- new BinPacking.PackingIterable<>(splits, splitSize, lookback, FileScanTask::length)),
+ new BinPacking.PackingIterable<>(splits, splitSize, lookback, weightFunc)),
BaseCombinedScanTask::new);
}
diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java
index 925900a..26128ee 100644
--- a/core/src/main/java/com/netflix/iceberg/TableProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java
@@ -62,6 +62,9 @@ public class TableProperties {
public static final String SPLIT_LOOKBACK = "read.split.planning-lookback";
public static final int SPLIT_LOOKBACK_DEFAULT = 10;
+ public static final String SPLIT_OPEN_FILE_COST = "read.split.open-file-cost";
+ public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB
+
public static final String OBJECT_STORE_ENABLED = "write.object-storage.enabled";
public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
diff --git a/core/src/test/java/com/netflix/iceberg/TestSplitPlanning.java b/core/src/test/java/com/netflix/iceberg/TestSplitPlanning.java
new file mode 100644
index 0000000..11df14d
--- /dev/null
+++ b/core/src/test/java/com/netflix/iceberg/TestSplitPlanning.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.hadoop.HadoopTables;
+import com.netflix.iceberg.types.Types;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+
+public class TestSplitPlanning {
+
+ private static final Configuration CONF = new Configuration();
+ private static final HadoopTables TABLES = new HadoopTables(CONF);
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ private Table table = null;
+
+ @Before
+ public void setupTable() throws IOException {
+ File tableDir = temp.newFolder();
+ String tableLocation = tableDir.toURI().toString();
+ table = TABLES.create(SCHEMA, tableLocation);
+ table.updateProperties()
+ .set(TableProperties.SPLIT_SIZE, String.valueOf(128 * 1024 * 1024))
+ .set(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(4 * 1024 * 1024))
+ .set(TableProperties.SPLIT_LOOKBACK, String.valueOf(Integer.MAX_VALUE))
+ .commit();
+ }
+
+ @Test
+ public void testBasicSplitPlanning() {
+ List<DataFile> files128MB = newFiles(4, 128 * 1024 * 1024);
+ appendFiles(files128MB);
+ // we expect 4 bins since split size is 128MB and we have 4 files 128MB each
+ Assert.assertEquals(4, Iterables.size(table.newScan().planTasks()));
+ List<DataFile> files32MB = newFiles(16, 32 * 1024 * 1024);
+ appendFiles(files32MB);
+ // we expect 8 bins after we add 16 files 32MB each as they will form additional 4 bins
+ Assert.assertEquals(8, Iterables.size(table.newScan().planTasks()));
+ }
+
+ @Test
+ public void testSplitPlanningWithSmallFiles() {
+ List<DataFile> files60MB = newFiles(50, 60 * 1024 * 1024);
+ List<DataFile> files5KB = newFiles(370, 5 * 1024);
+ Iterable<DataFile> files = Iterables.concat(files60MB, files5KB);
+ appendFiles(files);
+ // 50 files of size 60MB will form 25 bins as split size is 128MB
+ // each of those bins will have 8MB left and all 370 files of size 5KB would end up
+ // in one of them without "read.split.open-file-cost"
+ // as "read.split.open-file-cost" is 4MB, each of the original 25 bins will get at most 2 files
+ // so 50 of 370 files will be packed into the existing 25 bins and the remaining 320 files
+ // will form additional 10 bins, resulting in 35 bins in total
+ Assert.assertEquals(35, Iterables.size(table.newScan().planTasks()));
+ }
+
+ @Test
+ public void testSplitPlanningWithNoMinWeight() {
+ table.updateProperties()
+ .set(TableProperties.SPLIT_OPEN_FILE_COST, "0")
+ .commit();
+ List<DataFile> files60MB = newFiles(2, 60 * 1024 * 1024);
+ List<DataFile> files5KB = newFiles(100, 5 * 1024);
+ Iterable<DataFile> files = Iterables.concat(files60MB, files5KB);
+ appendFiles(files);
+ // all small files will be packed into one bin as "read.split.open-file-cost" is set to 0
+ Assert.assertEquals(1, Iterables.size(table.newScan().planTasks()));
+ }
+
+ private void appendFiles(Iterable<DataFile> files) {
+ AppendFiles appendFiles = table.newAppend();
+ files.forEach(appendFiles::appendFile);
+ appendFiles.commit();
+ }
+
+ private List<DataFile> newFiles(int numFiles, long sizeInBytes) {
+ List<DataFile> files = Lists.newArrayList();
+ for (int fileNum = 0; fileNum < numFiles; fileNum++) {
+ files.add(newFile(sizeInBytes));
+ }
+ return files;
+ }
+
+ private DataFile newFile(long sizeInBytes) {
+ String fileName = UUID.randomUUID().toString();
+ return DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(FileFormat.PARQUET.addExtension(fileName))
+ .withFileSizeInBytes(sizeInBytes)
+ .withRecordCount(2)
+ .build();
+ }
+}