You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/10/03 15:34:53 UTC

[iceberg] branch master updated: API, Core: Support scanning tags and branches (#5364)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b8a103860 API, Core: Support scanning tags and branches (#5364)
8b8a103860 is described below

commit 8b8a1038609610873c7d40023b4fc98ba341a502
Author: Amogh Jahagirdar <ja...@amazon.com>
AuthorDate: Mon Oct 3 08:34:47 2022 -0700

    API, Core: Support scanning tags and branches (#5364)
---
 .palantir/revapi.yml                               |   3 +
 .../main/java/org/apache/iceberg/TableScan.java    |  16 +++-
 .../apache/iceberg/BaseAllMetadataTableScan.java   |   5 +
 .../java/org/apache/iceberg/BaseTableScan.java     |  14 ++-
 .../apache/iceberg/IncrementalDataTableScan.java   |   8 ++
 .../java/org/apache/iceberg/TestDataTableScan.java | 105 +++++++++++++++++++++
 6 files changed, 147 insertions(+), 4 deletions(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index d6f09d177d..ffbcbeee15 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -40,6 +40,9 @@ acceptedBreaks:
     - code: "java.method.removed"
       old: "method org.apache.iceberg.RowDelta org.apache.iceberg.RowDelta::validateNoConflictingAppends(org.apache.iceberg.expressions.Expression)"
       justification: "Deprecations for 1.0 release"
+    - code: "java.method.addedToInterface"
+      new: "method org.apache.iceberg.TableScan org.apache.iceberg.TableScan::useRef(java.lang.String)"
+      justification: "Adding table scan APIs to support scanning from refs"
   release-base-0.13.0:
     org.apache.iceberg:iceberg-api:
     - code: "java.class.defaultSerializationChanged"
diff --git a/api/src/main/java/org/apache/iceberg/TableScan.java b/api/src/main/java/org/apache/iceberg/TableScan.java
index 5f55b82d96..ebff7ad51b 100644
--- a/api/src/main/java/org/apache/iceberg/TableScan.java
+++ b/api/src/main/java/org/apache/iceberg/TableScan.java
@@ -39,13 +39,25 @@ public interface TableScan extends Scan<TableScan, FileScanTask, CombinedScanTas
    */
   TableScan useSnapshot(long snapshotId);
 
+  /**
+   * Create a new {@link TableScan} from this scan's configuration that will use the snapshot
+   * referenced by the given branch or tag.
+   *
+   * @param ref the name of a branch or tag
+   * @return a new scan based on the given reference.
+   * @throws IllegalArgumentException if the ref cannot be found or a snapshot is already selected
+   */
+  TableScan useRef(String ref);
+
   /**
    * Create a new {@link TableScan} from this scan's configuration that will use the most recent
-   * snapshot as of the given time in milliseconds.
+   * snapshot as of the given time in milliseconds. Time travel cannot be combined with a snapshot
+   * or ref (branch or tag) that was already selected for this scan.
    *
    * @param timestampMillis a timestamp in milliseconds.
    * @return a new scan based on this with the current snapshot at the given time
-   * @throws IllegalArgumentException if the snapshot cannot be found
+   * @throws IllegalArgumentException if the snapshot cannot be found or a snapshot or ref was
+   *     already selected for this scan
    */
   TableScan asOfTime(long timestampMillis);
 
diff --git a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
index cd74ea348f..ace7480979 100644
--- a/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseAllMetadataTableScan.java
@@ -53,6 +53,11 @@ abstract class BaseAllMetadataTableScan extends BaseMetadataTableScan {
     throw new UnsupportedOperationException("Cannot select snapshot in table: " + tableType());
   }
 
+  @Override
+  public TableScan useRef(String name) {
+    throw new UnsupportedOperationException("Cannot select ref in table: " + tableType());
+  }
+
   @Override
   public TableScan asOfTime(long timestampMillis) {
     throw new UnsupportedOperationException("Cannot select snapshot in table: " + tableType());
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index 7a751d99f6..d00bcb1a60 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -87,7 +87,7 @@ abstract class BaseTableScan extends BaseScan<TableScan, FileScanTask, CombinedS
   @Override
   public TableScan useSnapshot(long scanSnapshotId) {
     Preconditions.checkArgument(
-        snapshotId() == null, "Cannot override snapshot, already set to id=%s", snapshotId());
+        snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
     Preconditions.checkArgument(
         tableOps().current().snapshot(scanSnapshotId) != null,
         "Cannot find snapshot with ID %s",
@@ -96,10 +96,20 @@ abstract class BaseTableScan extends BaseScan<TableScan, FileScanTask, CombinedS
         tableOps(), table(), tableSchema(), context().useSnapshotId(scanSnapshotId));
   }
 
+  @Override
+  public TableScan useRef(String name) {
+    Preconditions.checkArgument(
+        snapshotId() == null, "Cannot override ref, already set snapshot id=%s", snapshotId());
+    Snapshot refSnapshot = table().snapshot(name);
+    Preconditions.checkArgument(refSnapshot != null, "Cannot find ref %s", name);
+    long targetId = refSnapshot.snapshotId();
+    return newRefinedScan(tableOps(), table(), tableSchema(), context().useSnapshotId(targetId));
+  }
+
   @Override
   public TableScan asOfTime(long timestampMillis) {
     Preconditions.checkArgument(
-        snapshotId() == null, "Cannot override snapshot, already set to id=%s", snapshotId());
+        snapshotId() == null, "Cannot override snapshot, already set snapshot id=%s", snapshotId());
 
     return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
   }
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
index 270dfcf595..197679296e 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java
@@ -46,6 +46,14 @@ class IncrementalDataTableScan extends DataTableScan {
             timestampMillis, context().fromSnapshotId(), context().toSnapshotId()));
   }
 
+  @Override
+  public TableScan useRef(String name) {
+    throw new UnsupportedOperationException(
+        String.format(
+            "Cannot scan table using ref %s: configured for incremental data in snapshots (%s, %s]",
+            name, context().fromSnapshotId(), context().toSnapshotId()));
+  }
+
   @Override
   public TableScan useSnapshot(long scanSnapshotId) {
     throw new UnsupportedOperationException(
diff --git a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
index 8a0333f97a..4e9f8f04d1 100644
--- a/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
+++ b/core/src/test/java/org/apache/iceberg/TestDataTableScan.java
@@ -18,8 +18,12 @@
  */
 package org.apache.iceberg;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.UUID;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -86,4 +90,105 @@ public class TestDataTableScan extends ScanTestBase<TableScan, FileScanTask, Com
         .withRecordCount(10)
         .build();
   }
+
+  @Test
+  public void testScanFromBranchTip() throws IOException {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    // B and C go to a new branch, whose scan is also expected to include A
+    table.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).toBranch("testBranch").commit();
+    // D goes to main only
+    table.newFastAppend().appendFile(FILE_D).commit();
+
+    TableScan mainScan = table.newScan();
+    validateExpectedFileScanTasks(mainScan, ImmutableList.of(FILE_A.path(), FILE_D.path()));
+
+    TableScan branchScan = table.newScan().useRef("testBranch");
+    validateExpectedFileScanTasks(
+        branchScan, ImmutableList.of(FILE_A.path(), FILE_B.path(), FILE_C.path()));
+  }
+
+  @Test
+  public void testScanFromTag() throws IOException {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    long taggedSnapshotId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag("tagB", taggedSnapshotId).commit();
+    table.newFastAppend().appendFile(FILE_C).commit();
+    List<CharSequence> taggedFiles = ImmutableList.of(FILE_A.path(), FILE_B.path());
+    validateExpectedFileScanTasks(table.newScan().useRef("tagB"), taggedFiles);
+    validateExpectedFileScanTasks(
+        table.newScan(), ImmutableList.of(FILE_A.path(), FILE_B.path(), FILE_C.path()));
+  }
+
+  @Test
+  public void testScanFromRefWhenSnapshotSetFails() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    table.manageSnapshots().createTag("tagB", table.currentSnapshot().snapshotId()).commit();
+    // use the actual snapshot id rather than assuming ids are assigned starting at 1
+    AssertHelpers.assertThrows(
+        "Should throw when attempting to use a ref for scanning when a snapshot is set",
+        IllegalArgumentException.class,
+        "Cannot override ref, already set snapshot id=" + table.currentSnapshot().snapshotId(),
+        () -> table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).useRef("tagB"));
+  }
+
+  @Test
+  public void testSettingSnapshotWhenRefSetFails() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    Snapshot snapshotA = table.currentSnapshot();
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long snapshotBId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag("tagB", snapshotBId).commit();
+    AssertHelpers.assertThrows(
+        "Should throw when attempting to use a snapshot for scanning when a ref is set",
+        IllegalArgumentException.class,
+        "Cannot override snapshot, already set snapshot id=" + snapshotBId,
+        () -> table.newScan().useRef("tagB").useSnapshot(snapshotA.snapshotId()));
+  }
+
+  @Test
+  public void testBranchTimeTravelFails() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    long snapshotId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createBranch("testBranch", snapshotId).commit();
+
+    // asOfTime is rejected once useRef has already selected a snapshot for the scan
+    AssertHelpers.assertThrows(
+        "Should throw when attempting to time travel on a branch",
+        IllegalArgumentException.class,
+        "Cannot override snapshot, already set snapshot id=" + snapshotId,
+        () -> table.newScan().useRef("testBranch").asOfTime(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testSettingMultipleRefsFails() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    table.manageSnapshots().createTag("tagA", table.currentSnapshot().snapshotId()).commit();
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long snapshotBId = table.currentSnapshot().snapshotId();
+    table.manageSnapshots().createTag("tagB", snapshotBId).commit();
+    AssertHelpers.assertThrows(
+        "Should throw when attempting to use multiple refs",
+        IllegalArgumentException.class,
+        "Cannot override ref, already set snapshot id=" + snapshotBId,
+        () -> table.newScan().useRef("tagB").useRef("tagA"));
+  }
+
+  @Test
+  public void testSettingInvalidRefFails() {
+    AssertHelpers.assertThrows(
+        "Should throw when attempting to use an invalid ref for scanning",
+        IllegalArgumentException.class,
+        "Cannot find ref nonexisting", // the test creates no ref with this name
+        () -> table.newScan().useRef("nonexisting"));
+  }
+
+  private void validateExpectedFileScanTasks(
+      TableScan scan, List<CharSequence> expectedFileScanPaths) throws IOException {
+    try (CloseableIterable<FileScanTask> scanTasks = scan.planFiles()) {
+      List<String> actualPaths =
+          Lists.newArrayList(Iterables.transform(scanTasks, t -> t.file().path().toString()));
+      Assert.assertEquals(expectedFileScanPaths.size(), actualPaths.size());
+      expectedFileScanPaths.forEach(p -> Assert.assertTrue(actualPaths.contains(p.toString())));
+    }
+  }
 }