Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/11/14 03:56:04 UTC

[GitHub] [iceberg] Reo-LEI opened a new pull request, #6182: Core: Support IncrementalChangelogScan with deletes.

Reo-LEI opened a new pull request, #6182:
URL: https://github.com/apache/iceberg/pull/6182

   This PR adds support for delete files in `IncrementalChangelogScan`, following up on https://github.com/apache/iceberg/pull/5382.
   
   Any feedback is welcome!
   cc @rdblue @aokolnychyi @flyrain @stevenzwu @kbendick @chenjunjiedada 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] alexjo2144 commented on a diff in pull request #6182: Core: Support IncrementalChangelogScan with deletes.

Posted by "alexjo2144 (via GitHub)" <gi...@apache.org>.
alexjo2144 commented on code in PR #6182:
URL: https://github.com/apache/iceberg/pull/6182#discussion_r1163245890


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -64,32 +66,48 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, SnapshotInfo> changelogSnapshotInfos = computeSnapshotInfos(table(), changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
+    boolean containAddedDeleteFiles = false;
+    boolean containRemovedDataFiles = false;
+    for (SnapshotInfo info : changelogSnapshotInfos.values()) {
+      containAddedDeleteFiles = containAddedDeleteFiles || !info.addedDeleteFiles().isEmpty();
+      containRemovedDataFiles = containRemovedDataFiles || info.hasRemovedDataFiles();

Review Comment:
   You could short-circuit this loop once both flags are true. That only matters if the set of snapshot infos can be large, though.
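A minimal sketch of the suggested short-circuit, written against a hypothetical `SnapshotInfoSketch` class that carries only the two booleans the loop reads (the PR's own `SnapshotInfo` exposes `addedDeleteFiles()` and `hasRemovedDataFiles()` instead). Both flags are accumulated with `||`, so they can only go from false to true, and once both are true no later snapshot can change the result. The `inspected` counter is an illustrative addition that makes the early exit observable.

```java
/** Hypothetical stand-in for the PR's SnapshotInfo: only the two facts the loop reads. */
final class SnapshotInfoSketch {
  final boolean hasAddedDeleteFiles;
  final boolean hasRemovedDataFiles;

  SnapshotInfoSketch(boolean hasAddedDeleteFiles, boolean hasRemovedDataFiles) {
    this.hasAddedDeleteFiles = hasAddedDeleteFiles;
    this.hasRemovedDataFiles = hasRemovedDataFiles;
  }
}

/** Result of the scan, plus how many infos were inspected before the loop stopped. */
final class ChangeFlags {
  final boolean containAddedDeleteFiles;
  final boolean containRemovedDataFiles;
  final int inspected;

  private ChangeFlags(boolean addedDeletes, boolean removedData, int inspected) {
    this.containAddedDeleteFiles = addedDeletes;
    this.containRemovedDataFiles = removedData;
    this.inspected = inspected;
  }

  static ChangeFlags compute(Iterable<SnapshotInfoSketch> infos) {
    boolean addedDeletes = false;
    boolean removedData = false;
    int inspected = 0;
    for (SnapshotInfoSketch info : infos) {
      inspected++;
      addedDeletes = addedDeletes || info.hasAddedDeleteFiles;
      removedData = removedData || info.hasRemovedDataFiles;
      if (addedDeletes && removedData) {
        // Both flags only ever flip from false to true, so the remaining infos cannot change them.
        break;
      }
    }
    return new ChangeFlags(addedDeletes, removedData, inspected);
  }
}
```

With three infos where the second one already sets the remaining flag, the loop stops after inspecting two of them.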



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -64,32 +66,48 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, SnapshotInfo> changelogSnapshotInfos = computeSnapshotInfos(table(), changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
+    boolean containAddedDeleteFiles = false;
+    boolean containRemovedDataFiles = false;
+    for (SnapshotInfo info : changelogSnapshotInfos.values()) {
+      containAddedDeleteFiles = containAddedDeleteFiles || !info.addedDeleteFiles().isEmpty();
+      containRemovedDataFiles = containRemovedDataFiles || info.hasRemovedDataFiles();
+    }
+
+    Set<ManifestFile> dataManifests;
+    Set<ManifestFile> deleteManifests;
+    if (containAddedDeleteFiles) {
+      // scan all dataFiles to locate the deleted record and ensure that this record has not been deleted before
+      dataManifests = Sets.newHashSet(changelogSnapshots.getLast().dataManifests(tableOps().io()));

Review Comment:
   Why can you get away with using just the last snapshot in the queue here, when the other branch has to iterate over all of them?
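To make the question concrete, here is a toy model of the two manifest-selection strategies in this hunk: taking every manifest the last snapshot lists, versus collecting manifests from all changelog snapshots and keeping only those written by a snapshot inside the range. `ToyManifest`, `ToySnapshot`, and `ManifestSelection` are hypothetical classes, not Iceberg API, and the scenario (snapshot 3 rewriting manifest `m2` into `m3`) is an assumption chosen for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical toy manifest: a name plus the id of the snapshot that wrote it. */
final class ToyManifest {
  final String name;
  final long snapshotId;

  ToyManifest(String name, long snapshotId) {
    this.name = name;
    this.snapshotId = snapshotId;
  }

  @Override
  public String toString() {
    return name;
  }
}

/** Hypothetical toy snapshot: its id plus the manifests its manifest list references. */
final class ToySnapshot {
  final long snapshotId;
  final List<ToyManifest> manifests;

  ToySnapshot(long snapshotId, ToyManifest... manifests) {
    this.snapshotId = snapshotId;
    this.manifests = Arrays.asList(manifests);
  }
}

final class ManifestSelection {
  /** Like the containAddedDeleteFiles branch: every manifest the newest snapshot lists. */
  static Set<ToyManifest> fromLastSnapshot(List<ToySnapshot> ordered) {
    return new LinkedHashSet<>(ordered.get(ordered.size() - 1).manifests);
  }

  /** Like the else branch: manifests from all snapshots, kept only if written inside the range. */
  static Set<ToyManifest> writtenInRange(List<ToySnapshot> ordered) {
    Set<Long> rangeIds = new HashSet<>();
    for (ToySnapshot snapshot : ordered) {
      rangeIds.add(snapshot.snapshotId);
    }
    Set<ToyManifest> result = new LinkedHashSet<>();
    for (ToySnapshot snapshot : ordered) {
      for (ToyManifest manifest : snapshot.manifests) {
        if (rangeIds.contains(manifest.snapshotId)) {
          result.add(manifest); // identity-based set: a shared instance is added only once
        }
      }
    }
    return result;
  }
}
```

With snapshots 2 and 3 in the range, snapshot 2 listing `[m1, m2]` and snapshot 3 listing `[m1, m3]`, `fromLastSnapshot` returns `[m1, m3]` (it includes `m1`, written before the range, and misses `m2`), while `writtenInRange` returns `[m2, m3]`.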



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -64,32 +66,48 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, SnapshotInfo> changelogSnapshotInfos = computeSnapshotInfos(table(), changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
+    boolean containAddedDeleteFiles = false;
+    boolean containRemovedDataFiles = false;
+    for (SnapshotInfo info : changelogSnapshotInfos.values()) {
+      containAddedDeleteFiles = containAddedDeleteFiles || !info.addedDeleteFiles().isEmpty();
+      containRemovedDataFiles = containRemovedDataFiles || info.hasRemovedDataFiles();
+    }
+
+    Set<ManifestFile> dataManifests;
+    Set<ManifestFile> deleteManifests;
+    if (containAddedDeleteFiles) {
+      // scan all dataFiles to locate the deleted record and ensure that this record has not been deleted before
+      dataManifests = Sets.newHashSet(changelogSnapshots.getLast().dataManifests(tableOps().io()));
+      deleteManifests = Sets.newHashSet(changelogSnapshots.getLast().deleteManifests(tableOps().io()));
+    } else {
+      dataManifests = FluentIterable.from(changelogSnapshots)
+          .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+          .filter(manifest -> changelogSnapshotInfos.containsKey(manifest.snapshotId()))
+          .toSet();
+
+      // scan all deleteFiles to locate the deleted records when there are removed data files
+      deleteManifests = !containRemovedDataFiles ? ImmutableSet.of() :
+          Sets.newHashSet(changelogSnapshots.getLast().deleteManifests(tableOps().io()));
+    }
 
     ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+        new ManifestGroup(table().io(), dataManifests, deleteManifests)
             .specsById(table().specs())
             .caseSensitive(isCaseSensitive())
             .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting();

Review Comment:
   Were these two lines redundant, or are they no longer needed for another reason?
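For reference, the two removed calls narrowed the entries in two ways: `filterManifestEntries(...)` kept only entries committed by a snapshot in the changelog range, and `ignoreExisting()` dropped entries whose status is `EXISTING`. The toy equivalent below applies the same two conditions to a plain list; `ToyEntry`, `ToyStatus`, and `RemovedEntryFilters` are hypothetical stand-ins for `ManifestEntry` and its status, not Iceberg API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for ManifestEntry.Status. */
enum ToyStatus { ADDED, EXISTING, DELETED }

/** Hypothetical stand-in for ManifestEntry: file path, committing snapshot id, status. */
final class ToyEntry {
  final String path;
  final long snapshotId;
  final ToyStatus status;

  ToyEntry(String path, long snapshotId, ToyStatus status) {
    this.path = path;
    this.snapshotId = snapshotId;
    this.status = status;
  }

  @Override
  public String toString() {
    return path;
  }
}

final class RemovedEntryFilters {
  /** Applies the equivalent of the two filters removed in this hunk. */
  static List<ToyEntry> apply(List<ToyEntry> entries, Set<Long> changelogSnapshotIds) {
    List<ToyEntry> kept = new ArrayList<>();
    for (ToyEntry entry : entries) {
      if (entry.status == ToyStatus.EXISTING) {
        continue; // equivalent of ignoreExisting()
      }
      if (!changelogSnapshotIds.contains(entry.snapshotId)) {
        continue; // equivalent of filterManifestEntries(entry -> changelogSnapshotIds.contains(...))
      }
      kept.add(entry);
    }
    return kept;
  }
}
```

With range ids `{2, 3}`, an `EXISTING` entry and an `ADDED` entry committed by snapshot 1 are both dropped, so only entries committed inside the range survive. Without these filters, both kinds of entry reach the task function, which lines up with the new `case EXISTING` branch and the `snapshotOrdinals.containsKey(...)` checks later in the diff.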



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -105,79 +123,161 @@ private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl
 
     for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
       if (!snapshot.operation().equals(DataOperations.REPLACE)) {
-        if (snapshot.deleteManifests(table().io()).size() > 0) {
-          throw new UnsupportedOperationException(
-              "Delete files are currently not supported in changelog scans");
-        }
-
         changelogSnapshots.addFirst(snapshot);
       }
     }
 
     return changelogSnapshots;
   }
 
-  private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
-    return snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
-  }
-
-  private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapshots) {
-    Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();
+  private static Map<Long, SnapshotInfo> computeSnapshotInfos(Table table, Deque<Snapshot> snapshots) {
+    Map<Long, SnapshotInfo> snapshotInfos = Maps.newHashMap();
 
     int ordinal = 0;
-
     for (Snapshot snapshot : snapshots) {
-      snapshotOrdinals.put(snapshot.snapshotId(), ordinal++);
+      Set<CharSequence> removedDataFiles = FluentIterable
+          .from(snapshot.removedDataFiles(table.io()))
+          .transform(ContentFile::path)
+          .toSet();
+
+      Set<CharSequence> addedDeleteFiles = FluentIterable
+          .from(snapshot.addedDeleteFiles(table.io()))
+          .transform(ContentFile::path)
+          .toSet();
+      snapshotInfos.put(snapshot.snapshotId(),
+          new SnapshotInfo(snapshot.snapshotId(), ordinal++, removedDataFiles.isEmpty(), addedDeleteFiles));
     }
 
-    return snapshotOrdinals;
+    return snapshotInfos;
   }
 
-  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
-    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
-
-    private final Map<Long, Integer> snapshotOrdinals;
+  private static class CreateChangelogScanTaskTasks implements CreateTasksFunction<ChangelogScanTask> {
+    private final Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();
+    private final Map<CharSequence, Long> addedDeleteSnapshotIds = Maps.newHashMap();
 
-    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
-      this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+    CreateChangelogScanTaskTasks(Map<Long, SnapshotInfo> snapshotInfos) {
+      for (Map.Entry<Long, SnapshotInfo> kv : snapshotInfos.entrySet()) {
+        snapshotOrdinals.put(kv.getKey(), kv.getValue().ordinals());
+        kv.getValue().addedDeleteFiles().forEach(file -> addedDeleteSnapshotIds.put(file, kv.getKey()));
+      }
     }
 
     @Override
     public CloseableIterable<ChangelogScanTask> apply(
         CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) {
 
-      return CloseableIterable.transform(
+      return CloseableIterable.filter(CloseableIterable.transform(
           entries,
           entry -> {
             long commitSnapshotId = entry.snapshotId();
-            int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
             DataFile dataFile = entry.file().copy(context.shouldKeepStats());
+            DeleteFile[] deleteFiles = context.deletes().forEntry(entry);
 
             switch (entry.status()) {
               case ADDED:
-                return new BaseAddedRowsScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
+                return snapshotOrdinals.containsKey(commitSnapshotId) ?
+                  getAddedRowsScanTask(commitSnapshotId, dataFile, deleteFiles, context) :
+                  getDeletedRowsScanTask(commitSnapshotId, dataFile, deleteFiles, context);
               case DELETED:
-                return new BaseDeletedDataFileScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
+                return getDeletedDataFileScanTask(commitSnapshotId, dataFile, deleteFiles, context);
+              case EXISTING:
+                return getDeletedRowsScanTask(commitSnapshotId, dataFile, deleteFiles, context);
               default:
                 throw new IllegalArgumentException("Unexpected entry status: " + entry.status());
             }
-          });
+          }), Objects::nonNull);
+    }
+
+    private ChangelogScanTask getAddedRowsScanTask(
+        long commitSnapshotId, DataFile dataFile, DeleteFile[] deleteFiles, TaskContext context) {
+      return new BaseAddedRowsScanTask(
+          snapshotOrdinals.get(commitSnapshotId),
+          commitSnapshotId,
+          dataFile,
+          deleteFiles,
+          context.schemaAsString(),
+          context.specAsString(),
+          context.residuals());
+    }
+
+    private ChangelogScanTask getDeletedDataFileScanTask(
+        long commitSnapshotId, DataFile dataFile, DeleteFile[] deleteFiles, TaskContext context) {
+      if (snapshotOrdinals.containsKey(commitSnapshotId)) {
+        return new BaseDeletedDataFileScanTask(
+            snapshotOrdinals.get(commitSnapshotId),
+            commitSnapshotId,
+            dataFile,
+            deleteFiles,
+            context.schemaAsString(),
+            context.specAsString(),
+            context.residuals());
+      } else {
+        // ignore removed data files.

Review Comment:
   This case, where the commit snapshot id doesn't exist in `snapshotOrdinals`, didn't seem possible before. What changed?
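Why the guard matters: the old code unboxed the lookup directly (`int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);`), and auto-unboxing a missing `Integer` throws `NullPointerException`. The sketch below contrasts that with the guarded pattern visible in the PR: check `containsKey`, return `null` for commits outside the range, then drop the nulls as `CloseableIterable.filter(..., Objects::nonNull)` does. The map-of-paths input and the `path@ordinal` task string are hypothetical simplifications, and the `null` return for the ignored branch is inferred from the `Objects::nonNull` filter since that part of the diff is cut off.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

final class OrdinalLookup {
  /** Old pattern: auto-unboxing a missing Integer throws NullPointerException. */
  static int unguardedOrdinal(Map<Long, Integer> snapshotOrdinals, long commitSnapshotId) {
    return snapshotOrdinals.get(commitSnapshotId);
  }

  /** Guarded pattern: a hypothetical task description, or null when the commit is out of range. */
  static String guardedTask(Map<Long, Integer> snapshotOrdinals, String path, long commitSnapshotId) {
    if (!snapshotOrdinals.containsKey(commitSnapshotId)) {
      return null; // stands in for the PR's "ignore removed data files" branch
    }
    return path + "@" + snapshotOrdinals.get(commitSnapshotId);
  }

  /** Builds tasks for deleted files, then drops the nulls, like filtering with Objects::nonNull. */
  static List<String> planDeletedFileTasks(
      Map<Long, Integer> snapshotOrdinals, Map<String, Long> deletedFileCommits) {
    return deletedFileCommits.entrySet().stream()
        .map(e -> guardedTask(snapshotOrdinals, e.getKey(), e.getValue()))
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }
}
```

With ordinals `{2: 0, 3: 1}`, looking up snapshot 1 through `unguardedOrdinal` throws, while `planDeletedFileTasks` simply skips a file committed by snapshot 1 and keeps one committed by snapshot 3 as `b.parquet@1`.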





[GitHub] [iceberg] aokolnychyi commented on pull request #6182: Core: Support IncrementalChangelogScan with deletes.

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #6182:
URL: https://github.com/apache/iceberg/pull/6182#issuecomment-1456639264

   Sorry for the delay on this one. I will get to it as soon as I can.




[GitHub] [iceberg] aokolnychyi commented on pull request #6182: Core: Support IncrementalChangelogScan with deletes.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #6182:
URL: https://github.com/apache/iceberg/pull/6182#issuecomment-1314530819

   I had a local PoC for this, which was blocked by some changes around sequence numbers. I'll try to find some time to review this PR later this week.




Re: [PR] Core: Support IncrementalChangelogScan with deletes. [iceberg]

Posted by "Reo-LEI (via GitHub)" <gi...@apache.org>.
Reo-LEI commented on PR #6182:
URL: https://github.com/apache/iceberg/pull/6182#issuecomment-1954152890

   > @Reo-LEI and @aokolnychyi, are you still working on this?
   
   Sorry for the late reply. I haven't been working on this recently, so feel free to push it forward, @manuzhang.




[GitHub] [iceberg] nastra commented on pull request #6182: Core: Support IncrementalChangelogScan with deletes.

Posted by GitBox <gi...@apache.org>.
nastra commented on PR #6182:
URL: https://github.com/apache/iceberg/pull/6182#issuecomment-1359525244

   @Reo-LEI @aokolnychyi any updates on this one? It would be great to get this in.




Re: [PR] Core: Support IncrementalChangelogScan with deletes. [iceberg]

Posted by "manuzhang (via GitHub)" <gi...@apache.org>.
manuzhang commented on PR #6182:
URL: https://github.com/apache/iceberg/pull/6182#issuecomment-1923299165

   @Reo-LEI and @aokolnychyi, are you still working on this?




Re: [PR] Core: Support IncrementalChangelogScan with deletes. [iceberg]

Posted by "manuzhang (via GitHub)" <gi...@apache.org>.
manuzhang commented on PR #6182:
URL: https://github.com/apache/iceberg/pull/6182#issuecomment-1982827028

   I just submitted https://github.com/apache/iceberg/pull/9888


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org