You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/01/29 17:08:26 UTC

[incubator-iceberg] branch master updated: Add time-travel methods to IcebergGenerics (#750)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ce03e5  Add time-travel methods to IcebergGenerics (#750)
2ce03e5 is described below

commit 2ce03e559cbe4db34805a37b9dd460e1a72372f4
Author: maqroll <lo...@gmail.com>
AuthorDate: Wed Jan 29 18:08:16 2020 +0100

    Add time-travel methods to IcebergGenerics (#750)
---
 .../org/apache/iceberg/data/IcebergGenerics.java   |  32 ++--
 .../org/apache/iceberg/data/TestLocalScan.java     | 202 ++++++++++++++++-----
 2 files changed, 179 insertions(+), 55 deletions(-)

diff --git a/data/src/main/java/org/apache/iceberg/data/IcebergGenerics.java b/data/src/main/java/org/apache/iceberg/data/IcebergGenerics.java
index eca1c84..3dfcbac 100644
--- a/data/src/main/java/org/apache/iceberg/data/IcebergGenerics.java
+++ b/data/src/main/java/org/apache/iceberg/data/IcebergGenerics.java
@@ -20,10 +20,9 @@
 package org.apache.iceberg.data;
 
 import com.google.common.collect.ImmutableList;
-import java.util.List;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
 
 public class IcebergGenerics {
   private IcebergGenerics() {
@@ -41,13 +40,12 @@ public class IcebergGenerics {
 
   public static class ScanBuilder {
     private final Table table;
-    private Expression where = Expressions.alwaysTrue();
-    private List<String> columns = ImmutableList.of("*");
+    private TableScan tableScan;
     private boolean reuseContainers = false;
-    private boolean caseSensitive = true;
 
     public ScanBuilder(Table table) {
       this.table = table;
+      this.tableScan = table.newScan();
     }
 
     public ScanBuilder reuseContainers() {
@@ -56,28 +54,34 @@ public class IcebergGenerics {
     }
 
     public ScanBuilder where(Expression rowFilter) {
-      this.where = Expressions.and(where, rowFilter);
+      this.tableScan = tableScan.filter(rowFilter);
       return this;
     }
 
     public ScanBuilder caseInsensitive() {
-      this.caseSensitive = false;
+      this.tableScan = tableScan.caseSensitive(false);
       return this;
     }
 
     public ScanBuilder select(String... selectedColumns) {
-      this.columns = ImmutableList.copyOf(selectedColumns);
+      this.tableScan = tableScan.select(ImmutableList.copyOf(selectedColumns));
+      return this;
+    }
+
+    public ScanBuilder useSnapshot(long scanSnapshotId) {
+      this.tableScan = tableScan.useSnapshot(scanSnapshotId);
+      return this;
+    }
+
+    public ScanBuilder asOfTime(long scanTimestampMillis) {
+      this.tableScan = tableScan.asOfTime(scanTimestampMillis);
       return this;
     }
 
     public Iterable<Record> build() {
       return new TableScanIterable(
-        table
-          .newScan()
-          .filter(where)
-          .caseSensitive(caseSensitive)
-          .select(columns),
-        reuseContainers
+          tableScan,
+          reuseContainers
       );
     }
   }
diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
index 44b57f6..e41c41d 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.data;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.File;
@@ -34,9 +35,10 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Metrics;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -49,7 +51,6 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
@@ -101,6 +102,73 @@ public class TestLocalScan {
   private List<Record> file1Records = null;
   private List<Record> file2Records = null;
   private List<Record> file3Records = null;
+  private List<Record> file1FirstSnapshotRecords = null;
+  private List<Record> file2FirstSnapshotRecords = null;
+  private List<Record> file3FirstSnapshotRecords = null;
+  private List<Record> file1SecondSnapshotRecords = null;
+  private List<Record> file2SecondSnapshotRecords = null;
+  private List<Record> file3SecondSnapshotRecords = null;
+
+
+  private void overwriteExistingData() throws IOException {
+    Record record = GenericRecord.create(SCHEMA);
+
+    this.file1FirstSnapshotRecords = Lists.newArrayList(
+        record.copy(ImmutableMap.of("id", 4L, "data", "obscure")),
+        record.copy(ImmutableMap.of("id", 5L, "data", "secure")),
+        record.copy(ImmutableMap.of("id", 6L, "data", "fetta"))
+    );
+    DataFile file11 = writeFile(sharedTableLocation, format.addExtension("file-11"), file1FirstSnapshotRecords);
+
+    this.file2FirstSnapshotRecords = Lists.newArrayList(
+        record.copy(ImmutableMap.of("id", 14L, "data", "radical")),
+        record.copy(ImmutableMap.of("id", 15L, "data", "collocation")),
+        record.copy(ImmutableMap.of("id", 16L, "data", "book"))
+    );
+    DataFile file21 = writeFile(sharedTableLocation, format.addExtension("file-21"), file2FirstSnapshotRecords);
+
+    this.file3FirstSnapshotRecords = Lists.newArrayList(
+        record.copy(ImmutableMap.of("id", 24L, "data", "cloud")),
+        record.copy(ImmutableMap.of("id", 25L, "data", "zen")),
+        record.copy(ImmutableMap.of("id", 26L, "data", "sky"))
+    );
+    DataFile file31 = writeFile(sharedTableLocation, format.addExtension("file-31"), file3FirstSnapshotRecords);
+
+    sharedTable.newOverwrite()
+        .overwriteByRowFilter(Expressions.alwaysTrue())
+        .addFile(file11)
+        .addFile(file21)
+        .addFile(file31)
+        .commit();
+
+    this.file1SecondSnapshotRecords = Lists.newArrayList(
+        record.copy(ImmutableMap.of("id", 6L, "data", "brainy")),
+        record.copy(ImmutableMap.of("id", 7L, "data", "film")),
+        record.copy(ImmutableMap.of("id", 8L, "data", "fetta"))
+    );
+    DataFile file12 = writeFile(sharedTableLocation, format.addExtension("file-12"), file1SecondSnapshotRecords);
+
+    this.file2SecondSnapshotRecords = Lists.newArrayList(
+        record.copy(ImmutableMap.of("id", 16L, "data", "cake")),
+        record.copy(ImmutableMap.of("id", 17L, "data", "intrinsic")),
+        record.copy(ImmutableMap.of("id", 18L, "data", "paper"))
+    );
+    DataFile file22 = writeFile(sharedTableLocation, format.addExtension("file-22"), file2SecondSnapshotRecords);
+
+    this.file3SecondSnapshotRecords = Lists.newArrayList(
+        record.copy(ImmutableMap.of("id", 26L, "data", "belleview")),
+        record.copy(ImmutableMap.of("id", 27L, "data", "overview")),
+        record.copy(ImmutableMap.of("id", 28L, "data", "tender"))
+    );
+    DataFile file32 = writeFile(sharedTableLocation, format.addExtension("file-32"), file3SecondSnapshotRecords);
+
+    sharedTable.newOverwrite()
+        .overwriteByRowFilter(Expressions.alwaysTrue())
+        .addFile(file12)
+        .addFile(file22)
+        .addFile(file32)
+        .commit();
+  }
 
   @Before
   public void createTables() throws IOException {
@@ -119,7 +187,7 @@ public class TestLocalScan {
         record.copy(ImmutableMap.of("id", 1L, "data", "risky")),
         record.copy(ImmutableMap.of("id", 2L, "data", "falafel"))
     );
-    InputFile file1 = writeFile(sharedTableLocation, format.addExtension("file-1"), file1Records);
+    DataFile file1 = writeFile(sharedTableLocation, format.addExtension("file-1"), file1Records);
 
     Record nullData = record.copy();
     nullData.setField("id", 11L);
@@ -130,44 +198,20 @@ public class TestLocalScan {
         record.copy(ImmutableMap.of("id", 11L, "data", "evacuate")),
         record.copy(ImmutableMap.of("id", 12L, "data", "tissue"))
     );
-    InputFile file2 = writeFile(sharedTableLocation, format.addExtension("file-2"), file2Records);
+    DataFile file2 = writeFile(sharedTableLocation, format.addExtension("file-2"), file2Records);
 
     this.file3Records = Lists.newArrayList(
         record.copy(ImmutableMap.of("id", 20L, "data", "ocean")),
         record.copy(ImmutableMap.of("id", 21L, "data", "holistic")),
         record.copy(ImmutableMap.of("id", 22L, "data", "preventative"))
     );
-    InputFile file3 = writeFile(sharedTableLocation, format.addExtension("file-3"), file3Records);
+    DataFile file3 = writeFile(sharedTableLocation, format.addExtension("file-3"), file3Records);
 
     // commit the test data
     sharedTable.newAppend()
-        .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
-            .withInputFile(file1)
-            .withMetrics(new Metrics(3L,
-                null, // no column sizes
-                ImmutableMap.of(1, 3L), // value count
-                ImmutableMap.of(1, 0L), // null count
-                ImmutableMap.of(1, longToBuffer(0L)), // lower bounds
-                ImmutableMap.of(1, longToBuffer(2L)))) // upper bounds)
-            .build())
-        .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
-            .withInputFile(file2)
-            .withMetrics(new Metrics(3L,
-                null, // no column sizes
-                ImmutableMap.of(1, 3L), // value count
-                ImmutableMap.of(1, 0L), // null count
-                ImmutableMap.of(1, longToBuffer(10L)), // lower bounds
-                ImmutableMap.of(1, longToBuffer(12L)))) // upper bounds)
-            .build())
-        .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
-            .withInputFile(file3)
-            .withMetrics(new Metrics(3L,
-                null, // no column sizes
-                ImmutableMap.of(1, 3L), // value count
-                ImmutableMap.of(1, 0L), // null count
-                ImmutableMap.of(1, longToBuffer(20L)), // lower bounds
-                ImmutableMap.of(1, longToBuffer(22L)))) // upper bounds)
-            .build())
+        .appendFile(file1)
+        .appendFile(file2)
+        .appendFile(file3)
         .commit();
   }
 
@@ -279,31 +323,107 @@ public class TestLocalScan {
         Sets.newHashSet(transform(results, record -> record.getField("data").toString())));
   }
 
-  private InputFile writeFile(String location, String filename, List<Record> records) throws IOException {
+  @Test
+  public void testUseSnapshot() throws IOException {
+    overwriteExistingData();
+    Iterable<Record> results = IcebergGenerics.read(sharedTable)
+        .useSnapshot(/* first snapshot */ sharedTable.history().get(1).snapshotId())
+        .build();
+
+    Set<Record> expected = Sets.newHashSet();
+    expected.addAll(file1FirstSnapshotRecords);
+    expected.addAll(file2FirstSnapshotRecords);
+    expected.addAll(file3FirstSnapshotRecords);
+
+    Set<Record> records = Sets.newHashSet(results);
+    Assert.assertEquals("Should produce correct number of records",
+        expected.size(), records.size());
+    Assert.assertEquals("Record set should match",
+        Sets.newHashSet(expected), records);
+    Assert.assertNotNull(Iterables.get(records, 0).getField("id"));
+    Assert.assertNotNull(Iterables.get(records, 0).getField("data"));
+  }
+
+  @Test
+  public void testAsOfTime() throws IOException {
+    overwriteExistingData();
+    Iterable<Record> results = IcebergGenerics.read(sharedTable)
+        .asOfTime(/* timestamp second snapshot */ sharedTable.history().get(2).timestampMillis())
+        .build();
+
+    Set<Record> expected = Sets.newHashSet();
+    expected.addAll(file1SecondSnapshotRecords);
+    expected.addAll(file2SecondSnapshotRecords);
+    expected.addAll(file3SecondSnapshotRecords);
+
+    Set<Record> records = Sets.newHashSet(results);
+    Assert.assertEquals("Should produce correct number of records",
+        expected.size(), records.size());
+    Assert.assertEquals("Record set should match",
+        Sets.newHashSet(expected), records);
+    Assert.assertNotNull(Iterables.get(records, 0).getField("id"));
+    Assert.assertNotNull(Iterables.get(records, 0).getField("data"));
+  }
+
+  @Test
+  public void testUnknownSnapshotId() {
+    Long minSnapshotId = sharedTable.history().stream().map(h -> h.snapshotId()).min(Long::compareTo).get();
+
+    IcebergGenerics.ScanBuilder scanBuilder = IcebergGenerics.read(sharedTable);
+
+    AssertHelpers.assertThrows("Should fail on unknown snapshot id",
+        IllegalArgumentException.class,
+        "Cannot find snapshot with ID ",
+        () -> scanBuilder.useSnapshot(/* unknown snapshot id */ minSnapshotId - 1));
+  }
+
+  @Test
+  public void testAsOfTimeOlderThanFirstSnapshot() {
+    IcebergGenerics.ScanBuilder scanBuilder = IcebergGenerics.read(sharedTable);
+
+    AssertHelpers.assertThrows("Should fail on timestamp sooner than first write",
+        IllegalArgumentException.class,
+        "Cannot find a snapshot older than ",
+        () -> scanBuilder.asOfTime(/* older than first snapshot */ sharedTable.history().get(0).timestampMillis() - 1));
+  }
+
+  private DataFile writeFile(String location, String filename, List<Record> records) throws IOException {
     Path path = new Path(location, filename);
     FileFormat fileFormat = FileFormat.fromFileName(filename);
     Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
     switch (fileFormat) {
       case AVRO:
-        try (FileAppender<Record> appender = Avro.write(fromPath(path, CONF))
+        FileAppender avroAppender = Avro.write(fromPath(path, CONF))
             .schema(SCHEMA)
             .createWriterFunc(DataWriter::create)
             .named(fileFormat.name())
-            .build()) {
-          appender.addAll(records);
+            .build();
+        try {
+          avroAppender.addAll(records);
+        } finally {
+          avroAppender.close();
         }
 
-        return HadoopInputFile.fromPath(path, CONF);
+        return DataFiles.builder(PartitionSpec.unpartitioned())
+            .withInputFile(HadoopInputFile.fromPath(path, CONF))
+            .withMetrics(avroAppender.metrics())
+            .build();
 
       case PARQUET:
-        try (FileAppender<Record> appender = Parquet.write(fromPath(path, CONF))
+        FileAppender<Record> orcAppender = Parquet.write(fromPath(path, CONF))
             .schema(SCHEMA)
             .createWriterFunc(GenericParquetWriter::buildWriter)
-            .build()) {
-          appender.addAll(records);
+            .build();
+        try {
+          orcAppender.addAll(records);
+        } finally {
+          orcAppender.close();
         }
 
-        return HadoopInputFile.fromPath(path, CONF);
+        return DataFiles.builder(PartitionSpec.unpartitioned())
+            .withInputFile(HadoopInputFile.fromPath(path, CONF))
+            .withMetrics(orcAppender.metrics())
+            .build();
 
       default:
         throw new UnsupportedOperationException("Cannot write format: " + fileFormat);