You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/29 02:18:10 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #5382: Core: Implement IncrementalChangelogScan without deletes

aokolnychyi opened a new pull request, #5382:
URL: https://github.com/apache/iceberg/pull/5382

   This PR implements `IncrementalChangelogScan` without support for delete files.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933705197


##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan
+    extends ScanTestBase<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>> {
+
+  public TestBaseIncrementalChangelogScan(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  @Test
+  public void testDataFilters() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+    ManifestFile firstSnapshotDataManifest =
+        Iterables.getOnlyElement(firstSnapshot.dataManifests(table.io()));
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    Assert.assertEquals(
+        "Must be 2 data manifests", 2, secondSnapshot.dataManifests(table.io()).size());
+
+    withUnavailableLocations(
+        ImmutableList.of(firstSnapshotDataManifest.path()),
+        () -> {
+          // bucket(k, 16) is 1 which is supposed to match only FILE_B
+          IncrementalChangelogScan scan = newScan().filter(Expressions.equal("data", "k"));
+
+          List<ChangelogScanTask> tasks = plan(scan);
+
+          Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+          AddedRowsScanTask task = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+          Assert.assertEquals("Ordinal must match", 1, task.changeOrdinal());
+          Assert.assertEquals(
+              "Snapshot must match", secondSnapshot.snapshotId(), task.commitSnapshotId());
+          Assert.assertEquals("Data file must match", FILE_B.path(), task.file().path());
+          Assert.assertTrue("Must be no deletes", task.deletes().isEmpty());
+        });
+  }
+
+  @Test
+  public void testOverwrites() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newOverwrite().addFile(FILE_A2).deleteFile(FILE_A).commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan()
+            .fromSnapshotExclusive(firstSnapshot.snapshotId())
+            .toSnapshot(secondSnapshot.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 2 tasks", 2, tasks.size());
+
+    AddedRowsScanTask addedRowsTask = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, addedRowsTask.changeOrdinal());
+    Assert.assertEquals(
+        "Snapshot must match", secondSnapshot.snapshotId(), addedRowsTask.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A2.path(), addedRowsTask.file().path());
+    Assert.assertTrue("Must be no deletes", addedRowsTask.deletes().isEmpty());
+
+    DeletedDataFileScanTask deletedDataFileTask = (DeletedDataFileScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 0, deletedDataFileTask.changeOrdinal());
+    Assert.assertEquals(
+        "Snapshot must match", secondSnapshot.snapshotId(), deletedDataFileTask.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), deletedDataFileTask.file().path());
+    Assert.assertTrue("Must be no deletes", deletedDataFileTask.existingDeletes().isEmpty());
+  }
+
+  @Test
+  public void testFileDeletes() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newDelete().deleteFile(FILE_A).commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan()
+            .fromSnapshotExclusive(firstSnapshot.snapshotId())
+            .toSnapshot(secondSnapshot.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 1 tasks", 1, tasks.size());
+
+    DeletedDataFileScanTask deletedDataFileTask =
+        (DeletedDataFileScanTask) Iterables.getOnlyElement(tasks);
+    Assert.assertEquals("Ordinal must match", 0, deletedDataFileTask.changeOrdinal());
+    Assert.assertEquals(
+        "Snapshot must match", secondSnapshot.snapshotId(), deletedDataFileTask.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), deletedDataFileTask.file().path());
+    Assert.assertTrue("Must be no deletes", deletedDataFileTask.existingDeletes().isEmpty());
+  }
+
+  @Test
+  public void testExistingEntriesInNewDataManifestsAreIgnored() {
+    table
+        .updateProperties()
+        .set(MANIFEST_MIN_MERGE_COUNT, "1")
+        .set(MANIFEST_MERGE_ENABLED, "true")
+        .commit();
+
+    table.newAppend().appendFile(FILE_A).commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    Snapshot thirdSnapshot = table.currentSnapshot();
+
+    ManifestFile manifest = Iterables.getOnlyElement(thirdSnapshot.dataManifests(table.io()));
+    Assert.assertTrue("Manifest must have existing files", manifest.hasExistingFiles());
+
+    IncrementalChangelogScan scan =
+        newScan()
+            .fromSnapshotInclusive(thirdSnapshot.snapshotId())
+            .toSnapshot(thirdSnapshot.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+    AddedRowsScanTask task = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+    Assert.assertEquals("Ordinal must match", 0, task.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", thirdSnapshot.snapshotId(), task.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_C.path(), task.file().path());
+    Assert.assertTrue("Must be no deletes", task.deletes().isEmpty());
+  }
+
+  @Test
+  public void testManifestRewritesAreIgnored() throws IOException {
+    table.newAppend().appendFile(FILE_A).commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    ManifestFile newManifest =
+        writeManifest(
+            "manifest-file.avro",
+            manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A),
+            manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshot.snapshotId(), FILE_B));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+
+    for (ManifestFile manifest : secondSnapshot.dataManifests(table.io())) {
+      rewriteManifests.deleteManifest(manifest);
+    }
+
+    rewriteManifests.addManifest(newManifest);
+
+    rewriteManifests.commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    Snapshot fourthSnapshot = table.currentSnapshot();
+
+    List<ChangelogScanTask> tasks = plan(newScan());
+
+    Assert.assertEquals("Must have 3 tasks", 3, tasks.size());
+
+    AddedRowsScanTask firstTask = (AddedRowsScanTask) tasks.get(0);

Review Comment:
   I should probably reconsider the names here. Maybe `task1` would be better? Then the lines below would fit on one line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] iflytek-hmwang5 commented on pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
iflytek-hmwang5 commented on PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#issuecomment-1216041786

   Hi, if this PR doesn't support deleted rows, does that mean overwrite cases can't be supported?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937174756


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   Yeah, or maybe just clarify on the variable or method that this is in ascending order (by timestamp), and that's what CreateDataFileChangeTasks expects.
   
   Or how about just creating the ordinals in this method, and having CreateDataFileChangeTasks take in the ordinals?
   
   I guess the whole goal for me, if possible, would be to make it easier to see that the ordinals are in ascending order without having to jump around different methods / inner classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937121997


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :
+        SnapshotUtil.ancestorsBetween(table(), toSnapshotIdInclusive, fromSnapshotIdExclusive)) {
+
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+        if (snapshot.deleteManifests(table().io()).size() > 0) {
+          throw new UnsupportedOperationException(
+              "Delete files are currently not supported in changelog scans");
+        }
+
+        // insert at the beginning to have old snapshots first
+        changelogSnapshots.addFirst(snapshot);
+      }
+    }
+
+    return changelogSnapshots;
+  }
+
+  private static Map<Long, Integer> computeSnapshotOrdinals(Iterable<Snapshot> snapshots) {

Review Comment:
   I am going to add another function for supporting V2 deletes. That's why I kept it outside.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r938320211


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        orderedChangelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  // builds a collection of changelog snapshots (oldest to newest)
+  // the order of the snapshots is important as it is used to determine change ordinals
+  private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl) {
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+        if (snapshot.deleteManifests(table().io()).size() > 0) {
+          throw new UnsupportedOperationException(
+              "Delete files are currently not supported in changelog scans");
+        }
+
+        changelogSnapshots.addFirst(snapshot);
+      }
+    }
+
+    return changelogSnapshots;
+  }
+
+  private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
+    return snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+  }
+
+  private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapshots) {

Review Comment:
   Correct, using 0-based ordinals instead of seq numbers. That's why we call it `changeOrdinal` in the API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r936119427


##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan
+    extends ScanTestBase<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>> {
+
+  public TestBaseIncrementalChangelogScan(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  @Test
+  public void testDataFilters() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+    ManifestFile snap1DataManifest = Iterables.getOnlyElement(snap1.dataManifests(table.io()));
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    Assert.assertEquals("Must be 2 data manifests", 2, snap2.dataManifests(table.io()).size());
+
+    withUnavailableLocations(

Review Comment:
   Question: while this is interesting, is it overkill for testing a data filter? What are we trying to test — that the CDC scan works fine even if the table is corrupt?



##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan
+    extends ScanTestBase<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>> {
+
+  public TestBaseIncrementalChangelogScan(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  @Test
+  public void testDataFilters() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+    ManifestFile snap1DataManifest = Iterables.getOnlyElement(snap1.dataManifests(table.io()));
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    Assert.assertEquals("Must be 2 data manifests", 2, snap2.dataManifests(table.io()).size());
+
+    withUnavailableLocations(
+        ImmutableList.of(snap1DataManifest.path()),
+        () -> {
+          // bucket(k, 16) is 1 which is supposed to match only FILE_B
+          IncrementalChangelogScan scan = newScan().filter(Expressions.equal("data", "k"));
+
+          List<ChangelogScanTask> tasks = plan(scan);
+
+          Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+          AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+          Assert.assertEquals("Ordinal must match", 1, t1.changeOrdinal());
+          Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t1.commitSnapshotId());
+          Assert.assertEquals("Data file must match", FILE_B.path(), t1.file().path());
+          Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+        });
+  }
+
+  @Test
+  public void testOverwrites() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newOverwrite().addFile(FILE_A2).deleteFile(FILE_A).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 2 tasks", 2, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A2.path(), t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+    DeletedDataFileScanTask t2 = (DeletedDataFileScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 0, t2.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t2.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), t2.file().path());
+    Assert.assertTrue("Must be no deletes", t2.existingDeletes().isEmpty());
+  }
+
+  @Test
+  public void testFileDeletes() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newDelete().deleteFile(FILE_A).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 1 tasks", 1, tasks.size());
+
+    DeletedDataFileScanTask t1 = (DeletedDataFileScanTask) Iterables.getOnlyElement(tasks);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.existingDeletes().isEmpty());
+  }
+
+  @Test
+  public void testExistingEntriesInNewDataManifestsAreIgnored() {
+    table
+        .updateProperties()
+        .set(MANIFEST_MIN_MERGE_COUNT, "1")
+        .set(MANIFEST_MERGE_ENABLED, "true")
+        .commit();
+
+    table.newAppend().appendFile(FILE_A).commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    ManifestFile manifest = Iterables.getOnlyElement(snap3.dataManifests(table.io()));
+    Assert.assertTrue("Manifest must have existing files", manifest.hasExistingFiles());
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotInclusive(snap3.snapshotId()).toSnapshot(snap3.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap3.snapshotId(), t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_C.path(), t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+  }
+
+  @Test
+  public void testManifestRewritesAreIgnored() throws IOException {
+    table.newAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    ManifestFile newManifest =
+        writeManifest(
+            "manifest-file.avro",
+            manifestEntry(ManifestEntry.Status.EXISTING, snap1.snapshotId(), FILE_A),
+            manifestEntry(ManifestEntry.Status.EXISTING, snap2.snapshotId(), FILE_B));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+
+    for (ManifestFile manifest : snap2.dataManifests(table.io())) {
+      rewriteManifests.deleteManifest(manifest);
+    }
+
+    rewriteManifests.addManifest(newManifest);
+
+    rewriteManifests.commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    List<ChangelogScanTask> tasks = plan(newScan());
+
+    Assert.assertEquals("Must have 3 tasks", 3, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap1.snapshotId(), t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+    AddedRowsScanTask t2 = (AddedRowsScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 1, t2.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t2.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_B.path(), t2.file().path());
+    Assert.assertTrue("Must be no deletes", t2.deletes().isEmpty());
+
+    AddedRowsScanTask t3 = (AddedRowsScanTask) tasks.get(2);
+    Assert.assertEquals("Ordinal must match", 2, t3.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap4.snapshotId(), t3.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_C.path(), t3.file().path());
+    Assert.assertTrue("Must be no deletes", t3.deletes().isEmpty());
+  }
+
+  @Test
+  public void testDataFileRewrites() {
+    table.newAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A2)).commit();
+
+    List<ChangelogScanTask> tasks = plan(newScan());
+
+    Assert.assertEquals("Must have 2 tasks", 2, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap1.snapshotId(), t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+    AddedRowsScanTask t2 = (AddedRowsScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 1, t2.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t2.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_B.path(), t2.file().path());
+    Assert.assertTrue("Must be no deletes", t2.deletes().isEmpty());
+  }
+
+  @Test
+  public void testDeleteFilesAreNotSupported() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    table.newFastAppend().appendFile(FILE_A2).appendFile(FILE_B).commit();
+
+    table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
+
+    AssertHelpers.assertThrows(
+        "Should complain about delete files",
+        UnsupportedOperationException.class,
+        "Delete files are currently not supported",
+        () -> plan(newScan()));
+  }
+
+  // plans tasks and reorders them to have deterministic order
+  private List<ChangelogScanTask> plan(IncrementalChangelogScan scan) {
+    try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
+      List<ChangelogScanTask> tasksAsList = Lists.newArrayList(tasks);
+
+      tasksAsList.sort(
+          (t1, t2) -> {
+            int res = t1.changeOrdinal() - t2.changeOrdinal();

Review Comment:
   I think Guava's `ComparisonChain` may help here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r936120343


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :
+        SnapshotUtil.ancestorsBetween(table(), toSnapshotIdInclusive, fromSnapshotIdExclusive)) {
+
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {

Review Comment:
   I guess `REPLACE` is always either a manifest rewrite or a data file rewrite, so no CDC log will be generated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r938417349


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -138,6 +138,10 @@ public static Snapshot oldestAncestorOf(long snapshotId, Function<Long, Snapshot
     return lastSnapshot;
   }
 
+  public static Snapshot oldestAncestorOf(Table table, long snapshotId) {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Reo-LEI commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933924218


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changeSnapshots =

Review Comment:
   That is OK, thanks Anton.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937082307


##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -137,6 +144,13 @@
    */
   CloseableIterable<G> planTasks();
 
+  /**
+   * Returns whether this scan is case-sensitive with respect to column names.
+   *
+   * @return true if case-sensitive, false otherwise.
+   */
+  boolean isCaseSensitive();

Review Comment:
   Sounds like a good idea. Let me move it here and in `BaseScan`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933704533


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>

Review Comment:
   I think it is because of the new line length limit. Such changes are really unfortunate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933705002


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -20,17 +20,48 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
 
 abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
     implements Scan<ThisT, T, G> {
+
+  private static final ImmutableList<String> SCAN_COLUMNS =

Review Comment:
   I moved these vars here as they are used in a lot of places, not just in `DataTableScan`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r934664585


##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -137,6 +144,14 @@
    */
   CloseableIterable<G> planTasks();
 
+  /**
+   * Returns whether this scan should apply column name case sensitiveness as per {@link
+   * Scan#caseSensitive(boolean)}.

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#issuecomment-1206020974

   > Are we going to support the filter on the CDC type? For example, users are only interested in deleted rows.
   
   Yeah, we can definitely add this in the future once a use case comes along.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937188243


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :
+        SnapshotUtil.ancestorsBetween(table(), toSnapshotIdInclusive, fromSnapshotIdExclusive)) {
+
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+        if (snapshot.deleteManifests(table().io()).size() > 0) {
+          throw new UnsupportedOperationException(
+              "Delete files are currently not supported in changelog scans");
+        }
+
+        // insert at the beginning to have old snapshots first
+        changelogSnapshots.addFirst(snapshot);
+      }
+    }
+
+    return changelogSnapshots;
+  }
+
+  private static Map<Long, Integer> computeSnapshotOrdinals(Iterable<Snapshot> snapshots) {
+    Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();
+
+    int ordinal = 0;
+
+    for (Snapshot snapshot : snapshots) {
+      snapshotOrdinals.put(snapshot.snapshotId(), ordinal++);
+    }
+
+    return snapshotOrdinals;
+  }
+
+  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
+    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+
+    private final Map<Long, Integer> snapshotOrdinals;
+
+    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
+      this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+    }
+
+    @Override
+    public CloseableIterable<ChangelogScanTask> apply(
+        CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) {
+
+      return CloseableIterable.transform(
+          entries,
+          entry -> {
+            Preconditions.checkArgument(
+                snapshotOrdinals.containsKey(entry.snapshotId()),
+                "Cannot determine ordinal for snapshot %s",
+                entry.snapshotId());
+
+            DataFile dataFile = entry.file().copy(context.shouldKeepStats());
+            int changeOrdinal = snapshotOrdinals.get(entry.snapshotId());
+            long commitSnapshotId = entry.snapshotId();

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937189285


##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan
+    extends ScanTestBase<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>> {
+
+  public TestBaseIncrementalChangelogScan(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  @Test
+  public void testDataFilters() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+    ManifestFile snap1DataManifest = Iterables.getOnlyElement(snap1.dataManifests(table.io()));
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    Assert.assertEquals("Must be 2 data manifests", 2, snap2.dataManifests(table.io()).size());
+
+    withUnavailableLocations(

Review Comment:
   I copied this logic from a few other places. The goal is to verify that data skipping works and non-matching files are not read. If we tried to read those files, we would get an exception now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937174756


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   Yeah, or maybe just clarify in the variable or method name that this is in ascending order (by timestamp), and that's what CreateDataFileChangeTasks expects.
   
   Or how about just creating the ordinals in this method, and having CreateDataFileChangeTasks take in the ordinals?
   
   I guess the whole goal for me would be to make it easier to see that the ordinals are in ascending order without having to jump around different methods/inner classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937203118


##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan
+    extends ScanTestBase<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>> {
+
+  public TestBaseIncrementalChangelogScan(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  @Test
+  public void testDataFilters() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+    ManifestFile snap1DataManifest = Iterables.getOnlyElement(snap1.dataManifests(table.io()));
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    Assert.assertEquals("Must be 2 data manifests", 2, snap2.dataManifests(table.io()).size());
+
+    withUnavailableLocations(
+        ImmutableList.of(snap1DataManifest.path()),
+        () -> {
+          // bucket(k, 16) is 1 which is supposed to match only FILE_B
+          IncrementalChangelogScan scan = newScan().filter(Expressions.equal("data", "k"));
+
+          List<ChangelogScanTask> tasks = plan(scan);
+
+          Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+          AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+          Assert.assertEquals("Ordinal must match", 1, t1.changeOrdinal());
+          Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t1.commitSnapshotId());
+          Assert.assertEquals("Data file must match", FILE_B.path(), t1.file().path());
+          Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+        });
+  }
+
+  @Test
+  public void testOverwrites() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newOverwrite().addFile(FILE_A2).deleteFile(FILE_A).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 2 tasks", 2, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A2.path(), t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+    DeletedDataFileScanTask t2 = (DeletedDataFileScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 0, t2.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t2.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), t2.file().path());
+    Assert.assertTrue("Must be no deletes", t2.existingDeletes().isEmpty());
+  }
+
+  @Test
+  public void testFileDeletes() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newDelete().deleteFile(FILE_A).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap2.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 1 tasks", 1, tasks.size());
+
+    DeletedDataFileScanTask t1 = (DeletedDataFileScanTask) Iterables.getOnlyElement(tasks);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.existingDeletes().isEmpty());
+  }
+
+  @Test
+  public void testExistingEntriesInNewDataManifestsAreIgnored() {
+    table
+        .updateProperties()
+        .set(MANIFEST_MIN_MERGE_COUNT, "1")
+        .set(MANIFEST_MERGE_ENABLED, "true")
+        .commit();
+
+    table.newAppend().appendFile(FILE_A).commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    Snapshot snap3 = table.currentSnapshot();
+
+    ManifestFile manifest = Iterables.getOnlyElement(snap3.dataManifests(table.io()));
+    Assert.assertTrue("Manifest must have existing files", manifest.hasExistingFiles());
+
+    IncrementalChangelogScan scan =
+        newScan().fromSnapshotInclusive(snap3.snapshotId()).toSnapshot(snap3.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap3.snapshotId(), t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_C.path(), t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+  }
+
+  @Test
+  public void testManifestRewritesAreIgnored() throws IOException {
+    table.newAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    ManifestFile newManifest =
+        writeManifest(
+            "manifest-file.avro",
+            manifestEntry(ManifestEntry.Status.EXISTING, snap1.snapshotId(), FILE_A),
+            manifestEntry(ManifestEntry.Status.EXISTING, snap2.snapshotId(), FILE_B));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+
+    for (ManifestFile manifest : snap2.dataManifests(table.io())) {
+      rewriteManifests.deleteManifest(manifest);
+    }
+
+    rewriteManifests.addManifest(newManifest);
+
+    rewriteManifests.commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    Snapshot snap4 = table.currentSnapshot();
+
+    List<ChangelogScanTask> tasks = plan(newScan());
+
+    Assert.assertEquals("Must have 3 tasks", 3, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap1.snapshotId(), t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+    AddedRowsScanTask t2 = (AddedRowsScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 1, t2.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t2.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_B.path(), t2.file().path());
+    Assert.assertTrue("Must be no deletes", t2.deletes().isEmpty());
+
+    AddedRowsScanTask t3 = (AddedRowsScanTask) tasks.get(2);
+    Assert.assertEquals("Ordinal must match", 2, t3.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap4.snapshotId(), t3.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_C.path(), t3.file().path());
+    Assert.assertTrue("Must be no deletes", t3.deletes().isEmpty());
+  }
+
+  @Test
+  public void testDataFileRewrites() {
+    table.newAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    table.newRewrite().rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A2)).commit();
+
+    List<ChangelogScanTask> tasks = plan(newScan());
+
+    Assert.assertEquals("Must have 2 tasks", 2, tasks.size());
+
+    AddedRowsScanTask t1 = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, t1.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap1.snapshotId(), t1.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), t1.file().path());
+    Assert.assertTrue("Must be no deletes", t1.deletes().isEmpty());
+
+    AddedRowsScanTask t2 = (AddedRowsScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 1, t2.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", snap2.snapshotId(), t2.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_B.path(), t2.file().path());
+    Assert.assertTrue("Must be no deletes", t2.deletes().isEmpty());
+  }
+
+  @Test
+  public void testDeleteFilesAreNotSupported() {
+    Assume.assumeTrue(formatVersion == 2);
+
+    table.newFastAppend().appendFile(FILE_A2).appendFile(FILE_B).commit();
+
+    table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
+
+    AssertHelpers.assertThrows(
+        "Should complain about delete files",
+        UnsupportedOperationException.class,
+        "Delete files are currently not supported",
+        () -> plan(newScan()));
+  }
+
+  // plans tasks and reorders them to have deterministic order
+  private List<ChangelogScanTask> plan(IncrementalChangelogScan scan) {
+    try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {
+      List<ChangelogScanTask> tasksAsList = Lists.newArrayList(tasks);
+
+      tasksAsList.sort(
+          (t1, t2) -> {
+            int res = t1.changeOrdinal() - t2.changeOrdinal();

Review Comment:
   Great idea. Implemented.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933704822


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());

Review Comment:
   Let me know if a helper method would be more readable. I wanted to format it differently.
   
   ```
   Set<Long> changelogSnapshotIds = changelogSnapshots.stream()
       .map(Snapshot::snapshotId)
       .collect(Collectors.toSet());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r934642410


##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -137,6 +144,14 @@
    */
   CloseableIterable<G> planTasks();
 
+  /**
+   * Returns whether this scan should apply column name case sensitiveness as per {@link
+   * Scan#caseSensitive(boolean)}.

Review Comment:
   I copied the doc from `TableScan` but I'd be up for updating it. Let me do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r936115965


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   Opt/Nit: It seems a bit strange to have a deque so early, when I see it's really needed inside CreateDataFileChangeTasks. You have to kind of track the code in two places to see how it works.
   
   What do you think about just using Iterable<Snapshot> here and then only doing the Deque / computeSnapshotOrdinals logic inside the CreateDataFileChangeTasks constructor?
   
   The downside would be having to convert the Iterable<Snapshot> to a Java stream for some of these lines of code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Reo-LEI commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933095884


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>

Review Comment:
   nit: no newlines required



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r936119427


##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan
+    extends ScanTestBase<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>> {
+
+  public TestBaseIncrementalChangelogScan(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  @Test
+  public void testDataFilters() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+    ManifestFile snap1DataManifest = Iterables.getOnlyElement(snap1.dataManifests(table.io()));
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    Assert.assertEquals("Must be 2 data manifests", 2, snap2.dataManifests(table.io()).size());
+
+    withUnavailableLocations(

Review Comment:
   Question: while this is interesting, is it overkill for testing a data filter? What are we trying to test: that the CDC scan works fine even if the table is corrupt? Not sure if we really need this test, but if we do, we could probably clarify the name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#issuecomment-1206011582

   Thanks for reviewing, @Reo-LEI @kbendick @stevenzwu @szehon-ho @flyrain!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] flyrain commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r938269521


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        orderedChangelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  // builds a collection of changelog snapshots (oldest to newest)
+  // the order of the snapshots is important as it is used to determine change ordinals
+  private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl) {
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+        if (snapshot.deleteManifests(table().io()).size() > 0) {
+          throw new UnsupportedOperationException(
+              "Delete files are currently not supported in changelog scans");
+        }
+
+        changelogSnapshots.addFirst(snapshot);
+      }
+    }
+
+    return changelogSnapshots;
+  }
+
+  private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
+    return snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+  }
+
+  private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapshots) {

Review Comment:
   We used to think about using sequence numbers (https://iceberg.apache.org/spec/#sequence-numbers) as the ordinals, but they aren't available for tables with the old spec. We still need zero-based ordinals, which I think is a good thing, and we should probably use the zero-based ordinals for all tables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] Reo-LEI commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
Reo-LEI commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933097389


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changeSnapshots =

Review Comment:
   I think `incrementalSnapshots` is more readable.



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changeSnapshots =
+        changeSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changeSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changeSnapshotIds =
+        changeSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changeSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changeSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())

Review Comment:
   Are `deleteManifests` not scanned because `BaseIncrementalChangelogScan` only applies to COW? Will we consider supporting MOR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933704808


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   This would also previously fit on one line.



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());

Review Comment:
   Let me know if a helper method would be readable. I wanted to format it differently.
   
   ```
   Set<Long> changelogSnapshotIds = changelogSnapshots.stream()
       .map(Snapshot::snapshotId)
       .collect(Collectors.toSet());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r934957155


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.events.IncrementalScanEvent;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+
+abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, G> implements IncrementalScan<ThisT, T, G> {
+
+  protected BaseIncrementalScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+    TableScanContext newContext = context().fromSnapshotIdInclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT fromSnapshotExclusive(long fromSnapshotId) {
+    // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
+    // as fromSnapshotId could be matched to a parent snapshot that is already expired
+    TableScanContext newContext = context().fromSnapshotIdExclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT toSnapshot(long toSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(toSnapshotId) != null, "Cannot find the end snapshot: %s", toSnapshotId);
+    TableScanContext newContext = context().toSnapshotId(toSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    if (scanCurrentLineage() && table().currentSnapshot() == null) {
+      // If the table is empty (no current snapshot) and both from and to snapshots aren't set,
+      // simply return an empty iterable. In this case, the listener notification is also skipped.
+      return CloseableIterable.empty();
+    }
+
+    long toSnapshotIdInclusive = toSnapshotIdInclusive();
+    Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(toSnapshotIdInclusive);
+
+    if (fromSnapshotIdExclusive != null) {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              fromSnapshotIdExclusive,
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              false));
+    } else {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              SnapshotUtil.oldestAncestorOf(table(), toSnapshotIdInclusive).snapshotId(),
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              true));
+    }
+
+    return doPlanFiles(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+  }
+
+  private boolean scanCurrentLineage() {
+    return context().fromSnapshotId() == null && context().toSnapshotId() == null;
+  }
+
+  private long toSnapshotIdInclusive() {
+    if (context().toSnapshotId() != null) {
+      return context().toSnapshotId();
+    } else {
+      Snapshot currentSnapshot = table().currentSnapshot();
+      Preconditions.checkArgument(
+          currentSnapshot != null, "End snapshot is not set and table has no current snapshot");
+      return currentSnapshot.snapshotId();
+    }
+  }
+
+  private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) {
+    Long fromSnapshotId = context().fromSnapshotId();
+
+    if (fromSnapshotId == null) {
+      return null;
+

Review Comment:
   nit: extra empty line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937121325


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :
+        SnapshotUtil.ancestorsBetween(table(), toSnapshotIdInclusive, fromSnapshotIdExclusive)) {
+
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {

Review Comment:
   Correct, replace operations should be ignored as they don't change the underlying data. I have tests for rewriting manifests and rewriting data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] flyrain commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r938256729


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(

Review Comment:
   Question: This looks like another style of the builder pattern, one that keeps creating new objects. It is not introduced by this PR, though. I'm wondering whether a builder would fit better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937120869


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   I used `Deque` because I need an ordered collection of snapshots. I thought about using `List` with `LinkedList` as the underlying implementation, but `LinkedList` is less efficient than `ArrayDeque`.
   
   I could use `Deque` inside the method and return `Collection`, but would that be cleaner?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r938325035


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -138,6 +138,10 @@ public static Snapshot oldestAncestorOf(long snapshotId, Function<Long, Snapshot
     return lastSnapshot;
   }
 
+  public static Snapshot oldestAncestorOf(Table table, long snapshotId) {

Review Comment:
   Good point, I missed this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933723830


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>

Review Comment:
   I find that sometimes, when spotless formats something I've written in a way I really dislike (especially `extends` clauses), I can adjust it a bit by hand and rerun spotless apply, and it will usually let me keep formatting that is better than what it chose at first.
   
   Breaking the type parameter of `BaseIncrementalScan` at the `<` across lines is somewhat of a visual sin (which I know you didn't choose to do 😜 ).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r936115965


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   Opt/Nit: It seems a bit strange to have a deque so early, when I see it's really needed inside `CreateDataFileChangeTasks`. You have to kind of track the code in two places to see how it works.
   
   What do you think about just using `Iterable<Snapshot>` here and then only doing the `Deque` / `computeSnapshotOrdinals` logic inside the `CreateDataFileChangeTasks` constructor?
   
   The slight negative might be having to transform `Iterable<Snapshot>` to a stream for some of these methods.



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.events.IncrementalScanEvent;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+
+abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, G> implements IncrementalScan<ThisT, T, G> {
+
+  protected BaseIncrementalScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+    TableScanContext newContext = context().fromSnapshotIdInclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT fromSnapshotExclusive(long fromSnapshotId) {
+    // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
+    // as fromSnapshotId could be matched to a parent snapshot that is already expired
+    TableScanContext newContext = context().fromSnapshotIdExclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT toSnapshot(long toSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(toSnapshotId) != null, "Cannot find the end snapshot: %s", toSnapshotId);
+    TableScanContext newContext = context().toSnapshotId(toSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    if (scanCurrentLineage() && table().currentSnapshot() == null) {
+      // If the table is empty (no current snapshot) and both from and to snapshots aren't set,
+      // simply return an empty iterable. In this case, the listener notification is also skipped.
+      return CloseableIterable.empty();
+    }
+
+    long toSnapshotIdInclusive = toSnapshotIdInclusive();
+    Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(toSnapshotIdInclusive);
+
+    if (fromSnapshotIdExclusive != null) {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              fromSnapshotIdExclusive,
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              false));
+    } else {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              SnapshotUtil.oldestAncestorOf(table(), toSnapshotIdInclusive).snapshotId(),
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              true));
+    }
+
+    return doPlanFiles(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+  }
+
+  private boolean scanCurrentLineage() {
+    return context().fromSnapshotId() == null && context().toSnapshotId() == null;
+  }
+
+  private long toSnapshotIdInclusive() {
+    if (context().toSnapshotId() != null) {
+      return context().toSnapshotId();
+    } else {
+      Snapshot currentSnapshot = table().currentSnapshot();
+      Preconditions.checkArgument(
+          currentSnapshot != null, "End snapshot is not set and table has no current snapshot");
+      return currentSnapshot.snapshotId();
+    }
+  }
+
+  private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) {
+    Long fromSnapshotId = context().fromSnapshotId();
+
+    if (fromSnapshotId == null) {
+      return null;
+
+    } else if (context().fromSnapshotInclusive()) {

Review Comment:
   Nit: Java style question (as you have probably read more about it than me): what do you think, is it better to just start another `if` block instead of doing `else if` in this case, because the above block is a return? I personally have a hard time understanding multi-branch if/else-if chains where the conditions are on different variables.



##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -137,6 +144,13 @@
    */
   CloseableIterable<G> planTasks();
 
+  /**
+   * Returns whether this scan is case-sensitive with respect to column names.
+   *
+   * @return true if case-sensitive, false otherwise.
+   */
+  boolean isCaseSensitive();

Review Comment:
   Nit: I realize it's a move, but maybe we could move the `isCaseSensitive()` getter up next to its setter, or just put the other getter (`filter`) at the bottom of the file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r936122161


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :
+        SnapshotUtil.ancestorsBetween(table(), toSnapshotIdInclusive, fromSnapshotIdExclusive)) {
+
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {

Review Comment:
   Oh, I see. `REPLACE` is for compaction- or sort-like table changes where there is no net change to the table.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r936115965


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   Opt/Nit: It seems a bit strange to have a deque so early, when as far as I can see it's only really needed inside CreateDataFileChangeTasks. You have to track the code in two places to see how it works.
   
   What do you think about just using Iterable<Snapshot> here and then only doing the Deque / computeSnapshotOrdinals logic inside the CreateDataFileChangeTasks constructor?
   
   The only downside might be having to transform the Iterable<Snapshot> into a Java stream for some of these lines of code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933704899


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :

Review Comment:
   This would also previously fit on one line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933704597


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>

Review Comment:
   I think it is because of the new line length limit. Such changes are really unfortunate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937174756


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   Yeah, or maybe just clarify, via the name of the variable or of the "changelogSnapshots" method, that the snapshots are in ascending order (by timestamp), and that this is what CreateDataFileChangeTasks expects.
   
   Or how about just creating the ordinals in this method, and having CreateDataFileChangeTasks take in the ordinals?
   
   I guess the whole goal for me, if possible, would be to make it easier to see that the ordinals are in ascending order without having to jump around between different methods and inner classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] iflytek-hmwang5 commented on pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
iflytek-hmwang5 commented on PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#issuecomment-1217341819

   > 
   
   I am sorry, I may not have been clear at the beginning, we need row-level overwrite.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933723830


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>

Review Comment:
   I find that sometimes, when spotless reformats something I've written in a way I really dislike (especially extends statements), I can try to fix it up a bit and rerun spotless apply, and it will usually let me keep something better than what it chose at first.
   
   Breaking the type parameter of `BaseIncrementalScan` at the `<` across lines is somewhat of a visual sin (which I know you didn't choose to do 😜 ).



##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -137,6 +144,14 @@
    */
   CloseableIterable<G> planTasks();
 
+  /**
+   * Returns whether this scan should apply column name case sensitiveness as per {@link
+   * Scan#caseSensitive(boolean)}.

Review Comment:
   Nit: maybe `Return whether this scan should be case sensitive with respect to column names` or something else that avoids the phrase `apply column name case sensitiveness`. Even just avoiding `case sensitiveness` would help me to think less about it when looking through it.
   
   EDIT: I see this is the same wording carried over from `TableScan`, so I would not advise changing it for this reason alone.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] flyrain commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
flyrain commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r938109281


##########
core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java:
##########
@@ -138,6 +138,10 @@ public static Snapshot oldestAncestorOf(long snapshotId, Function<Long, Snapshot
     return lastSnapshot;
   }
 
+  public static Snapshot oldestAncestorOf(Table table, long snapshotId) {

Review Comment:
   Nit: shall we move this method to line 121? We usually put the caller before the callee, something like this:
   ```
   A Caller of B
   B Caller of C
   C  
   ```



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        orderedChangelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  // builds a collection of changelog snapshots (oldest to newest)
+  // the order of the snapshots is important as it is used to determine change ordinals
+  private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl) {
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+        if (snapshot.deleteManifests(table().io()).size() > 0) {
+          throw new UnsupportedOperationException(
+              "Delete files are currently not supported in changelog scans");
+        }
+
+        changelogSnapshots.addFirst(snapshot);
+      }
+    }
+
+    return changelogSnapshots;
+  }
+
+  private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
+    return snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+  }
+
+  private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapshots) {

Review Comment:
   I guess we don't want to reuse sequence numbers as ordinals, as we discussed before. Zero-based ordinals should be more consistent and less confusing.



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.events.IncrementalScanEvent;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+
+abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, G> implements IncrementalScan<ThisT, T, G> {
+
+  protected BaseIncrementalScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+    TableScanContext newContext = context().fromSnapshotIdInclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT fromSnapshotExclusive(long fromSnapshotId) {
+    // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
+    // as fromSnapshotId could be matched to a parent snapshot that is already expired
+    TableScanContext newContext = context().fromSnapshotIdExclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT toSnapshot(long toSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(toSnapshotId) != null, "Cannot find the end snapshot: %s", toSnapshotId);
+    TableScanContext newContext = context().toSnapshotId(toSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    if (scanCurrentLineage() && table().currentSnapshot() == null) {
+      // If the table is empty (no current snapshot) and both from and to snapshots aren't set,
+      // simply return an empty iterable. In this case, the listener notification is also skipped.
+      return CloseableIterable.empty();
+    }
+
+    long toSnapshotIdInclusive = toSnapshotIdInclusive();
+    Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(toSnapshotIdInclusive);
+
+    if (fromSnapshotIdExclusive != null) {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              fromSnapshotIdExclusive,
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              false));
+    } else {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              SnapshotUtil.oldestAncestorOf(table(), toSnapshotIdInclusive).snapshotId(),
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              true));
+    }

Review Comment:
   Minor suggestion: How about this?
   ```
       Listeners.notifyAll(
           new IncrementalScanEvent(
               table().name(),
               fromSnapshotIdExclusive != null
                   ? fromSnapshotIdExclusive
                   : SnapshotUtil.oldestAncestorOf(table(), toSnapshotIdInclusive).snapshotId(),
               toSnapshotIdInclusive,
               filter(),
               schema(),
               fromSnapshotIdExclusive == null));
   ```



##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -59,6 +90,22 @@ protected TableScanContext context() {
     return context;
   }
 
+  protected List<String> scanColumns() {
+    return context.returnColumnStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS;
+  }

Review Comment:
   +1 for the refactor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937943456


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   I renamed the method and added a comment on what the order is and why it is important.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] iflytek-hmwang5 commented on pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
iflytek-hmwang5 commented on PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#issuecomment-1216205725

   > > Hi, If this PR don't support deleted rows, whether overwrite cases can't be supported?
   > 
   > There will be a subsequent PR. But I'm not sure if overwrite is intended to be supported in that.
   
   It would be nice if a subsequent PR could include the overwrite operation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi merged pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged PR #5382:
URL: https://github.com/apache/iceberg/pull/5382


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] flyrain commented on pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
flyrain commented on PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#issuecomment-1216897079

   @iflytek-hmwang5, overwrite should work as long as there are no row-level deletes. Specifically, copy-on-write deletes are handled by `DeletedDataFileScanTask`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r934977044


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :
+        SnapshotUtil.ancestorsBetween(table(), toSnapshotIdInclusive, fromSnapshotIdExclusive)) {
+
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+        if (snapshot.deleteManifests(table().io()).size() > 0) {
+          throw new UnsupportedOperationException(
+              "Delete files are currently not supported in changelog scans");
+        }
+
+        // insert at the beginning to have old snapshots first
+        changelogSnapshots.addFirst(snapshot);
+      }
+    }
+
+    return changelogSnapshots;
+  }
+
+  private static Map<Long, Integer> computeSnapshotOrdinals(Iterable<Snapshot> snapshots) {
+    Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();
+
+    int ordinal = 0;
+
+    for (Snapshot snapshot : snapshots) {
+      snapshotOrdinals.put(snapshot.snapshotId(), ordinal++);
+    }
+
+    return snapshotOrdinals;
+  }
+
+  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
+    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+
+    private final Map<Long, Integer> snapshotOrdinals;
+
+    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
+      this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+    }
+
+    @Override
+    public CloseableIterable<ChangelogScanTask> apply(
+        CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) {
+
+      return CloseableIterable.transform(
+          entries,
+          entry -> {
+            Preconditions.checkArgument(
+                snapshotOrdinals.containsKey(entry.snapshotId()),
+                "Cannot determine ordinal for snapshot %s",
+                entry.snapshotId());
+
+            DataFile dataFile = entry.file().copy(context.shouldKeepStats());
+            int changeOrdinal = snapshotOrdinals.get(entry.snapshotId());
+            long commitSnapshotId = entry.snapshotId();

Review Comment:
   nit: move this line above so that the variable can be reused. The current form is perfectly fine too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r935936753


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.events.IncrementalScanEvent;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+
+abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, G> implements IncrementalScan<ThisT, T, G> {
+
+  protected BaseIncrementalScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+    TableScanContext newContext = context().fromSnapshotIdInclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT fromSnapshotExclusive(long fromSnapshotId) {
+    // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
+    // as fromSnapshotId could be matched to a parent snapshot that is already expired
+    TableScanContext newContext = context().fromSnapshotIdExclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT toSnapshot(long toSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(toSnapshotId) != null, "Cannot find the end snapshot: %s", toSnapshotId);
+    TableScanContext newContext = context().toSnapshotId(toSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    if (scanCurrentLineage() && table().currentSnapshot() == null) {
+      // If the table is empty (no current snapshot) and both from and to snapshots aren't set,
+      // simply return an empty iterable. In this case, the listener notification is also skipped.
+      return CloseableIterable.empty();
+    }
+
+    long toSnapshotIdInclusive = toSnapshotIdInclusive();
+    Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(toSnapshotIdInclusive);
+
+    if (fromSnapshotIdExclusive != null) {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              fromSnapshotIdExclusive,
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              false));
+    } else {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              SnapshotUtil.oldestAncestorOf(table(), toSnapshotIdInclusive).snapshotId(),
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              true));
+    }
+
+    return doPlanFiles(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+  }
+
+  private boolean scanCurrentLineage() {
+    return context().fromSnapshotId() == null && context().toSnapshotId() == null;
+  }
+
+  private long toSnapshotIdInclusive() {
+    if (context().toSnapshotId() != null) {
+      return context().toSnapshotId();
+    } else {
+      Snapshot currentSnapshot = table().currentSnapshot();
+      Preconditions.checkArgument(
+          currentSnapshot != null, "End snapshot is not set and table has no current snapshot");
+      return currentSnapshot.snapshotId();
+    }
+  }
+
+  private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) {
+    Long fromSnapshotId = context().fromSnapshotId();
+
+    if (fromSnapshotId == null) {
+      return null;
+
+    } else if (context().fromSnapshotInclusive()) {

Review Comment:
   Nit: Java style question (as you have probably read more about it than me): what do you think, is it better to just start another `if` block instead of using `else if` in this case, since the block above is a return? I personally have a hard time understanding multi-branch if/else-if chains where the conditions are on different variables. I actually had an easier time with the previous version (nested if-else).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933401435


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>

Review Comment:
   This is automatic formatting by spotless. I can't change it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933705522


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :

Review Comment:
   I can play around with shorter variable names but I doubt it is a good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933704574


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changeSnapshots =

Review Comment:
   I updated it to `changelogSnapshots`. What do you think, @Reo-LEI?



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>

Review Comment:
   I think it is because of the new line length limit. Such changes are really unfortunate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937188121


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   Got it. Let me see what I can do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937191535


##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -137,6 +144,13 @@
    */
   CloseableIterable<G> planTasks();
 
+  /**
+   * Returns whether this scan is case-sensitive with respect to column names.
+   *
+   * @return true if case-sensitive, false otherwise.
+   */
+  boolean isCaseSensitive();

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r938263489


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        orderedChangelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  // builds a collection of changelog snapshots (oldest to newest)
+  // the order of the snapshots is important as it is used to determine change ordinals
+  private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl) {
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+        if (snapshot.deleteManifests(table().io()).size() > 0) {
+          throw new UnsupportedOperationException(
+              "Delete files are currently not supported in changelog scans");
+        }
+
+        changelogSnapshots.addFirst(snapshot);
+      }
+    }
+
+    return changelogSnapshots;
+  }
+
+  private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
+    return snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+  }
+
+  private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapshots) {

Review Comment:
   I didn't follow the conversation about that, so I'm curious. I guess if we introduce a new concept of 'ordinal' as we do here, it's a bit more code, and do you also lose the correlation with the snapshot that did the change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r935922258


##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -137,6 +144,13 @@
    */
   CloseableIterable<G> planTasks();
 
+  /**
+   * Returns whether this scan is case-sensitive with respect to column names.
+   *
+   * @return true if case-sensitive, false otherwise.
+   */
+  boolean isCaseSensitive();

Review Comment:
   Nit: I realize it's a move, but maybe we could move the caseSensitive() getter up next to the setter, or just put the other getter (filter) at the bottom of the file? (This comment would apply to all the files.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933704821


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   This would also previously fit on one line.



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());

Review Comment:
   Let me know if a helper method would be readable. I wanted to format it differently.
   
   ```
   Set<Long> changelogSnapshotIds = changelogSnapshots.stream()
       .map(Snapshot::snapshotId)
       .collect(Collectors.toSet());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933704899


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :

Review Comment:
   This would also fit on one line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#issuecomment-1200061898

   cc @flyrain @rdblue @stevenzwu @karuppayya @szehon-ho @RussellSpitzer @anuragmantri


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r934989056


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :
+        SnapshotUtil.ancestorsBetween(table(), toSnapshotIdInclusive, fromSnapshotIdExclusive)) {
+
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {

Review Comment:
   What is the expected changelog behavior for rewrite/replace? A no-op, or deleted data files followed by added data files?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r934990365


##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan

Review Comment:
   great test coverage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r936119427


##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan
+    extends ScanTestBase<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>> {
+
+  public TestBaseIncrementalChangelogScan(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  @Test
+  public void testDataFilters() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot snap1 = table.currentSnapshot();
+    ManifestFile snap1DataManifest = Iterables.getOnlyElement(snap1.dataManifests(table.io()));
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    Assert.assertEquals("Must be 2 data manifests", 2, snap2.dataManifests(table.io()).size());
+
+    withUnavailableLocations(

Review Comment:
   Question: while this is interesting, is it overkill for testing a data filter? What are we trying to test — that the CDC scan works fine even if the table is corrupt? Not sure we really need this test (I imagine a corrupt table breaks many things), but if we do, we could probably clarify the name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r935922258


##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -137,6 +144,13 @@
    */
   CloseableIterable<G> planTasks();
 
+  /**
+   * Returns whether this scan is case-sensitive with respect to column names.
+   *
+   * @return true if case-sensitive, false otherwise.
+   */
+  boolean isCaseSensitive();

Review Comment:
   Nit: I realize it's a move, but the new getter locations seem inconsistent. Maybe we could move the isCaseSensitive() getter up next to the setter, or just put the other getter (filter) at the bottom of the file? (This comment would apply to all the files.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r934642410


##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -137,6 +144,14 @@
    */
   CloseableIterable<G> planTasks();
 
+  /**
+   * Returns whether this scan should apply column name case sensitiveness as per {@link
+   * Scan#caseSensitive(boolean)}.

Review Comment:
   I copied the doc from `TableScan` but I'd be up for updating it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r938319791


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(

Review Comment:
   I am not sure it was a deliberate choice to always create a new object on each modification. We can take a look after this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r938417751


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.events.IncrementalScanEvent;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+
+abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, G> implements IncrementalScan<ThisT, T, G> {
+
+  protected BaseIncrementalScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+    TableScanContext newContext = context().fromSnapshotIdInclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT fromSnapshotExclusive(long fromSnapshotId) {
+    // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
+    // as fromSnapshotId could be matched to a parent snapshot that is already expired
+    TableScanContext newContext = context().fromSnapshotIdExclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT toSnapshot(long toSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(toSnapshotId) != null, "Cannot find the end snapshot: %s", toSnapshotId);
+    TableScanContext newContext = context().toSnapshotId(toSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    if (scanCurrentLineage() && table().currentSnapshot() == null) {
+      // If the table is empty (no current snapshot) and both from and to snapshots aren't set,
+      // simply return an empty iterable. In this case, the listener notification is also skipped.
+      return CloseableIterable.empty();
+    }
+
+    long toSnapshotIdInclusive = toSnapshotIdInclusive();
+    Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(toSnapshotIdInclusive);
+
+    if (fromSnapshotIdExclusive != null) {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              fromSnapshotIdExclusive,
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              false));
+    } else {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              SnapshotUtil.oldestAncestorOf(table(), toSnapshotIdInclusive).snapshotId(),
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              true));
+    }

Review Comment:
   I did that but then realized I made a typo. I kept two branches but added in-line comments for boolean args for clarity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r938418283


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.events.IncrementalScanEvent;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+
+abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, G> implements IncrementalScan<ThisT, T, G> {
+
+  protected BaseIncrementalScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+    TableScanContext newContext = context().fromSnapshotIdInclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT fromSnapshotExclusive(long fromSnapshotId) {
+    // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
+    // as fromSnapshotId could be matched to a parent snapshot that is already expired
+    TableScanContext newContext = context().fromSnapshotIdExclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT toSnapshot(long toSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(toSnapshotId) != null, "Cannot find the end snapshot: %s", toSnapshotId);
+    TableScanContext newContext = context().toSnapshotId(toSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    if (scanCurrentLineage() && table().currentSnapshot() == null) {
+      // If the table is empty (no current snapshot) and both from and to snapshots aren't set,
+      // simply return an empty iterable. In this case, the listener notification is also skipped.
+      return CloseableIterable.empty();
+    }
+
+    long toSnapshotIdInclusive = toSnapshotIdInclusive();
+    Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(toSnapshotIdInclusive);
+
+    if (fromSnapshotIdExclusive != null) {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              fromSnapshotIdExclusive,
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              false));
+    } else {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              SnapshotUtil.oldestAncestorOf(table(), toSnapshotIdInclusive).snapshotId(),
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              true));
+    }

Review Comment:
   I think we should eventually add a builder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933705197


##########
core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class TestBaseIncrementalChangelogScan
+    extends ScanTestBase<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>> {
+
+  public TestBaseIncrementalChangelogScan(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  @Test
+  public void testDataFilters() {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+    ManifestFile firstSnapshotDataManifest =
+        Iterables.getOnlyElement(firstSnapshot.dataManifests(table.io()));
+
+    table.newFastAppend().appendFile(FILE_B).commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    Assert.assertEquals(
+        "Must be 2 data manifests", 2, secondSnapshot.dataManifests(table.io()).size());
+
+    withUnavailableLocations(
+        ImmutableList.of(firstSnapshotDataManifest.path()),
+        () -> {
+          // bucket(k, 16) is 1 which is supposed to match only FILE_B
+          IncrementalChangelogScan scan = newScan().filter(Expressions.equal("data", "k"));
+
+          List<ChangelogScanTask> tasks = plan(scan);
+
+          Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+          AddedRowsScanTask task = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+          Assert.assertEquals("Ordinal must match", 1, task.changeOrdinal());
+          Assert.assertEquals(
+              "Snapshot must match", secondSnapshot.snapshotId(), task.commitSnapshotId());
+          Assert.assertEquals("Data file must match", FILE_B.path(), task.file().path());
+          Assert.assertTrue("Must be no deletes", task.deletes().isEmpty());
+        });
+  }
+
+  @Test
+  public void testOverwrites() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newOverwrite().addFile(FILE_A2).deleteFile(FILE_A).commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan()
+            .fromSnapshotExclusive(firstSnapshot.snapshotId())
+            .toSnapshot(secondSnapshot.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 2 tasks", 2, tasks.size());
+
+    AddedRowsScanTask addedRowsTask = (AddedRowsScanTask) tasks.get(0);
+    Assert.assertEquals("Ordinal must match", 0, addedRowsTask.changeOrdinal());
+    Assert.assertEquals(
+        "Snapshot must match", secondSnapshot.snapshotId(), addedRowsTask.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A2.path(), addedRowsTask.file().path());
+    Assert.assertTrue("Must be no deletes", addedRowsTask.deletes().isEmpty());
+
+    DeletedDataFileScanTask deletedDataFileTask = (DeletedDataFileScanTask) tasks.get(1);
+    Assert.assertEquals("Ordinal must match", 0, deletedDataFileTask.changeOrdinal());
+    Assert.assertEquals(
+        "Snapshot must match", secondSnapshot.snapshotId(), deletedDataFileTask.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), deletedDataFileTask.file().path());
+    Assert.assertTrue("Must be no deletes", deletedDataFileTask.existingDeletes().isEmpty());
+  }
+
+  @Test
+  public void testFileDeletes() {
+    table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newDelete().deleteFile(FILE_A).commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    IncrementalChangelogScan scan =
+        newScan()
+            .fromSnapshotExclusive(firstSnapshot.snapshotId())
+            .toSnapshot(secondSnapshot.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 1 tasks", 1, tasks.size());
+
+    DeletedDataFileScanTask deletedDataFileTask =
+        (DeletedDataFileScanTask) Iterables.getOnlyElement(tasks);
+    Assert.assertEquals("Ordinal must match", 0, deletedDataFileTask.changeOrdinal());
+    Assert.assertEquals(
+        "Snapshot must match", secondSnapshot.snapshotId(), deletedDataFileTask.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_A.path(), deletedDataFileTask.file().path());
+    Assert.assertTrue("Must be no deletes", deletedDataFileTask.existingDeletes().isEmpty());
+  }
+
+  @Test
+  public void testExistingEntriesInNewDataManifestsAreIgnored() {
+    table
+        .updateProperties()
+        .set(MANIFEST_MIN_MERGE_COUNT, "1")
+        .set(MANIFEST_MERGE_ENABLED, "true")
+        .commit();
+
+    table.newAppend().appendFile(FILE_A).commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    Snapshot thirdSnapshot = table.currentSnapshot();
+
+    ManifestFile manifest = Iterables.getOnlyElement(thirdSnapshot.dataManifests(table.io()));
+    Assert.assertTrue("Manifest must have existing files", manifest.hasExistingFiles());
+
+    IncrementalChangelogScan scan =
+        newScan()
+            .fromSnapshotInclusive(thirdSnapshot.snapshotId())
+            .toSnapshot(thirdSnapshot.snapshotId());
+
+    List<ChangelogScanTask> tasks = plan(scan);
+
+    Assert.assertEquals("Must have 1 task", 1, tasks.size());
+
+    AddedRowsScanTask task = (AddedRowsScanTask) Iterables.getOnlyElement(tasks);
+    Assert.assertEquals("Ordinal must match", 0, task.changeOrdinal());
+    Assert.assertEquals("Snapshot must match", thirdSnapshot.snapshotId(), task.commitSnapshotId());
+    Assert.assertEquals("Data file must match", FILE_C.path(), task.file().path());
+    Assert.assertTrue("Must be no deletes", task.deletes().isEmpty());
+  }
+
+  @Test
+  public void testManifestRewritesAreIgnored() throws IOException {
+    table.newAppend().appendFile(FILE_A).commit();
+
+    Snapshot firstSnapshot = table.currentSnapshot();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    Snapshot secondSnapshot = table.currentSnapshot();
+
+    ManifestFile newManifest =
+        writeManifest(
+            "manifest-file.avro",
+            manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A),
+            manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshot.snapshotId(), FILE_B));
+
+    RewriteManifests rewriteManifests = table.rewriteManifests();
+
+    for (ManifestFile manifest : secondSnapshot.dataManifests(table.io())) {
+      rewriteManifests.deleteManifest(manifest);
+    }
+
+    rewriteManifests.addManifest(newManifest);
+
+    rewriteManifests.commit();
+
+    table.newAppend().appendFile(FILE_C).commit();
+
+    Snapshot fourthSnapshot = table.currentSnapshot();
+
+    List<ChangelogScanTask> tasks = plan(newScan());
+
+    Assert.assertEquals("Must have 3 tasks", 3, tasks.size());
+
+    AddedRowsScanTask firstTask = (AddedRowsScanTask) tasks.get(0);

Review Comment:
   I should probably reconsider names here. Maybe, `task1` would be better?
   Then statements below would fit on one line.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r933403339


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changeSnapshots =
+        changeSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changeSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changeSnapshotIds =
+        changeSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changeSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changeSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())

Review Comment:
   The idea is to split this into two separate PRs to simplify reviews. There will be an exception thrown for now if there is a delete file.



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changeSnapshots =

Review Comment:
   Will update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
kbendick commented on PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#issuecomment-1216052752

   > Hi, If this PR don't support deleted rows, whether overwrite cases can't be supported?
   
   There will be a subsequent PR. But I'm not sure if overwrite is intended to be supported in that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r934632029


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>

Review Comment:
   Yeah, that's what I do too. Usually there is a workaround, but I am afraid not this time :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] stevenzwu commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
stevenzwu commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r934975348


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =
+        changelogSnapshots(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+
+    if (changelogSnapshots.isEmpty()) {
+      return CloseableIterable.empty();
+    }
+
+    Set<Long> changelogSnapshotIds =
+        changelogSnapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
+
+    Set<ManifestFile> newDataManifests =
+        FluentIterable.from(changelogSnapshots)
+            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
+            .toSet();
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+            .specsById(table().specs())
+            .caseSensitive(isCaseSensitive())
+            .select(scanColumns())
+            .filterData(filter())
+            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
+            .ignoreExisting();
+
+    if (shouldIgnoreResiduals()) {
+      manifestGroup = manifestGroup.ignoreResiduals();
+    }
+
+    if (newDataManifests.size() > 1 && shouldPlanWithExecutor()) {
+      manifestGroup = manifestGroup.planWith(planExecutor());
+    }
+
+    return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
+  }
+
+  @Override
+  public CloseableIterable<ScanTaskGroup<ChangelogScanTask>> planTasks() {
+    return TableScanUtil.planTaskGroups(
+        planFiles(), targetSplitSize(), splitLookback(), splitOpenFileCost());
+  }
+
+  private Deque<Snapshot> changelogSnapshots(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots = new ArrayDeque<>();
+
+    for (Snapshot snapshot :
+        SnapshotUtil.ancestorsBetween(table(), toSnapshotIdInclusive, fromSnapshotIdExclusive)) {
+
+      if (!snapshot.operation().equals(DataOperations.REPLACE)) {
+        if (snapshot.deleteManifests(table().io()).size() > 0) {
+          throw new UnsupportedOperationException(
+              "Delete files are currently not supported in changelog scans");
+        }
+
+        // insert at the beginning to have old snapshots first
+        changelogSnapshots.addFirst(snapshot);
+      }
+    }
+
+    return changelogSnapshots;
+  }
+
+  private static Map<Long, Integer> computeSnapshotOrdinals(Iterable<Snapshot> snapshots) {

Review Comment:
   nit: should this be moved inside `CreateDataFileChangeTasks`, as it seems to be only used there?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
szehon-ho commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937174756


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
+import org.apache.iceberg.ManifestGroup.TaskContext;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+
+class BaseIncrementalChangelogScan
+    extends BaseIncrementalScan<
+        IncrementalChangelogScan, ChangelogScanTask, ScanTaskGroup<ChangelogScanTask>>
+    implements IncrementalChangelogScan {
+
+  BaseIncrementalChangelogScan(TableOperations ops, Table table) {
+    this(ops, table, table.schema(), new TableScanContext());
+  }
+
+  BaseIncrementalChangelogScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  @Override
+  protected IncrementalChangelogScan newRefinedScan(
+      TableOperations newOps, Table newTable, Schema newSchema, TableScanContext newContext) {
+    return new BaseIncrementalChangelogScan(newOps, newTable, newSchema, newContext);
+  }
+
+  @Override
+  protected CloseableIterable<ChangelogScanTask> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive) {
+
+    Deque<Snapshot> changelogSnapshots =

Review Comment:
   Yeah, or maybe just clarify the variable or method "changelogSnapshots": that it is in ascending order (by timestamp), and somehow make clear via a comment/param name that this is what CreateDataFileChangeTasks expects.
   
   Or how about just create the ordinals in this method, and have CreateDataFileChangeTasks take in the ordinals?
   
   I guess the whole goal for me, if possible, would be to make it easier to see that the ordinals are in ascending order without having to jump around different methods/inner classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937191432


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.events.IncrementalScanEvent;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+
+abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, G> implements IncrementalScan<ThisT, T, G> {
+
+  protected BaseIncrementalScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+    TableScanContext newContext = context().fromSnapshotIdInclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT fromSnapshotExclusive(long fromSnapshotId) {
+    // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
+    // as fromSnapshotId could be matched to a parent snapshot that is already expired
+    TableScanContext newContext = context().fromSnapshotIdExclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT toSnapshot(long toSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(toSnapshotId) != null, "Cannot find the end snapshot: %s", toSnapshotId);
+    TableScanContext newContext = context().toSnapshotId(toSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    if (scanCurrentLineage() && table().currentSnapshot() == null) {
+      // If the table is empty (no current snapshot) and both from and to snapshots aren't set,
+      // simply return an empty iterable. In this case, the listener notification is also skipped.
+      return CloseableIterable.empty();
+    }
+
+    long toSnapshotIdInclusive = toSnapshotIdInclusive();
+    Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(toSnapshotIdInclusive);
+
+    if (fromSnapshotIdExclusive != null) {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              fromSnapshotIdExclusive,
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              false));
+    } else {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              SnapshotUtil.oldestAncestorOf(table(), toSnapshotIdInclusive).snapshotId(),
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              true));
+    }
+
+    return doPlanFiles(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+  }
+
+  private boolean scanCurrentLineage() {
+    return context().fromSnapshotId() == null && context().toSnapshotId() == null;
+  }
+
+  private long toSnapshotIdInclusive() {
+    if (context().toSnapshotId() != null) {
+      return context().toSnapshotId();
+    } else {
+      Snapshot currentSnapshot = table().currentSnapshot();
+      Preconditions.checkArgument(
+          currentSnapshot != null, "End snapshot is not set and table has no current snapshot");
+      return currentSnapshot.snapshotId();
+    }
+  }
+
+  private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) {
+    Long fromSnapshotId = context().fromSnapshotId();
+
+    if (fromSnapshotId == null) {
+      return null;
+

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5382: Core: Implement IncrementalChangelogScan without deletes

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #5382:
URL: https://github.com/apache/iceberg/pull/5382#discussion_r937191749


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.events.IncrementalScanEvent;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.SnapshotUtil;
+
+abstract class BaseIncrementalScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
+    extends BaseScan<ThisT, T, G> implements IncrementalScan<ThisT, T, G> {
+
+  protected BaseIncrementalScan(
+      TableOperations ops, Table table, Schema schema, TableScanContext context) {
+    super(ops, table, schema, context);
+  }
+
+  protected abstract CloseableIterable<T> doPlanFiles(
+      Long fromSnapshotIdExclusive, long toSnapshotIdInclusive);
+
+  @Override
+  public ThisT fromSnapshotInclusive(long fromSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(fromSnapshotId) != null,
+        "Cannot find the starting snapshot: %s",
+        fromSnapshotId);
+    TableScanContext newContext = context().fromSnapshotIdInclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT fromSnapshotExclusive(long fromSnapshotId) {
+    // for exclusive behavior, table().snapshot(fromSnapshotId) check can't be applied
+    // as fromSnapshotId could be matched to a parent snapshot that is already expired
+    TableScanContext newContext = context().fromSnapshotIdExclusive(fromSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public ThisT toSnapshot(long toSnapshotId) {
+    Preconditions.checkArgument(
+        table().snapshot(toSnapshotId) != null, "Cannot find the end snapshot: %s", toSnapshotId);
+    TableScanContext newContext = context().toSnapshotId(toSnapshotId);
+    return newRefinedScan(tableOps(), table(), schema(), newContext);
+  }
+
+  @Override
+  public CloseableIterable<T> planFiles() {
+    if (scanCurrentLineage() && table().currentSnapshot() == null) {
+      // If the table is empty (no current snapshot) and both from and to snapshots aren't set,
+      // simply return an empty iterable. In this case, the listener notification is also skipped.
+      return CloseableIterable.empty();
+    }
+
+    long toSnapshotIdInclusive = toSnapshotIdInclusive();
+    Long fromSnapshotIdExclusive = fromSnapshotIdExclusive(toSnapshotIdInclusive);
+
+    if (fromSnapshotIdExclusive != null) {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              fromSnapshotIdExclusive,
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              false));
+    } else {
+      Listeners.notifyAll(
+          new IncrementalScanEvent(
+              table().name(),
+              SnapshotUtil.oldestAncestorOf(table(), toSnapshotIdInclusive).snapshotId(),
+              toSnapshotIdInclusive,
+              filter(),
+              schema(),
+              true));
+    }
+
+    return doPlanFiles(fromSnapshotIdExclusive, toSnapshotIdInclusive);
+  }
+
+  private boolean scanCurrentLineage() {
+    return context().fromSnapshotId() == null && context().toSnapshotId() == null;
+  }
+
+  private long toSnapshotIdInclusive() {
+    if (context().toSnapshotId() != null) {
+      return context().toSnapshotId();
+    } else {
+      Snapshot currentSnapshot = table().currentSnapshot();
+      Preconditions.checkArgument(
+          currentSnapshot != null, "End snapshot is not set and table has no current snapshot");
+      return currentSnapshot.snapshotId();
+    }
+  }
+
+  private Long fromSnapshotIdExclusive(long toSnapshotIdInclusive) {
+    Long fromSnapshotId = context().fromSnapshotId();
+
+    if (fromSnapshotId == null) {
+      return null;
+
+    } else if (context().fromSnapshotInclusive()) {

Review Comment:
   No preference on my side. Changed it to be nested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org