You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/10/03 15:34:53 UTC
[iceberg] branch master updated: API, Core: Support scanning tags and branches (#5364)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8b8a103860 API, Core: Support scanning tags and branches (#5364)
8b8a103860 is described below
commit 8b8a1038609610873c7d40023b4fc98ba341a502
Author: Amogh Jahagirdar <ja...@amazon.com>
AuthorDate: Mon Oct 3 08:34:47 2022 -0700
API, Core: Support scanning tags and branches (#5364)
---
.palantir/revapi.yml | 3 +
.../main/java/org/apache/iceberg/TableScan.java | 16 +++-
.../apache/iceberg/BaseAllMetadataTableScan.java | 5 +
.../java/org/apache/iceberg/BaseTableScan.java | 14 ++-
.../apache/iceberg/IncrementalDataTableScan.java | 8 ++
.../java/org/apache/iceberg/TestDataTableScan.java | 105 +++++++++++++++++++++
6 files changed, 147 insertions(+), 4 deletions(-)
diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index d6f09d177d..ffbcbeee15 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -40,6 +40,9 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method org.apache.iceberg.RowDelta org.apache.iceberg.RowDelta::validateNoConflictingAppends(org.apache.iceberg.expressions.Expression)"
justification: "Deprecations for 1.0 release"
+ - code: "java.method.addedToInterface"
+ new: "method org.apache.iceberg.TableScan org.apache.iceberg.TableScan::useRef(java.lang.String)"
+ justification: "Adding table scan APIs to support scanning from refs"
release-base-0.13.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 5f55b82d96..ebff7ad51b 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -39,13 +39,25 @@ public interface TableScan extends Scan<TableScan, FileScanTask, CombinedScanTas
*/
TableScan useSnapshot(long snapshotId);
+ /**
+ * Create a new {@link TableScan} from this scan's configuration that will use the snapshot
+ * referenced by the given named reference.
+ *
+ * @param ref name of the reference (for example a branch or tag) to scan from
+ * @return a new scan based on the given reference.
+ * @throws IllegalArgumentException if a reference with the given name could not be found
+ * @throws UnsupportedOperationException if the scan implementation does not allow selecting a ref
+ */
+ TableScan useRef(String ref);
+
/**
* Create a new {@link TableScan} from this scan's configuration that will use the most recent
- * snapshot as of the given time in milliseconds.
+ * snapshot as of the given time in milliseconds on the branch in the scan or main if no branch is
+ * set.
*
* @param timestampMillis a timestamp in milliseconds.
* @return a new scan based on this with the current snapshot at the given time
- * @throws IllegalArgumentException if the snapshot cannot be found
+ * @throws IllegalArgumentException if the snapshot cannot be found or time travel is attempted on
+ * a tag
*/
TableScan asOfTime(long timestampMillis);
diff --git a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
index cd74ea348f..ace7480979 100644
--- a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
@@ -53,6 +53,11 @@ abstract class BaseAllMetadataTableScan extends BaseMetadataTableScan {
throw new UnsupportedOperationException("Cannot select snapshot in table: " + tableType());
}
+ @Override
+ public TableScan useRef(String ref) {
+ throw new UnsupportedOperationException("Cannot select ref in table: " + tableType());
+ }
+
@Override
public TableScan asOfTime(long timestampMillis) {
throw new UnsupportedOperationException("Cannot select snapshot in table: " + tableType());
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 7a751d99f6..d00bcb1a60 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -87,7 +87,7 @@ abstract class BaseTableScan extends BaseScan<TableScan, FileScanTask, CombinedS
@Override
public TableScan useSnapshot(long scanSnapshotId) {
Preconditions.checkArgument(
- snapshotId() == null, "Cannot override snapshot, already set to id=%s", snapshotId());
+ snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
Preconditions.checkArgument(
tableOps().current().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s",
@@ -96,10 +96,20 @@ abstract class BaseTableScan extends BaseScan<TableScan, FileScanTask, CombinedS
tableOps(), table(), tableSchema(), context().useSnapshotId(scanSnapshotId));
}
+ @Override
+ public TableScan useRef(String name) {
+ Preconditions.checkArgument(
+ snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId());
+ Snapshot snapshot = table().snapshot(name);
+ Preconditions.checkArgument(snapshot != null, "Cannot find ref %s", name);
+ return newRefinedScan(
+ tableOps(), table(), tableSchema(), context().useSnapshotId(snapshot.snapshotId()));
+ }
+
@Override
public TableScan asOfTime(long timestampMillis) {
Preconditions.checkArgument(
- snapshotId() == null, "Cannot override snapshot, already set to id=%s", snapshotId());
+ snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
}
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
index 270dfcf595..197679296e 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -46,6 +46,14 @@ class IncrementalDataTableScan extends DataTableScan {
timestampMillis, context().fromSnapshotId(), context().toSnapshotId()));
}
+ @Override
+ public TableScan useRef(String ref) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot scan table using ref %s: configured for incremental data in snapshots (%s, %s]",
+ ref, context().fromSnapshotId(), context().toSnapshotId()));
+ }
+
@Override
public TableScan useSnapshot(long scanSnapshotId) {
throw new UnsupportedOperationException(
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
index 8a0333f97a..4e9f8f04d1 100644
--- a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
@@ -18,8 +18,12 @@
*/
package org.apache.iceberg;
+import java.io.IOException;
import java.util.List;
import java.util.UUID;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Assume;
@@ -86,4 +90,105 @@ public class TestDataTableScan extends ScanTestBase<TableScan, FileScanTask, Com
.withRecordCount(10)
.build();
}
+
+ @Test
+ public void testScanFromBranchTip() throws IOException {
+ // A is committed without naming a branch
+ table.newFastAppend().appendFile(FILE_A).commit();
+ // B and C are committed to the new branch "testBranch"
+ table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).toBranch("testBranch").commit();
+ // D is committed without naming a branch
+ table.newFastAppend().appendFile(FILE_D).commit();
+
+ // scanning the branch ref is expected to see A, B and C
+ List<CharSequence> expectedOnBranch =
+ ImmutableList.of(FILE_A.path(), FILE_B.path(), FILE_C.path());
+ validateExpectedFileScanTasks(table.newScan().useRef("testBranch"), expectedOnBranch);
+
+ // a plain scan is expected to see A and D only
+ List<CharSequence> expectedOnMain = ImmutableList.of(FILE_A.path(), FILE_D.path());
+ validateExpectedFileScanTasks(table.newScan(), expectedOnMain);
+ }
+
+ @Test
+ public void testScanFromTag() throws IOException {
+ table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+ table.manageSnapshots().createTag("tagB", table.currentSnapshot().snapshotId()).commit();
+ table.newFastAppend().appendFile(FILE_C).commit();
+ TableScan tagScan = table.newScan().useRef("tagB");
+ validateExpectedFileScanTasks(tagScan, ImmutableList.of(FILE_A.path(), FILE_B.path()));
+ TableScan mainScan = table.newScan();
+ validateExpectedFileScanTasks(
+ mainScan, ImmutableList.of(FILE_A.path(), FILE_B.path(), FILE_C.path()));
+ }
+
+ @Test
+ public void testScanFromRefWhenSnapshotSetFails() {
+ table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+ table.manageSnapshots().createTag("tagB", table.currentSnapshot().snapshotId()).commit();
+
+ AssertHelpers.assertThrows(
+ "Should throw when attempting to use a ref for scanning when a snapshot is set",
+ IllegalArgumentException.class,
+ "Cannot override ref, already set snapshot id=1",
+ () -> table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).useRef("tagB"));
+ }
+
+ @Test
+ public void testSettingSnapshotWhenRefSetFails() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ Snapshot snapshotA = table.currentSnapshot();
+ table.newFastAppend().appendFile(FILE_B).commit();
+ table.manageSnapshots().createTag("tagB", table.currentSnapshot().snapshotId()).commit();
+
+ AssertHelpers.assertThrows(
+ "Should throw when attempting to use a snapshot for scanning when a ref is set",
+ IllegalArgumentException.class,
+ "Cannot override snapshot, already set snapshot id=2",
+ () -> table.newScan().useRef("tagB").useSnapshot(snapshotA.snapshotId()));
+ }
+
+ @Test
+ public void testBranchTimeTravelFails() {
+ table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+ table
+ .manageSnapshots()
+ .createBranch("testBranch", table.currentSnapshot().snapshotId())
+ .commit();
+ // the ref is selected first, so the later asOfTime call is expected to be rejected
+ AssertHelpers.assertThrows(
+ "Should throw when attempting to time travel when a ref is set",
+ IllegalArgumentException.class,
+ "Cannot override snapshot, already set snapshot id=1",
+ () -> table.newScan().useRef("testBranch").asOfTime(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testSettingMultipleRefsFails() {
+ table.newFastAppend().appendFile(FILE_A).commit();
+ table.manageSnapshots().createTag("tagA", table.currentSnapshot().snapshotId()).commit();
+ table.newFastAppend().appendFile(FILE_B).commit();
+ table.manageSnapshots().createTag("tagB", table.currentSnapshot().snapshotId()).commit();
+
+ AssertHelpers.assertThrows(
+ "Should throw when attempting to use multiple refs",
+ IllegalArgumentException.class,
+ "Cannot override ref, already set snapshot id=2",
+ () -> table.newScan().useRef("tagB").useRef("tagA"));
+ }
+
+ @Test
+ public void testSettingInvalidRefFails() {
+ AssertHelpers.assertThrows(
+ "Should throw when attempting to use an invalid ref for scanning",
+ IllegalArgumentException.class,
+ "Cannot find ref nonexisting",
+ () -> table.newScan().useRef("nonexisting"));
+ }
+
+ /**
+ * Plans the given scan and asserts that it yields exactly as many file scan tasks as there are
+ * expected paths, and that every expected data file path is present among the planned tasks.
+ *
+ * @param scan scan to plan
+ * @param expectedFileScanPaths data file paths the planned tasks must cover
+ * @throws IOException declared because the planned tasks are closed by try-with-resources
+ */
+ private void validateExpectedFileScanTasks(
+ TableScan scan, List<CharSequence> expectedFileScanPaths) throws IOException {
+ try (CloseableIterable<FileScanTask> scanTasks = scan.planFiles()) {
+ // collect the paths in a single pass; counting the tasks and then iterating them again
+ // would traverse the planned result twice
+ List<CharSequence> actualFiles =
+ Lists.newArrayList(Iterables.transform(scanTasks, task -> task.file().path()));
+ Assert.assertEquals(expectedFileScanPaths.size(), actualFiles.size());
+ Assert.assertTrue(actualFiles.containsAll(expectedFileScanPaths));
+ }
+ }
}