Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/03/16 00:50:40 UTC

[incubator-iceberg] branch master updated: Split files when planning scan tasks (#119)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new df5c3f5  Split files when planning scan tasks (#119)
df5c3f5 is described below

commit df5c3f5cb0f7786eea52b4e7782b55807611342f
Author: Ratandeep Ratti <rd...@gmail.com>
AuthorDate: Fri Mar 15 17:50:36 2019 -0700

    Split files when planning scan tasks (#119)
---
 .../main/java/com/netflix/iceberg/FileFormat.java  |  14 ++-
 .../java/com/netflix/iceberg/FileScanTask.java     |   7 ++
 .../java/com/netflix/iceberg/BaseFileScanTask.java |  84 +++++++++++++
 .../java/com/netflix/iceberg/BaseTableScan.java    |  14 ++-
 .../netflix/iceberg/TestSplitScanTaskIterator.java |  66 +++++++++++
 .../java/com/netflix/iceberg/TestSplitScan.java    | 131 +++++++++++++++++++++
 .../iceberg/spark/source/TestFilteredScan.java     |   5 +-
 7 files changed, 313 insertions(+), 8 deletions(-)

diff --git a/api/src/main/java/com/netflix/iceberg/FileFormat.java b/api/src/main/java/com/netflix/iceberg/FileFormat.java
index 0a6922a..274a867 100644
--- a/api/src/main/java/com/netflix/iceberg/FileFormat.java
+++ b/api/src/main/java/com/netflix/iceberg/FileFormat.java
@@ -25,14 +25,20 @@ import com.netflix.iceberg.types.Comparators;
  * Enum of supported file formats.
  */
 public enum FileFormat {
-  ORC("orc"),
-  PARQUET("parquet"),
-  AVRO("avro");
+  ORC("orc", true),
+  PARQUET("parquet", true),
+  AVRO("avro", true);
 
   private final String ext;
+  private final boolean splittable;
 
-  FileFormat(String ext) {
+  FileFormat(String ext, boolean splittable) {
     this.ext = "." + ext;
+    this.splittable = splittable;
+  }
+
+  public boolean isSplittable() {
+    return splittable;
   }
 
   /**
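
The constructor now carries a splittable flag next to the file extension; all three formats report true today, so the flag mainly future-proofs planning for formats that cannot be read from an arbitrary byte range. A minimal sketch of inspecting it (assumes only the iceberg api module on the classpath):

    import com.netflix.iceberg.FileFormat;

    public class SplittableDemo {
      public static void main(String[] args) {
        for (FileFormat format : FileFormat.values()) {
          // ORC, PARQUET, and AVRO all report true after this change
          System.out.println(format.name() + " splittable: " + format.isSplittable());
        }
      }
    }
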
diff --git a/api/src/main/java/com/netflix/iceberg/FileScanTask.java b/api/src/main/java/com/netflix/iceberg/FileScanTask.java
index 1007ef2..6d124d1 100644
--- a/api/src/main/java/com/netflix/iceberg/FileScanTask.java
+++ b/api/src/main/java/com/netflix/iceberg/FileScanTask.java
@@ -64,6 +64,13 @@ public interface FileScanTask extends ScanTask {
    */
   Expression residual();
 
+  /**
+   * Splits this scan task into component {@link FileScanTask scan tasks}, each of length {@code splitSize} (the last may be shorter)
+   * @param splitSize the length of each component scan task
+   * @return an Iterable of {@link FileScanTask scan tasks}
+   */
+  Iterable<FileScanTask> split(long splitSize);
+
   @Override
   default boolean isFileScanTask() {
     return true;
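
A sketch of how a caller might consume the new method; the helper below is hypothetical, `task` stands for any FileScanTask obtained from a table scan, and the 32 MB target is illustrative:

    import com.netflix.iceberg.FileScanTask;

    static void printSplits(FileScanTask task) {
      long splitSize = 32L * 1024 * 1024; // illustrative 32 MB target
      for (FileScanTask split : task.split(splitSize)) {
        // each split covers [start(), start() + length()) of the same data file
        System.out.println(split.file().path() + " @ " + split.start() + " + " + split.length());
      }
    }
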
diff --git a/core/src/main/java/com/netflix/iceberg/BaseFileScanTask.java b/core/src/main/java/com/netflix/iceberg/BaseFileScanTask.java
index 6ebde16..0467f90 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseFileScanTask.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseFileScanTask.java
@@ -20,8 +20,10 @@
 package com.netflix.iceberg;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 import com.netflix.iceberg.expressions.Expression;
 import com.netflix.iceberg.expressions.ResidualEvaluator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 class BaseFileScanTask implements FileScanTask {
   private final DataFile file;
@@ -67,6 +69,15 @@ class BaseFileScanTask implements FileScanTask {
   }
 
   @Override
+  public Iterable<FileScanTask> split(long splitSize) {
+    if (file.format().isSplittable()) {
+      return () -> new SplitScanTaskIterator(splitSize, this);
+    } else {
+      return ImmutableList.of(this);
+    }
+  }
+
+  @Override
   public String toString() {
     return Objects.toStringHelper(this)
         .add("file", file.path())
@@ -74,4 +85,77 @@ class BaseFileScanTask implements FileScanTask {
         .add("residual", residual())
         .toString();
   }
+
+  /**
+   * Visible for testing.
+   */
+  static final class SplitScanTaskIterator implements Iterator<FileScanTask> {
+    private long offset;
+    private long remainingLen;
+    private final long splitSize;
+    private final FileScanTask fileScanTask;
+
+    SplitScanTaskIterator(long splitSize, FileScanTask fileScanTask) {
+      this.offset = 0;
+      this.remainingLen = fileScanTask.length();
+      this.splitSize = splitSize;
+      this.fileScanTask = fileScanTask;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return remainingLen > 0;
+    }
+
+    @Override
+    public FileScanTask next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No remaining length to split");
+      }
+      long len = Math.min(splitSize, remainingLen);
+      final FileScanTask splitTask = new SplitScanTask(offset, len, fileScanTask);
+      offset += len;
+      remainingLen -= len;
+      return splitTask;
+    }
+  }
+
+  private static final class SplitScanTask implements FileScanTask {
+    private final long len;
+    private final long offset;
+    private final FileScanTask fileScanTask;
+
+    SplitScanTask(long offset, long len, FileScanTask fileScanTask) {
+      this.offset = offset;
+      this.len = len;
+      this.fileScanTask = fileScanTask;
+    }
+
+    @Override
+    public DataFile file() {
+      return fileScanTask.file();
+    }
+
+    @Override
+    public PartitionSpec spec() {
+      return fileScanTask.spec();
+    }
+
+    @Override
+    public long start() {
+      return offset;
+    }
+
+    @Override
+    public long length() {
+      return len;
+    }
+
+    @Override
+    public Expression residual() {
+      return fileScanTask.residual();
+    }
+
+    @Override
+    public Iterable<FileScanTask> split(long splitSize) {
+      throw new UnsupportedOperationException("Cannot split a task which is already split");
+    }
+  }
 }
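
For a 100-byte task and a 40-byte split size, SplitScanTaskIterator yields (start, length) pairs (0, 40), (40, 40), (80, 20): each call to next() hands out at most splitSize and advances the offset until no length remains. A sketch, borrowing the fixed-length MockFileScanTask stub from the new test below:

    import java.util.Iterator;

    Iterator<FileScanTask> splits =
        new BaseFileScanTask.SplitScanTaskIterator(40L, new MockFileScanTask(100L));
    while (splits.hasNext()) {
      FileScanTask split = splits.next();
      System.out.println(split.start() + "/" + split.length()); // 0/40, 40/40, 80/20
    }
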
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
index 0c44df7..7fe5964 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -208,8 +209,8 @@ class BaseTableScan implements TableScan {
         TableProperties.SPLIT_LOOKBACK, TableProperties.SPLIT_LOOKBACK_DEFAULT);
 
     return CloseableIterable.transform(
-        CloseableIterable.wrap(planFiles(), files ->
-            new BinPacking.PackingIterable<>(files, splitSize, lookback, FileScanTask::length)),
+        CloseableIterable.wrap(splitFiles(splitSize), splits ->
+            new BinPacking.PackingIterable<>(splits, splitSize, lookback, FileScanTask::length)),
         BaseCombinedScanTask::new);
   }
 
@@ -231,4 +232,13 @@ class BaseTableScan implements TableScan {
         .add("filter", rowFilter)
         .toString();
   }
+
+  private CloseableIterable<FileScanTask> splitFiles(long splitSize) {
+    CloseableIterable<FileScanTask> fileScanTasks = planFiles();
+    Iterable<FileScanTask> splitTasks = FluentIterable
+        .from(fileScanTasks)
+        .transformAndConcat(input -> input.split(splitSize));
+    // Combine with the original iterable so the manifests opened by planFiles() are closed after scan planning
+    return CloseableIterable.combine(splitTasks, ImmutableList.of(fileScanTasks));
+  }
 }
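
The net effect is that planTasks() now cuts each file task down to at most splitSize before bin-packing the pieces back into combined tasks, so one large file can fan out across several combined tasks instead of becoming a single oversized one. A rough equivalent of the new flow (wiring simplified, not the exact code):

    // splitSize and lookback are read from table properties, as above
    CloseableIterable<FileScanTask> files = planFiles();
    Iterable<FileScanTask> splits = FluentIterable
        .from(files)
        .transformAndConcat(task -> task.split(splitSize));
    Iterable<CombinedScanTask> combined = Iterables.transform(
        new BinPacking.PackingIterable<>(splits, splitSize, lookback, FileScanTask::length),
        BaseCombinedScanTask::new);
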
diff --git a/core/src/test/java/com/netflix/iceberg/TestSplitScanTaskIterator.java b/core/src/test/java/com/netflix/iceberg/TestSplitScanTaskIterator.java
new file mode 100644
index 0000000..cee02b5
--- /dev/null
+++ b/core/src/test/java/com/netflix/iceberg/TestSplitScanTaskIterator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+import java.util.List;
+import static com.netflix.iceberg.BaseFileScanTask.SplitScanTaskIterator;
+
+public class TestSplitScanTaskIterator {
+  @Test
+  public void testSplits() {
+    verify(15L, 100L, l(l(0L, 15L), l(15L, 15L), l(30L, 15L), l(45L, 15L), l(60L, 15L), l(75L, 15L), l(90L, 10L)));
+    verify(10L, 10L, l(l(0L, 10L)));
+    verify(20L, 10L, l(l(0L, 10L)));
+  }
+
+  private static void verify(long splitSize, long fileLen, List<List<Long>> offsetLenPairs) {
+    List<FileScanTask> tasks = Lists.newArrayList(new SplitScanTaskIterator(splitSize, new MockFileScanTask(fileLen)));
+    Assert.assertEquals("Number of splits", offsetLenPairs.size(), tasks.size());
+    int i = 0;
+    for (FileScanTask task : tasks) {
+      List<Long> split = offsetLenPairs.get(i);
+      long offset = split.get(0);
+      long length = split.get(1);
+      Assert.assertEquals(offset, task.start());
+      Assert.assertEquals(length, task.length());
+      i += 1;
+    }
+  }
+
+  private static class MockFileScanTask extends BaseFileScanTask {
+    private final long length;
+
+    MockFileScanTask(long length) {
+      super(null, null, null, null);
+      this.length = length;
+    }
+
+    @Override
+    public long length() {
+      return length;
+    }
+  }
+
+  private <T> List<T> l(T... items) {
+    return Lists.newArrayList(items);
+  }
+}
diff --git a/data/src/test/java/com/netflix/iceberg/TestSplitScan.java b/data/src/test/java/com/netflix/iceberg/TestSplitScan.java
new file mode 100644
index 0000000..022c8cc
--- /dev/null
+++ b/data/src/test/java/com/netflix/iceberg/TestSplitScan.java
@@ -0,0 +1,131 @@
+package com.netflix.iceberg;
+
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.avro.Avro;
+import com.netflix.iceberg.data.IcebergGenerics;
+import com.netflix.iceberg.data.RandomGenericData;
+import com.netflix.iceberg.data.Record;
+import com.netflix.iceberg.data.avro.DataWriter;
+import com.netflix.iceberg.data.parquet.GenericParquetWriter;
+import com.netflix.iceberg.hadoop.HadoopTables;
+import com.netflix.iceberg.io.FileAppender;
+import com.netflix.iceberg.parquet.Parquet;
+import com.netflix.iceberg.types.Types;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestSplitScan {
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  private static final long SPLIT_SIZE = 16 * 1024 * 1024;
+
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.IntegerType.get()),
+      required(2, "data", Types.StringType.get())
+  );
+
+  private Table table;
+  private File tableLocation;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private List<Record> expectedRecords;
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    return new Object[][]{
+        new Object[]{"parquet"},
+        new Object[]{"avro"}
+    };
+  }
+
+  private final FileFormat format;
+
+  public TestSplitScan(String format) {
+    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+  }
+
+  @Before
+  public void setup() throws IOException {
+    tableLocation = new File(temp.newFolder(), "table");
+    setupTable();
+  }
+
+  @Test
+  public void test() {
+    Assert.assertEquals(
+        "There should be 4 tasks created since file size is approximately close to 64MB and split size 16MB",
+        4, Lists.newArrayList(table.newScan().planTasks()).size());
+    List<Record> records = Lists.newArrayList(IcebergGenerics.read(table).build());
+    Assert.assertEquals(expectedRecords.size(), records.size());
+    for (int i = 0; i < expectedRecords.size(); i++) {
+      Assert.assertEquals(expectedRecords.get(i), records.get(i));
+    }
+  }
+
+  private void setupTable() throws IOException {
+    table = TABLES.create(SCHEMA, tableLocation.toString());
+    table.updateProperties()
+        .set(TableProperties.SPLIT_SIZE, String.valueOf(SPLIT_SIZE))
+        .commit();
+
+    // With this number of records and the given SCHEMA,
+    // we write a data file of approximately 64 MB
+    int numRecords = 2500000;
+    expectedRecords = RandomGenericData.generate(SCHEMA, numRecords, 0L);
+    File file = writeToFile(expectedRecords, format);
+
+    DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withRecordCount(expectedRecords.size())
+        .withFileSizeInBytes(file.length())
+        .withPath(file.toString())
+        .withFormat(format)
+        .build();
+
+    table.newAppend().appendFile(dataFile).commit();
+  }
+
+  private File writeToFile(List<Record> records, FileFormat format) throws IOException {
+    File file = temp.newFile();
+    Assert.assertTrue(file.delete());
+
+    switch (format) {
+      case AVRO:
+        try (FileAppender<Record> appender = Avro.write(Files.localOutput(file))
+            .schema(SCHEMA)
+            .createWriterFunc(DataWriter::create)
+            .named(format.name())
+            .build()) {
+          appender.addAll(records);
+        }
+        break;
+      case PARQUET:
+        try (FileAppender<Record> appender = Parquet.write(Files.localOutput(file))
+            .schema(SCHEMA)
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .named(format.name())
+            .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(SPLIT_SIZE))
+            .build()) {
+          appender.addAll(records);
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("Cannot write format: " + format);
+    }
+    return file;
+  }
+}
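
The expected count of 4 in the test is just a ceiling division of file size by split size; assuming the roughly 64 MB data file the setup comment describes:

    long fileSize = 64L * 1024 * 1024;   // approximate size written by setupTable()
    long splitSize = 16L * 1024 * 1024;  // the SPLIT_SIZE table property
    long expectedTasks = (fileSize + splitSize - 1) / splitSize; // ceil = 4 tasks
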
diff --git a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java
index e0c3fa1..2340cd6 100644
--- a/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/src/test/java/com/netflix/iceberg/spark/source/TestFilteredScan.java
@@ -428,8 +428,9 @@ public class TestFilteredScan {
     File location = new File(parent, desc);
     Table byId = TABLES.create(SCHEMA, spec, location.toString());
 
-    // do not combine splits because the tests expect a split per partition
-    byId.updateProperties().set("read.split.target-size", "1").commit();
+    // Do not combine or split files because the tests expect a split per partition.
+    // A target split size of 2048 achieves that for these test files.
+    byId.updateProperties().set("read.split.target-size", "2048").commit();
 
     // copy the unpartitioned table into the partitioned table to produce the partitioned data
     Dataset<Row> allRows = spark.read()