Posted to issues@iceberg.apache.org by "amogh-jahagirdar (via GitHub)" <gi...@apache.org> on 2023/05/19 18:02:12 UTC

[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7651: Core: Compacted position delete files should use the max data sequence number of source files

amogh-jahagirdar commented on code in PR #7651:
URL: https://github.com/apache/iceberg/pull/7651#discussion_r1199232563


##########
core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java:
##########
@@ -38,12 +38,18 @@
 public class RewritePositionDeletesGroup {
   private final FileGroupInfo info;
   private final List<PositionDeletesScanTask> tasks;
+  private final long maxRewrittenDataSequenceNumber;
 
   private Set<DeleteFile> addedDeleteFiles = Collections.emptySet();
 
   public RewritePositionDeletesGroup(FileGroupInfo info, List<PositionDeletesScanTask> tasks) {
     this.info = info;
     this.tasks = tasks;
+    this.maxRewrittenDataSequenceNumber =
+        tasks.stream()
+            .map(t -> t.file().dataSequenceNumber())
+            .max(Long::compare)
+            .orElseThrow(() -> new IllegalArgumentException("Empty file group"));

Review Comment:
   Nit: Could we just use `Preconditions.checkArgument` on the passed-in task list to validate that it is non-empty, instead of relying on `orElseThrow`?
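
   A minimal sketch of what this suggestion could look like, using only the JDK so it runs on its own. The `checkArgument` helper below is a hypothetical local stand-in for a Guava-style `Preconditions.checkArgument`, and the `List<Long>` is an assumed stand-in for the data sequence numbers read from each task's file; neither is taken from the PR itself.

```java
import java.util.List;

class MaxSequenceNumberSketch {

  // Hypothetical stand-in for a Guava-style Preconditions.checkArgument.
  static void checkArgument(boolean expression, String message) {
    if (!expression) {
      throw new IllegalArgumentException(message);
    }
  }

  // Validate the input up front, then compute the max without an orElseThrow branch.
  static long maxDataSequenceNumber(List<Long> dataSequenceNumbers) {
    checkArgument(
        dataSequenceNumbers != null && !dataSequenceNumbers.isEmpty(), "Empty file group");

    // Safe to call getAsLong(): the precondition guarantees at least one element.
    return dataSequenceNumbers.stream().mapToLong(Long::longValue).max().getAsLong();
  }
}
```

   With the check at the top of the constructor, the failure mode for an empty group stays an `IllegalArgumentException`, but the validation is separated from the max computation.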



##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java:
##########
@@ -739,4 +789,43 @@ private void checkResult(
         size(newDeletes),
         result.rewriteResults().stream().mapToLong(FileGroupRewriteResult::addedBytesCount).sum());
   }
+
+  private void checkSequenceNumbers(
+      Table table, List<DeleteFile> rewrittenDeletes, List<DeleteFile> addedDeletes) {
+    StructLikeMap<List<DeleteFile>> rewrittenFilesPerPartition =
+        groupPerPartition(table, rewrittenDeletes);
+    StructLikeMap<List<DeleteFile>> addedFilesPerPartition = groupPerPartition(table, addedDeletes);
+    for (StructLike partition : rewrittenFilesPerPartition.keySet()) {
+      Long maxRewrittenSeq =
+          rewrittenFilesPerPartition.get(partition).stream()
+              .map(ContentFile::dataSequenceNumber)
+              .max(Long::compare)
+              .get();
+      List<DeleteFile> addedPartitionFiles = addedFilesPerPartition.get(partition);
+      if (addedPartitionFiles != null) {
+        addedPartitionFiles.forEach(
+            d ->
+                Assert.assertEquals(
+                    "Sequence number should be max of rewritten set",
+                    d.dataSequenceNumber(),
+                    maxRewrittenSeq));
+      }
+    }
+  }
+
+  private StructLikeMap<List<DeleteFile>> groupPerPartition(
+      Table table, List<DeleteFile> deleteFiles) {
+    StructLikeMap<List<DeleteFile>> result =
+        StructLikeMap.create(Partitioning.partitionType(table));
+    for (DeleteFile deleteFile : deleteFiles) {
+      StructLike partition = deleteFile.partition();
+      List<DeleteFile> partitionFiles = result.get(partition);
+      if (partitionFiles == null) {
+        partitionFiles = Lists.newArrayList();
+      }

Review Comment:
   Nit: Add a newline after the `if` block.



##########
core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java:
##########
@@ -1129,4 +1145,39 @@ protected ManifestReader<DeleteFile> newManifestReader(ManifestFile manifest) {
       return MergingSnapshotProducer.this.newDeleteManifestReader(manifest);
     }
   }
+
+  private static class DeleteFileHolder {
+    private final DeleteFile deleteFile;
+    private final Long dataSequenceNumber;
+
+    /**
+     * Queue a delete file for commit with a given data sequence number

Review Comment:
   Nit: Not sure `queue` is the right word here. It's also a bit redundant given the class name, but could we just use `Holds` or `Contains` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

