Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/01/29 17:08:26 UTC
[incubator-iceberg] branch master updated: Add time-travel methods to IcebergGenerics (#750)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 2ce03e5 Add time-travel methods to IcebergGenerics (#750)
2ce03e5 is described below
commit 2ce03e559cbe4db34805a37b9dd460e1a72372f4
Author: maqroll <lo...@gmail.com>
AuthorDate: Wed Jan 29 18:08:16 2020 +0100
Add time-travel methods to IcebergGenerics (#750)
---
.../org/apache/iceberg/data/IcebergGenerics.java | 32 ++--
.../org/apache/iceberg/data/TestLocalScan.java | 202 ++++++++++++++++-----
2 files changed, 179 insertions(+), 55 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/IcebergGenerics.java b/data/src/main/java/org/apache/iceberg/data/IcebergGenerics.java
index eca1c84..3dfcbac 100644
--- a/data/src/main/java/org/apache/iceberg/data/IcebergGenerics.java
+++ b/data/src/main/java/org/apache/iceberg/data/IcebergGenerics.java
@@ -20,10 +20,9 @@
package org.apache.iceberg.data;
import com.google.common.collect.ImmutableList;
-import java.util.List;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
public class IcebergGenerics {
private IcebergGenerics() {
@@ -41,13 +40,12 @@ public class IcebergGenerics {
public static class ScanBuilder {
private final Table table;
- private Expression where = Expressions.alwaysTrue();
- private List<String> columns = ImmutableList.of("*");
+ private TableScan tableScan;
private boolean reuseContainers = false;
- private boolean caseSensitive = true;
public ScanBuilder(Table table) {
this.table = table;
+ this.tableScan = table.newScan();
}
public ScanBuilder reuseContainers() {
@@ -56,28 +54,34 @@ public class IcebergGenerics {
}
public ScanBuilder where(Expression rowFilter) {
- this.where = Expressions.and(where, rowFilter);
+ this.tableScan = tableScan.filter(rowFilter);
return this;
}
public ScanBuilder caseInsensitive() {
- this.caseSensitive = false;
+ this.tableScan = tableScan.caseSensitive(false);
return this;
}
public ScanBuilder select(String... selectedColumns) {
- this.columns = ImmutableList.copyOf(selectedColumns);
+ this.tableScan = tableScan.select(ImmutableList.copyOf(selectedColumns));
+ return this;
+ }
+
+ public ScanBuilder useSnapshot(long scanSnapshotId) {
+ this.tableScan = tableScan.useSnapshot(scanSnapshotId);
+ return this;
+ }
+
+ public ScanBuilder asOfTime(long scanTimestampMillis) {
+ this.tableScan = tableScan.asOfTime(scanTimestampMillis);
return this;
}
public Iterable<Record> build() {
return new TableScanIterable(
- table
- .newScan()
- .filter(where)
- .caseSensitive(caseSensitive)
- .select(columns),
- reuseContainers
+ tableScan,
+ reuseContainers
);
}
}
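
For reference, the two builder methods added above let callers of the generics API pin a scan to a snapshot id or to a point in time. A minimal usage sketch follows; the HadoopTables catalog and the table location are illustrative assumptions, and only read(...), useSnapshot(...), asOfTime(...), and build() come from the code in this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;

public class TimeTravelExample {
  public static void main(String[] args) {
    // Load an existing table; the location below is a placeholder.
    Table table = new HadoopTables(new Configuration()).load("hdfs://path/to/table");

    // Scan the table as of a specific snapshot id (here, the oldest entry in its history).
    Iterable<Record> atSnapshot = IcebergGenerics.read(table)
        .useSnapshot(table.history().get(0).snapshotId())
        .build();

    // Scan the table as it was at a point in time, given in epoch milliseconds.
    Iterable<Record> atTime = IcebergGenerics.read(table)
        .asOfTime(System.currentTimeMillis() - 3_600_000L) // one hour ago; assumes a snapshot at least that old exists
        .build();

    atSnapshot.forEach(record -> System.out.println(record));
    atTime.forEach(record -> System.out.println(record));
  }
}
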
diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
index 44b57f6..e41c41d 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.data;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.File;
@@ -34,9 +35,10 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -49,7 +51,6 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
@@ -101,6 +102,73 @@ public class TestLocalScan {
private List<Record> file1Records = null;
private List<Record> file2Records = null;
private List<Record> file3Records = null;
+ private List<Record> file1FirstSnapshotRecords = null;
+ private List<Record> file2FirstSnapshotRecords = null;
+ private List<Record> file3FirstSnapshotRecords = null;
+ private List<Record> file1SecondSnapshotRecords = null;
+ private List<Record> file2SecondSnapshotRecords = null;
+ private List<Record> file3SecondSnapshotRecords = null;
+
+
+ private void overwriteExistingData() throws IOException {
+ Record record = GenericRecord.create(SCHEMA);
+
+ this.file1FirstSnapshotRecords = Lists.newArrayList(
+ record.copy(ImmutableMap.of("id", 4L, "data", "obscure")),
+ record.copy(ImmutableMap.of("id", 5L, "data", "secure")),
+ record.copy(ImmutableMap.of("id", 6L, "data", "fetta"))
+ );
+ DataFile file11 = writeFile(sharedTableLocation, format.addExtension("file-11"), file1FirstSnapshotRecords);
+
+ this.file2FirstSnapshotRecords = Lists.newArrayList(
+ record.copy(ImmutableMap.of("id", 14L, "data", "radical")),
+ record.copy(ImmutableMap.of("id", 15L, "data", "collocation")),
+ record.copy(ImmutableMap.of("id", 16L, "data", "book"))
+ );
+ DataFile file21 = writeFile(sharedTableLocation, format.addExtension("file-21"), file2FirstSnapshotRecords);
+
+ this.file3FirstSnapshotRecords = Lists.newArrayList(
+ record.copy(ImmutableMap.of("id", 24L, "data", "cloud")),
+ record.copy(ImmutableMap.of("id", 25L, "data", "zen")),
+ record.copy(ImmutableMap.of("id", 26L, "data", "sky"))
+ );
+ DataFile file31 = writeFile(sharedTableLocation, format.addExtension("file-31"), file3FirstSnapshotRecords);
+
+ sharedTable.newOverwrite()
+ .overwriteByRowFilter(Expressions.alwaysTrue())
+ .addFile(file11)
+ .addFile(file21)
+ .addFile(file31)
+ .commit();
+
+ this.file1SecondSnapshotRecords = Lists.newArrayList(
+ record.copy(ImmutableMap.of("id", 6L, "data", "brainy")),
+ record.copy(ImmutableMap.of("id", 7L, "data", "film")),
+ record.copy(ImmutableMap.of("id", 8L, "data", "fetta"))
+ );
+ DataFile file12 = writeFile(sharedTableLocation, format.addExtension("file-12"), file1SecondSnapshotRecords);
+
+ this.file2SecondSnapshotRecords = Lists.newArrayList(
+ record.copy(ImmutableMap.of("id", 16L, "data", "cake")),
+ record.copy(ImmutableMap.of("id", 17L, "data", "intrinsic")),
+ record.copy(ImmutableMap.of("id", 18L, "data", "paper"))
+ );
+ DataFile file22 = writeFile(sharedTableLocation, format.addExtension("file-22"), file2SecondSnapshotRecords);
+
+ this.file3SecondSnapshotRecords = Lists.newArrayList(
+ record.copy(ImmutableMap.of("id", 26L, "data", "belleview")),
+ record.copy(ImmutableMap.of("id", 27L, "data", "overview")),
+ record.copy(ImmutableMap.of("id", 28L, "data", "tender"))
+ );
+ DataFile file32 = writeFile(sharedTableLocation, format.addExtension("file-32"), file3SecondSnapshotRecords);
+
+ sharedTable.newOverwrite()
+ .overwriteByRowFilter(Expressions.alwaysTrue())
+ .addFile(file12)
+ .addFile(file22)
+ .addFile(file32)
+ .commit();
+ }
@Before
public void createTables() throws IOException {
@@ -119,7 +187,7 @@ public class TestLocalScan {
record.copy(ImmutableMap.of("id", 1L, "data", "risky")),
record.copy(ImmutableMap.of("id", 2L, "data", "falafel"))
);
- InputFile file1 = writeFile(sharedTableLocation, format.addExtension("file-1"), file1Records);
+ DataFile file1 = writeFile(sharedTableLocation, format.addExtension("file-1"), file1Records);
Record nullData = record.copy();
nullData.setField("id", 11L);
@@ -130,44 +198,20 @@ public class TestLocalScan {
record.copy(ImmutableMap.of("id", 11L, "data", "evacuate")),
record.copy(ImmutableMap.of("id", 12L, "data", "tissue"))
);
- InputFile file2 = writeFile(sharedTableLocation, format.addExtension("file-2"), file2Records);
+ DataFile file2 = writeFile(sharedTableLocation, format.addExtension("file-2"), file2Records);
this.file3Records = Lists.newArrayList(
record.copy(ImmutableMap.of("id", 20L, "data", "ocean")),
record.copy(ImmutableMap.of("id", 21L, "data", "holistic")),
record.copy(ImmutableMap.of("id", 22L, "data", "preventative"))
);
- InputFile file3 = writeFile(sharedTableLocation, format.addExtension("file-3"), file3Records);
+ DataFile file3 = writeFile(sharedTableLocation, format.addExtension("file-3"), file3Records);
// commit the test data
sharedTable.newAppend()
- .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
- .withInputFile(file1)
- .withMetrics(new Metrics(3L,
- null, // no column sizes
- ImmutableMap.of(1, 3L), // value count
- ImmutableMap.of(1, 0L), // null count
- ImmutableMap.of(1, longToBuffer(0L)), // lower bounds
- ImmutableMap.of(1, longToBuffer(2L)))) // upper bounds)
- .build())
- .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
- .withInputFile(file2)
- .withMetrics(new Metrics(3L,
- null, // no column sizes
- ImmutableMap.of(1, 3L), // value count
- ImmutableMap.of(1, 0L), // null count
- ImmutableMap.of(1, longToBuffer(10L)), // lower bounds
- ImmutableMap.of(1, longToBuffer(12L)))) // upper bounds)
- .build())
- .appendFile(DataFiles.builder(PartitionSpec.unpartitioned())
- .withInputFile(file3)
- .withMetrics(new Metrics(3L,
- null, // no column sizes
- ImmutableMap.of(1, 3L), // value count
- ImmutableMap.of(1, 0L), // null count
- ImmutableMap.of(1, longToBuffer(20L)), // lower bounds
- ImmutableMap.of(1, longToBuffer(22L)))) // upper bounds)
- .build())
+ .appendFile(file1)
+ .appendFile(file2)
+ .appendFile(file3)
.commit();
}
@@ -279,31 +323,107 @@ public class TestLocalScan {
Sets.newHashSet(transform(results, record -> record.getField("data").toString())));
}
- private InputFile writeFile(String location, String filename, List<Record> records) throws IOException {
+ @Test
+ public void testUseSnapshot() throws IOException {
+ overwriteExistingData();
+ Iterable<Record> results = IcebergGenerics.read(sharedTable)
+ .useSnapshot(/* first snapshot */ sharedTable.history().get(1).snapshotId())
+ .build();
+
+ Set<Record> expected = Sets.newHashSet();
+ expected.addAll(file1FirstSnapshotRecords);
+ expected.addAll(file2FirstSnapshotRecords);
+ expected.addAll(file3FirstSnapshotRecords);
+
+ Set<Record> records = Sets.newHashSet(results);
+ Assert.assertEquals("Should produce correct number of records",
+ expected.size(), records.size());
+ Assert.assertEquals("Record set should match",
+ Sets.newHashSet(expected), records);
+ Assert.assertNotNull(Iterables.get(records, 0).getField("id"));
+ Assert.assertNotNull(Iterables.get(records, 0).getField("data"));
+ }
+
+ @Test
+ public void testAsOfTime() throws IOException {
+ overwriteExistingData();
+ Iterable<Record> results = IcebergGenerics.read(sharedTable)
+ .asOfTime(/* timestamp first snapshot */ sharedTable.history().get(2).timestampMillis())
+ .build();
+
+ Set<Record> expected = Sets.newHashSet();
+ expected.addAll(file1SecondSnapshotRecords);
+ expected.addAll(file2SecondSnapshotRecords);
+ expected.addAll(file3SecondSnapshotRecords);
+
+ Set<Record> records = Sets.newHashSet(results);
+ Assert.assertEquals("Should produce correct number of records",
+ expected.size(), records.size());
+ Assert.assertEquals("Record set should match",
+ Sets.newHashSet(expected), records);
+ Assert.assertNotNull(Iterables.get(records, 0).getField("id"));
+ Assert.assertNotNull(Iterables.get(records, 0).getField("data"));
+ }
+
+ @Test
+ public void testUnknownSnapshotId() {
+ Long minSnapshotId = sharedTable.history().stream().map(h -> h.snapshotId()).min(Long::compareTo).get();
+
+ IcebergGenerics.ScanBuilder scanBuilder = IcebergGenerics.read(sharedTable);
+
+ AssertHelpers.assertThrows("Should fail on unknown snapshot id",
+ IllegalArgumentException.class,
+ "Cannot find snapshot with ID ",
+ () -> scanBuilder.useSnapshot(/* unknown snapshot id */ minSnapshotId - 1));
+ }
+
+ @Test
+ public void testAsOfTimeOlderThanFirstSnapshot() {
+ IcebergGenerics.ScanBuilder scanBuilder = IcebergGenerics.read(sharedTable);
+
+ AssertHelpers.assertThrows("Should fail on timestamp sooner than first write",
+ IllegalArgumentException.class,
+ "Cannot find a snapshot older than ",
+ () -> scanBuilder.asOfTime(/* older than first snapshot */ sharedTable.history().get(0).timestampMillis() - 1));
+ }
+
+ private DataFile writeFile(String location, String filename, List<Record> records) throws IOException {
Path path = new Path(location, filename);
FileFormat fileFormat = FileFormat.fromFileName(filename);
Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
switch (fileFormat) {
case AVRO:
- try (FileAppender<Record> appender = Avro.write(fromPath(path, CONF))
+ FileAppender avroAppender = Avro.write(fromPath(path, CONF))
.schema(SCHEMA)
.createWriterFunc(DataWriter::create)
.named(fileFormat.name())
- .build()) {
- appender.addAll(records);
+ .build();
+ try {
+ avroAppender.addAll(records);
+ } finally {
+ avroAppender.close();
}
- return HadoopInputFile.fromPath(path, CONF);
+ return DataFiles.builder(PartitionSpec.unpartitioned())
+ .withInputFile(HadoopInputFile.fromPath(path, CONF))
+ .withMetrics(avroAppender.metrics())
+ .build();
case PARQUET:
- try (FileAppender<Record> appender = Parquet.write(fromPath(path, CONF))
+ FileAppender<Record> orcAppender = Parquet.write(fromPath(path, CONF))
.schema(SCHEMA)
.createWriterFunc(GenericParquetWriter::buildWriter)
- .build()) {
- appender.addAll(records);
+ .build();
+ try {
+ orcAppender.addAll(records);
+ } finally {
+ orcAppender.close();
}
- return HadoopInputFile.fromPath(path, CONF);
+ return DataFiles.builder(PartitionSpec.unpartitioned())
+ .withInputFile(HadoopInputFile.fromPath(path, CONF))
+ .withMetrics(orcAppender.metrics())
+ .build();
default:
throw new UnsupportedOperationException("Cannot write format: " + fileFormat);
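
As the new tests above assert, an unknown snapshot id or a timestamp older than the table's first snapshot surfaces as an IllegalArgumentException thrown by the builder call itself (the lambdas in testUnknownSnapshotId and testAsOfTimeOlderThanFirstSnapshot only invoke useSnapshot/asOfTime, not build). A small caller-side sketch, continuing from the TimeTravelExample above; the helper name readAtSnapshotOrLatest is hypothetical, and the messages quoted in comments come from the assertions in those tests.

  // Hypothetical helper: fall back to the current table state when the snapshot id cannot be resolved.
  static Iterable<Record> readAtSnapshotOrLatest(Table table, long snapshotId) {
    try {
      // useSnapshot/asOfTime validate their argument against table metadata and throw
      // IllegalArgumentException ("Cannot find snapshot with ID ..." or
      // "Cannot find a snapshot older than ...") when it cannot be resolved.
      return IcebergGenerics.read(table)
          .useSnapshot(snapshotId)
          .build();
    } catch (IllegalArgumentException e) {
      // Fall back to reading the table's current snapshot.
      return IcebergGenerics.read(table).build();
    }
  }
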