You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/16 00:50:40 UTC
[incubator-iceberg] branch master updated: Split files when planning scan tasks (#119)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new df5c3f5 Split files when planning scan tasks (#119)
df5c3f5 is described below
commit df5c3f5cb0f7786eea52b4e7782b55807611342f
Author: Ratandeep Ratti <rd...@gmail.com>
AuthorDate: Fri Mar 15 17:50:36 2019 -0700
Split files when planning scan tasks (#119)
---
.../main/java/com/netflix/iceberg/FileFormat.java | 14 ++-
.../java/com/netflix/iceberg/FileScanTask.java | 7 ++
.../java/com/netflix/iceberg/BaseFileScanTask.java | 84 +++++++++++++
.../java/com/netflix/iceberg/BaseTableScan.java | 14 ++-
.../netflix/iceberg/TestSplitScanTaskIterator.java | 66 +++++++++++
.../java/com/netflix/iceberg/TestSplitScan.java | 131 +++++++++++++++++++++
.../iceberg/spark/source/TestFilteredScan.java | 5 +-
7 files changed, 313 insertions(+), 8 deletions(-)
diff --git a/api/src/main/java/com/netflix/iceberg/FileFormat.java b/api/src/main/java/com/netflix/iceberg/FileFormat.java
index 0a6922a..274a867 100644
--- a/api/src/main/java/com/netflix/iceberg/FileFormat.java
+++ b/api/src/main/java/com/netflix/iceberg/FileFormat.java
@@ -25,14 +25,20 @@ import com.netflix.iceberg.types.Comparators;
* Enum of supported file formats.
*/
public enum FileFormat {
- ORC("orc"),
- PARQUET("parquet"),
- AVRO("avro");
+ ORC("orc", true),
+ PARQUET("parquet", true),
+ AVRO("avro", true);
private final String ext;
+ private final boolean splittable;
- FileFormat(String ext) {
+ FileFormat(String ext, boolean splittable) {
this.ext = "." + ext;
+ this.splittable = splittable;
+ }
+
+ public boolean isSplittable() {
+ return splittable;
}
/**
diff --git a/api/src/main/java/com/netflix/iceberg/FileScanTask.java b/api/src/main/java/com/netflix/iceberg/FileScanTask.java
index 1007ef2..6d124d1 100644
--- a/api/src/main/java/com/netflix/iceberg/FileScanTask.java
+++ b/api/src/main/java/com/netflix/iceberg/FileScanTask.java
@@ -64,6 +64,13 @@ public interface FileScanTask extends ScanTask {
*/
Expression residual();
+ /**
+ * Splits this scan task into component {@link FileScanTask scan tasks}, each of {@code splitSize} size
+ * @param splitSize The size of a component scan task
+ * @return an Iterable of {@link FileScanTask scan tasks}
+ */
+ Iterable<FileScanTask> split(long splitSize);
+
@Override
default boolean isFileScanTask() {
return true;
diff --git a/core/src/main/java/com/netflix/iceberg/BaseFileScanTask.java b/core/src/main/java/com/netflix/iceberg/BaseFileScanTask.java
index 6ebde16..0467f90 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseFileScanTask.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseFileScanTask.java
@@ -20,8 +20,10 @@
package com.netflix.iceberg;
import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
import com.netflix.iceberg.expressions.Expression;
import com.netflix.iceberg.expressions.ResidualEvaluator;
+import java.util.Iterator;
class BaseFileScanTask implements FileScanTask {
private final DataFile file;
@@ -67,6 +69,15 @@ class BaseFileScanTask implements FileScanTask {
}
@Override
+ public Iterable<FileScanTask> split(long splitSize) {
+ if (file.format().isSplittable()) {
+ return () -> new SplitScanTaskIterator(splitSize, this);
+ } else {
+ return ImmutableList.of(this);
+ }
+ }
+
+ @Override
public String toString() {
return Objects.toStringHelper(this)
.add("file", file.path())
@@ -74,4 +85,77 @@ class BaseFileScanTask implements FileScanTask {
.add("residual", residual())
.toString();
}
+
+ /**
+ * Visible for Testing
+ */
+ static final class SplitScanTaskIterator implements Iterator<FileScanTask> {
+ private long offset;
+ private long remainingLen;
+ private long splitSize;
+ private final FileScanTask fileScanTask;
+
+ SplitScanTaskIterator(long splitSize, FileScanTask fileScanTask) {
+ this.offset = 0;
+ this.remainingLen = fileScanTask.length();
+ this.splitSize = splitSize;
+ this.fileScanTask = fileScanTask;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return remainingLen > 0;
+ }
+
+ @Override
+ public FileScanTask next() {
+ long len = Math.min(splitSize, remainingLen);
+ final FileScanTask splitTask = new SplitScanTask(offset, len, fileScanTask);
+ offset += len;
+ remainingLen -= len;
+ return splitTask;
+ }
+ }
+
+ private static final class SplitScanTask implements FileScanTask {
+ private final long len;
+ private final long offset;
+ private final FileScanTask fileScanTask;
+
+ SplitScanTask(long offset, long len, FileScanTask fileScanTask) {
+ this.offset = offset;
+ this.len = len;
+ this.fileScanTask = fileScanTask;
+ }
+
+ @Override
+ public DataFile file() {
+ return fileScanTask.file();
+ }
+
+ @Override
+ public PartitionSpec spec() {
+ return fileScanTask.spec();
+ }
+
+ @Override
+ public long start() {
+ return offset;
+ }
+
+ @Override
+ public long length() {
+ return len;
+ }
+
+ @Override
+ public Expression residual() {
+ return fileScanTask.residual();
+ }
+
+ @Override
+ public Iterable<FileScanTask> split(long splitSize) {
+ throw new UnsupportedOperationException("Cannot split a task which is already split");
+ }
+ }
}
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
index 0c44df7..7fe5964 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -208,8 +209,8 @@ class BaseTableScan implements TableScan {
TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT);
return CloseableIterable.transform(
- CloseableIterable.wrap(planFiles(), files ->
- new BinPacking.PackingIterable<>(files, splitSize, lookback, FileScanTask::length)),
+ CloseableIterable.wrap(splitFiles(splitSize), splits ->
+ new BinPacking.PackingIterable<>(splits, splitSize, lookback, FileScanTask::length)),
BaseCombinedScanTask::new);
}
@@ -231,4 +232,13 @@ class BaseTableScan implements TableScan {
.add("filter", rowFilter)
.toString();
}
+
+ private CloseableIterable<FileScanTask> splitFiles(long splitSize) {
+ CloseableIterable<FileScanTask> fileScanTasks = planFiles();
+ Iterable<FileScanTask> splitTasks = FluentIterable
+ .from(fileScanTasks)
+ .transformAndConcat(input -> input.split(splitSize));
+ // Capture manifests which can be closed after scan planning
+ return CloseableIterable.combine(splitTasks, ImmutableList.of(fileScanTasks));
+ }
}
diff --git a/core/src/test/java/com/netflix/iceberg/TestSplitScanTaskIterator.java b/core/src/test/java/com/netflix/iceberg/TestSplitScanTaskIterator.java
new file mode 100644
index 0000000..cee02b5
--- /dev/null
+++ b/core/src/test/java/com/netflix/iceberg/TestSplitScanTaskIterator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.List;
+import static com.netflix.iceberg.BaseFileScanTask.SplitScanTaskIterator;
+
+public class TestSplitScanTaskIterator {
+ @Test
+ public void testSplits() {
+ verify(15L, 100L, l(l(0L, 15L), l(15L, 15L), l(30L, 15L), l(45L, 15L), l(60L, 15L), l(75L, 15L), l(90L, 10L)));
+ verify(10L, 10L, l(l(0L, 10L)));
+ verify(20L, 10L, l(l(0L, 10L)));
+ }
+
+ private static void verify(long splitSize, long fileLen, List<List<Long>> offsetLenPairs) {
+ List<FileScanTask> tasks = Lists.newArrayList(new SplitScanTaskIterator(splitSize, new MockFileScanTask(fileLen)));
+ int i = 0;
+ for (FileScanTask task : tasks) {
+ List<Long> split = offsetLenPairs.get(i);
+ long offset = split.get(0);
+ long length = split.get(1);
+ Assert.assertEquals(offset, task.start());
+ Assert.assertEquals(length, task.length());
+ i += 1;
+ }
+ }
+
+ private static class MockFileScanTask extends BaseFileScanTask {
+ private final long length;
+
+ MockFileScanTask(long length) {
+ super(null, null, null, null);
+ this.length = length;
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+ }
+
+ private <T> List<T> l(T... items) {
+ return Lists.newArrayList(items);
+ }
+}
diff --git a/data/src/test/java/com/netflix/iceberg/TestSplitScan.java b/data/src/test/java/com/netflix/iceberg/TestSplitScan.java
new file mode 100644
index 0000000..022c8cc
--- /dev/null
+++ b/data/src/test/java/com/netflix/iceberg/TestSplitScan.java
@@ -0,0 +1,131 @@
+package com.netflix.iceberg;
+
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.avro.Avro;
+import com.netflix.iceberg.data.IcebergGenerics;
+import com.netflix.iceberg.data.RandomGenericData;
+import com.netflix.iceberg.data.Record;
+import com.netflix.iceberg.data.avro.DataWriter;
+import com.netflix.iceberg.data.parquet.GenericParquetWriter;
+import com.netflix.iceberg.hadoop.HadoopTables;
+import com.netflix.iceberg.io.FileAppender;
+import com.netflix.iceberg.parquet.Parquet;
+import com.netflix.iceberg.types.Types;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestSplitScan {
+ private static final Configuration CONF = new Configuration();
+ private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+ private static final long SPLIT_SIZE = 16 * 1024 * 1024;
+
+ private static final Schema SCHEMA = new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(2, "data", Types.StringType.get())
+ );
+
+ private Table table;
+ private File tableLocation;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ private List<Record> expectedRecords;
+
+ @Parameterized.Parameters
+ public static Object[][] parameters() {
+ return new Object[][]{
+ new Object[]{"parquet"},
+ new Object[]{"avro"}
+ };
+ }
+
+ private final FileFormat format;
+
+ public TestSplitScan(String format) {
+ this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+ }
+
+ @Before
+ public void setup() throws IOException {
+ tableLocation = new File(temp.newFolder(), "table");
+ setupTable();
+ }
+
+ @Test
+ public void test() {
+ Assert.assertEquals(
+ "There should be 4 tasks created since file size is approximately close to 64MB and split size 16MB",
+ 4, Lists.newArrayList(table.newScan().planTasks()).size());
+ List<Record> records = Lists.newArrayList(IcebergGenerics.read(table).build());
+ Assert.assertEquals(expectedRecords.size(), records.size());
+ for (int i = 0; i < expectedRecords.size(); i++) {
+ Assert.assertEquals(expectedRecords.get(i), records.get(i));
+ }
+ }
+
+ private void setupTable() throws IOException {
+ table = TABLES.create(SCHEMA, tableLocation.toString());
+ table.updateProperties()
+ .set(TableProperties.SPLIT_SIZE, String.valueOf(SPLIT_SIZE))
+ .commit();
+
+ // With these number of records and the given SCHEMA
+ // we can effectively write a file of approximate size 64 MB
+ int numRecords = 2500000;
+ expectedRecords = RandomGenericData.generate(SCHEMA, numRecords, 0L);
+ File file = writeToFile(expectedRecords, format);
+
+ DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned())
+ .withRecordCount(expectedRecords.size())
+ .withFileSizeInBytes(file.length())
+ .withPath(file.toString())
+ .withFormat(format)
+ .build();
+
+ table.newAppend().appendFile(dataFile).commit();
+ }
+
+ private File writeToFile(List<Record> records, FileFormat format) throws IOException {
+ File file = temp.newFile();
+ Assert.assertTrue(file.delete());
+
+ switch (format) {
+ case AVRO:
+ try (FileAppender<Record> appender = Avro.write(Files.localOutput(file))
+ .schema(SCHEMA)
+ .createWriterFunc(DataWriter::create)
+ .named(format.name())
+ .build()) {
+ appender.addAll(records);
+ }
+ break;
+ case PARQUET:
+ try (FileAppender<Record> appender = Parquet.write(Files.localOutput(file))
+ .schema(SCHEMA)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .named(format.name())
+ .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE))
+ .build()) {
+ appender.addAll(records);
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException("Cannot write format: " + format);
+ }
+ return file;
+ }
+}
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java
index e0c3fa1..2340cd6 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java
@@ -428,8 +428,9 @@ public class TestFilteredScan {
File location = new File(parent, desc);
Table byId = TABLES.create(SCHEMA, spec, location.toString());
- // do not combine splits because the tests expect a split per partition
- byId.updateProperties().set("read.split.target-size", "1").commit();
+ // Do not combine or split files because the tests expect a split per partition.
+ // A target split size of 2048 helps us achieve that.
+ byId.updateProperties().set("read.split.target-size", "2048").commit();
// copy the unpartitioned table into the partitioned table to produce the partitioned data
Dataset<Row> allRows = spark.read()