Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/10/19 07:13:33 UTC

[GitHub] [iceberg] nastra commented on a diff in pull request #5984: Core, API: Support incremental scanning with branch

nastra commented on code in PR #5984:
URL: https://github.com/apache/iceberg/pull/5984#discussion_r999021646


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -35,6 +35,26 @@ protected BaseIncrementalScan(
   protected abstract CloseableIterable<T> doPlanFiles(
       Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
 
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId, String branch) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+
+    Snapshot branchSnapshot = table().snapshot(branch);
+    Preconditions.checkArgument(branchSnapshot != null, "Cannot find the branch: %s", branch);

Review Comment:
   In `BaseTableScan` we say `Cannot find ref %s`, so it would be good to align this message with it. It would also be good to use `ref` or `referenceName` rather than `branch` across parameter and variable names, since that is more precise in my opinion.
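
To make the suggested alignment concrete, here is a minimal, self-contained sketch of the three checks with the reference-based naming and message wording. It is not the PR's code and does not use Iceberg classes: `RefValidationSketch`, its two maps, and `validateFromSnapshotInclusive` are hypothetical stand-ins for `table().snapshot(...)` and `SnapshotUtil.isAncestorOf(...)`, and the `Cannot find ref` wording is taken from the review comment above rather than verified against `BaseTableScan`.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a snapshot tree with named refs (hypothetical, not Iceberg API).
final class RefValidationSketch {
  // snapshot ID -> parent snapshot ID (null for a root snapshot)
  private final Map<Long, Long> parents = new HashMap<>();
  // reference name -> snapshot ID at the head of that reference
  private final Map<String, Long> refs = new HashMap<>();

  void addSnapshot(long snapshotId, Long parentId) {
    parents.put(snapshotId, parentId);
  }

  void setRef(String referenceName, long snapshotId) {
    refs.put(referenceName, snapshotId);
  }

  // Walks the parent chain from snapshotId and reports whether ancestorId is on it.
  boolean isAncestorOf(long snapshotId, long ancestorId) {
    Long current = snapshotId;
    while (current != null) {
      if (current == ancestorId) {
        return true;
      }
      current = parents.get(current);
    }
    return false;
  }

  // Mirrors the order of the checks in the diff; returns the ref's head snapshot ID.
  long validateFromSnapshotInclusive(long fromSnapshotId, String referenceName) {
    if (!parents.containsKey(fromSnapshotId)) {
      throw new IllegalArgumentException(
          "Cannot find the starting snapshot: " + fromSnapshotId);
    }
    Long refSnapshotId = refs.get(referenceName);
    if (refSnapshotId == null) {
      throw new IllegalArgumentException("Cannot find ref " + referenceName);
    }
    if (!isAncestorOf(refSnapshotId, fromSnapshotId)) {
      throw new IllegalArgumentException(
          String.format(
              "Starting snapshot (inclusive) %s is not a parent ancestor of ref: %s",
              fromSnapshotId, referenceName));
    }
    return refSnapshotId;
  }

  public static void main(String[] args) {
    RefValidationSketch table = new RefValidationSketch();
    table.addSnapshot(1L, null); // snapshot A
    table.addSnapshot(2L, 1L);   // snapshot B on the main line
    table.addSnapshot(3L, 1L);   // snapshot C on ref "b1"
    table.setRef("b1", 3L);
    System.out.println(table.validateFromSnapshotInclusive(1L, "b1"));
  }
}
```

Using one parameter name (`referenceName`) and one message prefix for the missing-ref case keeps the incremental scan consistent with the regular table scan, whether the name resolves to a branch or a tag.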



##########
api/src/main/java/org/apache/iceberg/IncrementalScan.java:
##########
@@ -33,6 +49,21 @@
    */
   ThisT fromSnapshotInclusive(long fromSnapshotId);
 
+  /**
+   * Instructs this scan to look for changes starting from a particular snapshot (exclusive).
+   *
+   * <p>If the start snapshot is not configured, it is defaulted to the oldest ancestor of the end
+   * snapshot (inclusive).
+   *
+   * @param fromSnapshotId the start snapshot ID (exclusive)
+   * @param branch the ref used
+   * @return this for method chaining
+   * @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
+   */
+  default ThisT fromSnapshotExclusive(long fromSnapshotId, String branch) {
+    throw new UnsupportedOperationException("Unsupported starting from the specified reference.");

Review Comment:
   ```suggestion
       throw new UnsupportedOperationException("Starting from the specified reference is not supported.");
   ```



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -35,6 +35,26 @@ protected BaseIncrementalScan(
   protected abstract CloseableIterable<T> doPlanFiles(
       Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
 
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId, String branch) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+
+    Snapshot branchSnapshot = table().snapshot(branch);
+    Preconditions.checkArgument(branchSnapshot != null, "Cannot find the branch: %s", branch);
+    Preconditions.checkArgument(
+        SnapshotUtil.isAncestorOf(table(), branchSnapshot.snapshotId(), fromSnapshotId),
+        "Starting snapshot (inclusive) %s is not a parent ancestor of branch: %s",

Review Comment:
   ```suggestion
           "Starting snapshot (inclusive) %s is not a parent ancestor of ref: %s",
   ```



##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java:
##########
@@ -50,6 +50,42 @@ public void testFromSnapshotInclusive() {
     Assert.assertEquals(3, Iterables.size(scanWithToSnapshot.planFiles()));
   }
 
+  @Test
+  public void testFromSnapshotInclusiveWithBranch() {

Review Comment:
   It would also be good to add tests that exercise the `Cannot find the starting snapshot` and `Cannot find ref...` error paths.



##########
api/src/main/java/org/apache/iceberg/IncrementalScan.java:
##########
@@ -21,6 +21,22 @@
 /** API for configuring an incremental scan. */
 public interface IncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
     extends Scan<ThisT, T, G> {
+
+  /**
+   * Instructs this scan to look for changes starting from a particular snapshot (inclusive).
+   *
+   * <p>If the start snapshot is not configured, it is defaulted to the oldest ancestor of the end
+   * snapshot (inclusive).
+   *
+   * @param fromSnapshotId the start snapshot ID (inclusive)
+   * @param branch the ref used
+   * @return this for method chaining
+   * @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
+   */
+  default ThisT fromSnapshotInclusive(long fromSnapshotId, String branch) {
+    throw new UnsupportedOperationException("Unsupported starting from the specified reference.");

Review Comment:
   ```suggestion
       throw new UnsupportedOperationException("Starting from the specified reference is not supported.");
   ```



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -35,6 +35,26 @@ protected BaseIncrementalScan(
   protected abstract CloseableIterable<T> doPlanFiles(
       Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
 
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId, String branch) {

Review Comment:
   ```suggestion
     public ThisT fromSnapshotInclusive(long fromSnapshotId, String referenceName) {
   ```



##########
api/src/main/java/org/apache/iceberg/IncrementalScan.java:
##########
@@ -33,6 +49,21 @@
    */
   ThisT fromSnapshotInclusive(long fromSnapshotId);
 
+  /**
+   * Instructs this scan to look for changes starting from a particular snapshot (exclusive).
+   *
+   * <p>If the start snapshot is not configured, it is defaulted to the oldest ancestor of the end
+   * snapshot (inclusive).
+   *
+   * @param fromSnapshotId the start snapshot ID (exclusive)
+   * @param branch the ref used
+   * @return this for method chaining
+   * @throws IllegalArgumentException if the start snapshot is not an ancestor of the end snapshot
+   */
+  default ThisT fromSnapshotExclusive(long fromSnapshotId, String branch) {

Review Comment:
   ```suggestion
     default ThisT fromSnapshotExclusive(long fromSnapshotId, String referenceName) {
   ```



##########
core/src/main/java/org/apache/iceberg/TableScanContext.java:
##########
@@ -60,6 +61,7 @@ final class TableScanContext {
     this.planExecutor = null;
     this.fromSnapshotInclusive = false;
     this.metricsReporter = new LoggingMetricsReporter();
+    this.branch = null;

Review Comment:
   I think this should be `ref` or `referenceName` instead. It would also be great to get #5982 / #5985 merged before this PR.



##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalAppendScan.java:
##########
@@ -67,6 +103,35 @@ public void testFromSnapshotExclusive() {
     Assert.assertEquals(1, Iterables.size(scanWithToSnapshot.planFiles()));
   }
 
+  @Test
+  public void testFromSnapshotExclusiveWithBranch() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long snapshotAId = table.currentSnapshot().snapshotId();
+
+    String branchName = "b1";
+    table.manageSnapshots().createBranch(branchName, snapshotAId).commit();
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+    long snapshotBId = table.currentSnapshot().snapshotId();
+    String branchName2 = "b2";
+    table.manageSnapshots().createBranch(branchName2, snapshotBId).commit();
+
+    table.newFastAppend().appendFile(FILE_C).toBranch(branchName).commit();
+    long snapshotCId = table.snapshot(branchName).snapshotId();
+
+    IncrementalAppendScan scan = newScan().fromSnapshotExclusive(snapshotAId, branchName);
+    Assert.assertEquals(1, Iterables.size(scan.planFiles()));
+
+    IncrementalAppendScan scanWithToSnapshot =
+        newScan().fromSnapshotExclusive(snapshotAId, branchName).toSnapshot(snapshotBId);
+    Assert.assertEquals(1, Iterables.size(scanWithToSnapshot.planFiles()));
+
+    AssertHelpers.assertThrows(
+        "Should throw exception",
+        NullPointerException.class,

Review Comment:
   I think it would be good to always check against an expected error message, not only the exception type.
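
As an illustration of what checking against an expected message can look like, here is a small self-contained sketch in plain Java. `assertThrowsWithMessage` is a hypothetical stand-in for a project test helper and `findRef` is a made-up lookup that mimics a `Cannot find ref` check; neither is Iceberg API.

```java
import java.util.Map;

final class ExpectedMessageCheck {

  // Hypothetical helper: fails unless `action` throws an instance of `expected`
  // whose message contains `fragment`.
  static void assertThrowsWithMessage(
      Class<? extends Exception> expected, String fragment, Runnable action) {
    try {
      action.run();
    } catch (Exception e) {
      if (!expected.isInstance(e)) {
        throw new AssertionError(
            "Expected " + expected.getName() + " but got " + e.getClass().getName(), e);
      }
      String message = e.getMessage();
      if (message == null || !message.contains(fragment)) {
        throw new AssertionError(
            "Expected message containing \"" + fragment + "\" but got: " + message, e);
      }
      return; // right type and right message
    }
    throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
  }

  // Made-up lookup that fails with a descriptive message for an unknown reference.
  static long findRef(Map<String, Long> refs, String referenceName) {
    Long snapshotId = refs.get(referenceName);
    if (snapshotId == null) {
      throw new IllegalArgumentException("Cannot find ref " + referenceName);
    }
    return snapshotId;
  }

  public static void main(String[] args) {
    Map<String, Long> refs = Map.of("b1", 1L);
    assertThrowsWithMessage(
        IllegalArgumentException.class,
        "Cannot find ref missing",
        () -> findRef(refs, "missing"));
    System.out.println("ok");
  }
}
```

Asserting on the message as well as the type keeps a test from passing by accident, for example when a `NullPointerException` is raised by an unrelated dereference rather than by the validation under test.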



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org