You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "alexjo2144 (via GitHub)" <gi...@apache.org> on 2023/04/11 20:07:00 UTC

[GitHub] [iceberg] alexjo2144 commented on a diff in pull request #6182: Core: Support IncrementalChangelogScan with deletes.

alexjo2144 commented on code in PR #6182:
URL: https://github.com/apache/iceberg/pull/6182#discussion_r1163245890


##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -64,32 +66,48 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, SnapshotInfo> changelogSnapshotInfos = computeSnapshotInfos(table(), changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
+    boolean containAddedDeleteFiles = false;
+    boolean containRemovedDataFiles = false;
+    for (SnapshotInfo info : changelogSnapshotInfos.values()) {
+      containAddedDeleteFiles = containAddedDeleteFiles || !info.addedDeleteFiles().isEmpty();
+      containRemovedDataFiles = containRemovedDataFiles || info.hasRemovedDataFiles();

Review Comment:
   You could short-circuit this loop once both flags are true. That only matters if the set of snapshot infos can be large, though.



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -64,32 +66,48 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, SnapshotInfo> changelogSnapshotInfos = computeSnapshotInfos(table(), changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
+    boolean containAddedDeleteFiles = false;
+    boolean containRemovedDataFiles = false;
+    for (SnapshotInfo info : changelogSnapshotInfos.values()) {
+      containAddedDeleteFiles = containAddedDeleteFiles || !info.addedDeleteFiles().isEmpty();
+      containRemovedDataFiles = containRemovedDataFiles || info.hasRemovedDataFiles();
+    }
+
+    Set<ManifestFile> dataManifests;
+    Set<ManifestFile> deleteManifests;
+    if (containAddedDeleteFiles) {
+      // scan all dataFiles to locate the deleted record and ensure that this record has not been deleted before
+      dataManifests = Sets.newHashSet(changelogSnapshots.getLast().dataManifests(tableOps().io()));

Review Comment:
   Why can you get away with using just the last snapshot in the queue here, when in the other branch you have to iterate over all of them?



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -64,32 +66,48 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
       return CloseableIterable.empty();
     }
 
-    Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
+    Map<Long, SnapshotInfo> changelogSnapshotInfos = computeSnapshotInfos(table(), changelogSnapshots);
 
-    Set<ManifestFile> newDataManifests =
-        FluentIterable.from(changelogSnapshots)
-            .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
-            .filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
-            .toSet();
+    boolean containAddedDeleteFiles = false;
+    boolean containRemovedDataFiles = false;
+    for (SnapshotInfo info : changelogSnapshotInfos.values()) {
+      containAddedDeleteFiles = containAddedDeleteFiles || !info.addedDeleteFiles().isEmpty();
+      containRemovedDataFiles = containRemovedDataFiles || info.hasRemovedDataFiles();
+    }
+
+    Set<ManifestFile> dataManifests;
+    Set<ManifestFile> deleteManifests;
+    if (containAddedDeleteFiles) {
+      // scan all dataFiles to locate the deleted record and ensure that this record has not been deleted before
+      dataManifests = Sets.newHashSet(changelogSnapshots.getLast().dataManifests(tableOps().io()));
+      deleteManifests = Sets.newHashSet(changelogSnapshots.getLast().deleteManifests(tableOps().io()));
+    } else {
+      dataManifests = FluentIterable.from(changelogSnapshots)
+          .transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
+          .filter(manifest -> changelogSnapshotInfos.containsKey(manifest.snapshotId()))
+          .toSet();
+
+      // scan all deleteFiles to locate the deleted records when there are removed data files
+      deleteManifests = !containRemovedDataFiles ? ImmutableSet.of() :
+          Sets.newHashSet(changelogSnapshots.getLast().deleteManifests(tableOps().io()));
+    }
 
     ManifestGroup manifestGroup =
-        new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
+        new ManifestGroup(table().io(), dataManifests, deleteManifests)
             .specsById(table().specs())
             .caseSensitive(isCaseSensitive())
             .select(scanColumns())
-            .filterData(filter())
-            .filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
-            .ignoreExisting();

Review Comment:
   Were these two lines redundant, or are they no longer needed for another reason?



##########
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java:
##########
@@ -105,79 +123,161 @@ private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl
 
     for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
       if (!snapshot.operation().equals(DataOperations.REPLACE)) {
-        if (snapshot.deleteManifests(table().io()).size() > 0) {
-          throw new UnsupportedOperationException(
-              "Delete files are currently not supported in changelog scans");
-        }
-
         changelogSnapshots.addFirst(snapshot);
       }
     }
 
     return changelogSnapshots;
   }
 
-  private Set<Long> toSnapshotIds(Collection<Snapshot> snapshots) {
-    return snapshots.stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
-  }
-
-  private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapshots) {
-    Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();
+  private static Map<Long, SnapshotInfo> computeSnapshotInfos(Table table, Deque<Snapshot> snapshots) {
+    Map<Long, SnapshotInfo> snapshotInfos = Maps.newHashMap();
 
     int ordinal = 0;
-
     for (Snapshot snapshot : snapshots) {
-      snapshotOrdinals.put(snapshot.snapshotId(), ordinal++);
+      Set<CharSequence> removedDataFiles = FluentIterable
+          .from(snapshot.removedDataFiles(table.io()))
+          .transform(ContentFile::path)
+          .toSet();
+
+      Set<CharSequence> addedDeleteFiles = FluentIterable
+          .from(snapshot.addedDeleteFiles(table.io()))
+          .transform(ContentFile::path)
+          .toSet();
+      snapshotInfos.put(snapshot.snapshotId(),
+          new SnapshotInfo(snapshot.snapshotId(), ordinal++, removedDataFiles.isEmpty(), addedDeleteFiles));
     }
 
-    return snapshotOrdinals;
+    return snapshotInfos;
   }
 
-  private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
-    private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
-
-    private final Map<Long, Integer> snapshotOrdinals;
+  private static class CreateChangelogScanTaskTasks implements CreateTasksFunction<ChangelogScanTask> {
+    private final Map<Long, Integer> snapshotOrdinals = Maps.newHashMap();
+    private final Map<CharSequence, Long> addedDeleteSnapshotIds = Maps.newHashMap();
 
-    CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
-      this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
+    CreateChangelogScanTaskTasks(Map<Long, SnapshotInfo> snapshotInfos) {
+      for (Map.Entry<Long, SnapshotInfo> kv : snapshotInfos.entrySet()) {
+        snapshotOrdinals.put(kv.getKey(), kv.getValue().ordinals());
+        kv.getValue().addedDeleteFiles().forEach(file -> addedDeleteSnapshotIds.put(file, kv.getKey()));
+      }
     }
 
     @Override
     public CloseableIterable<ChangelogScanTask> apply(
         CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) {
 
-      return CloseableIterable.transform(
+      return CloseableIterable.filter(CloseableIterable.transform(
           entries,
           entry -> {
             long commitSnapshotId = entry.snapshotId();
-            int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
             DataFile dataFile = entry.file().copy(context.shouldKeepStats());
+            DeleteFile[] deleteFiles = context.deletes().forEntry(entry);
 
             switch (entry.status()) {
               case ADDED:
-                return new BaseAddedRowsScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
+                return snapshotOrdinals.containsKey(commitSnapshotId) ?
+                  getAddedRowsScanTask(commitSnapshotId, dataFile, deleteFiles, context) :
+                  getDeletedRowsScanTask(commitSnapshotId, dataFile, deleteFiles, context);
               case DELETED:
-                return new BaseDeletedDataFileScanTask(
-                    changeOrdinal,
-                    commitSnapshotId,
-                    dataFile,
-                    NO_DELETES,
-                    context.schemaAsString(),
-                    context.specAsString(),
-                    context.residuals());
-
+                return getDeletedDataFileScanTask(commitSnapshotId, dataFile, deleteFiles, context);
+              case EXISTING:
+                return getDeletedRowsScanTask(commitSnapshotId, dataFile, deleteFiles, context);
               default:
                 throw new IllegalArgumentException("Unexpected entry status: " + entry.status());
             }
-          });
+          }), Objects::nonNull);
+    }
+
+    private ChangelogScanTask getAddedRowsScanTask(
+        long commitSnapshotId, DataFile dataFile, DeleteFile[] deleteFiles, TaskContext context) {
+      return new BaseAddedRowsScanTask(
+          snapshotOrdinals.get(commitSnapshotId),
+          commitSnapshotId,
+          dataFile,
+          deleteFiles,
+          context.schemaAsString(),
+          context.specAsString(),
+          context.residuals());
+    }
+
+    private ChangelogScanTask getDeletedDataFileScanTask(
+        long commitSnapshotId, DataFile dataFile, DeleteFile[] deleteFiles, TaskContext context) {
+      if (snapshotOrdinals.containsKey(commitSnapshotId)) {
+        return new BaseDeletedDataFileScanTask(
+            snapshotOrdinals.get(commitSnapshotId),
+            commitSnapshotId,
+            dataFile,
+            deleteFiles,
+            context.schemaAsString(),
+            context.specAsString(),
+            context.residuals());
+      } else {
+        // ignore removed data files.

Review Comment:
   This case, where the commit snapshot ID doesn't exist in `snapshotOrdinals`, didn't seem to be possible before. What changed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org