Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/10/14 01:59:04 UTC

[GitHub] [iceberg] amogh-jahagirdar opened a new pull request, #5981: Core: Parallelize the determining of reachable manifests during snapshot expiration file cleanup

amogh-jahagirdar opened a new pull request, #5981:
URL: https://github.com/apache/iceberg/pull/5981

   Follow-up PR to https://github.com/apache/iceberg/pull/5669.
   
   1.) Parallelize determining the reachable manifests for a given set of snapshots.
   
   2.) Clean up issues in log messages raised in https://github.com/apache/iceberg/pull/5669#discussion_r993986722 and https://github.com/apache/iceberg/pull/5669#discussion_r993984959.
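The parallel read in 1.) can be sketched with plain `java.util.concurrent`: one task per snapshot, with every task adding into a set created by `ConcurrentHashMap.newKeySet()`. This is a hypothetical illustration, not Iceberg's `Tasks` utility or the PR's code; snapshots are modeled as a map from snapshot ID to manifest paths, and `ParallelManifestReader` and `reachableManifests` are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model: each snapshot ID maps to the manifest paths in its manifest list.
final class ParallelManifestReader {

  static Set<String> reachableManifests(Map<Long, List<String>> manifestListsBySnapshot, int threads)
      throws InterruptedException, ExecutionException {
    // Concurrent set so worker threads can add without extra locking.
    Set<String> manifests = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Callable<Void>> tasks = new ArrayList<>();
      for (List<String> manifestList : manifestListsBySnapshot.values()) {
        // One task per snapshot, loosely mirroring Tasks.foreach(snapshots) in the diff.
        tasks.add(
            () -> {
              manifests.addAll(manifestList);
              return null;
            });
      }
      // invokeAll waits for every task; get() rethrows a task's failure.
      for (Future<Void> result : pool.invokeAll(tasks)) {
        result.get();
      }
    } finally {
      pool.shutdown();
    }
    return manifests;
  }

  public static void main(String[] args) throws Exception {
    Map<Long, List<String>> snapshots =
        Map.of(1L, List.of("m1.avro", "m2.avro"), 2L, List.of("m2.avro", "m3.avro"));
    System.out.println(reachableManifests(snapshots, 2).size()); // 3
  }
}
```

A manifest shared by several snapshots (here `m2.avro`) is stored once, because the result is a set.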
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r999756396


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -63,39 +63,83 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
         }
       }
     }
-
-    Set<ManifestFile> candidateManifestFilesForDeletion = readManifests(expiredSnapshots);
-    Set<ManifestFile> manifestFilesAfterExpiration = readManifests(snapshotsAfterExpiration);
-
-    Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
-    for (ManifestFile candidateManifestFile : candidateManifestFilesForDeletion) {
-      if (!manifestFilesAfterExpiration.contains(candidateManifestFile)) {
-        manifestsToDelete.add(candidateManifestFile);
+    Set<ManifestFile> deletionCandidates = readManifests(expiredSnapshots);
+
+    if (!deletionCandidates.isEmpty()) {
+      Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+      Set<ManifestFile> manifestsToDelete =
+          pruneReferencedManifests(
+              snapshotsAfterExpiration, deletionCandidates, currentManifests::add);
+
+      if (!manifestsToDelete.isEmpty()) {
+        Set<String> dataFilesToDelete = findFilesToDelete(manifestsToDelete, currentManifests);
+        deleteFiles(dataFilesToDelete, "data");
+        Set<String> manifestPathsToDelete =
+            manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet());
+        deleteFiles(manifestPathsToDelete, "manifest");
       }
     }
 
-    Set<String> dataFilesToDelete =
-        findFilesToDelete(manifestsToDelete, manifestFilesAfterExpiration);
-    deleteFiles(dataFilesToDelete, "data");
-    Set<String> manifestPathsToDelete =
-        manifestsToDelete.stream().map(ManifestFile::path).collect(Collectors.toSet());
-
-    deleteFiles(manifestPathsToDelete, "manifest");
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
+  private Set<ManifestFile> pruneReferencedManifests(
+      Set<Snapshot> snapshots,
+      Set<ManifestFile> deletionCandidates,
+      Consumer<ManifestFile> currentManifestCallback) {
+    Set<ManifestFile> candidateSet = ConcurrentHashMap.newKeySet();
+    candidateSet.addAll(deletionCandidates);
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  candidateSet.remove(manifestFile);
+                  if (candidateSet.isEmpty()) {
+                    return;
+                  }
+
+                  currentManifestCallback.accept(manifestFile.copy());

Review Comment:
   Very minor, but it seems odd to copy here rather than in the callback. If the callback were a no-op, we'd be doing the copy work for nothing. Since we know the copy is needed, though, it seems fine.
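To make the trade-off concrete, here is a minimal sketch assuming, as the `copy()` calls in the diff suggest, that the reader may hand out one reused, mutable entry object per row. Copying inside the callback means a no-op callback performs no copy work, while a callback that retains entries pays for its own copies. `CopyInCallback`, `Entry`, and `forEachEntry` are hypothetical names, not Iceberg APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a reader that reuses one mutable entry between rows.
final class CopyInCallback {
  static int copies = 0; // counts copy() calls, for illustration only

  static final class Entry {
    String path;

    Entry copy() {
      copies++;
      Entry e = new Entry();
      e.path = path;
      return e;
    }
  }

  // Passes the same reused instance to the callback each time; never copies itself.
  static void forEachEntry(List<String> paths, Consumer<Entry> callback) {
    Entry reused = new Entry();
    for (String p : paths) {
      reused.path = p;
      callback.accept(reused);
    }
  }

  public static void main(String[] args) {
    List<Entry> retained = new ArrayList<>();
    // A callback that keeps entries is responsible for copying them.
    forEachEntry(List.of("m1", "m2"), e -> retained.add(e.copy()));
    // A no-op callback triggers no copy work.
    forEachEntry(List.of("m1", "m2"), e -> {});
    // prints "m1 m2 copies=2"
    System.out.println(retained.get(0).path + " " + retained.get(1).path + " copies=" + copies);
  }
}
```

Without the copy, a retaining callback would store the same object twice and both stored entries would show the last path read.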





[GitHub] [iceberg] kbendick commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996382670


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;
     }
 
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  manifestsToDelete.remove(manifestFile);
+                  if (manifestsToDelete.isEmpty()) {
+                    return;
+                  }
+
+                  currentManifests.add(manifestFile.copy());

Review Comment:
   This seems potentially incorrect to me. It looks like `currentManifests` could end up with different contents depending on whether there are still manifests left to delete.
   
   Specifically, if some manifest file M is the last remaining manifest to delete, then it won't be included in `currentManifests`.
   
   However, if the snapshots are evaluated in an order such that M is considered while there are still manifests left to delete, then M _will be_ included in `currentManifests`.
   
   This order-dependent behavior is suspicious to me, though admittedly I was not involved in the previous PR review.
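The ordering concern can be reproduced with a small sequential model of the loop in the diff: remove the manifest from the candidate set, return early if the set is now empty, otherwise add the manifest to `currentManifests`. The manifest names and the `OrderDependence` class are hypothetical; the point is only that the same inputs in two visit orders produce different sets.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sequential model of the reviewed loop body.
final class OrderDependence {

  static Set<String> currentManifests(List<String> visitOrder, Set<String> candidates) {
    Set<String> remaining = new HashSet<>(candidates);
    Set<String> current = new LinkedHashSet<>();
    for (String manifest : visitOrder) {
      remaining.remove(manifest);
      if (remaining.isEmpty()) {
        return current; // early exit: `manifest` itself is never added
      }
      current.add(manifest);
    }
    return current;
  }

  public static void main(String[] args) {
    Set<String> candidates = Set.of("M", "X");
    // Same manifests, two visit orders.
    System.out.println(currentManifests(List.of("X", "M", "A"), candidates)); // [X]
    System.out.println(currentManifests(List.of("M", "A", "X"), candidates)); // [M, A]
  }
}
```

In the first order M is the last remaining candidate and is left out; in the second it is visited while X is still pending, so it is included.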





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during snapshot expiration file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r995269913


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -64,15 +65,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
       }
     }
 
-    Set<ManifestFile> candidateManifestFilesForDeletion = readManifests(expiredSnapshots);
     Set<ManifestFile> manifestFilesAfterExpiration = readManifests(snapshotsAfterExpiration);
-
-    Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
-    for (ManifestFile candidateManifestFile : candidateManifestFilesForDeletion) {
-      if (!manifestFilesAfterExpiration.contains(candidateManifestFile)) {
-        manifestsToDelete.add(candidateManifestFile);
-      }
-    }
+    Set<ManifestFile> manifestsToDelete = manifestFilesToDelete(manifestFilesAfterExpiration, expiredSnapshots);

Review Comment:
   With the task-based approach, we should guarantee that if there's a failure, it gets thrown and we don't proceed to the next step. I've made sure to throw if determining the manifests fails.
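A minimal sketch of the "fail before deleting" guarantee, using `ExecutorService.invokeAll` in place of Iceberg's `Tasks` builder (whose `stopOnFailure()` and `throwFailureWhenFinished()` options appear in the diff): every planning task is awaited, the first failure is rethrown, and the deletion step is reached only when all tasks succeeded. `FailBeforeDelete`, `runAllOrThrow`, and `expire` are hypothetical names, and unlike the diff this version neither retries nor stops early.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: planning failures must surface before any deletion.
final class FailBeforeDelete {

  // Waits for all tasks, then rethrows the first failure (if any).
  static void runAllOrThrow(List<Callable<Void>> tasks, ExecutorService pool)
      throws InterruptedException, ExecutionException {
    ExecutionException firstFailure = null;
    for (Future<Void> result : pool.invokeAll(tasks)) {
      try {
        result.get();
      } catch (ExecutionException e) {
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  // `deleted` is only appended to after every planning task has succeeded.
  static void expire(List<Callable<Void>> planningTasks, List<String> filesToDelete, List<String> deleted)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      runAllOrThrow(planningTasks, pool);
    } finally {
      pool.shutdown();
    }
    deleted.addAll(filesToDelete); // stand-in for the real delete calls
  }

  public static void main(String[] args) throws Exception {
    List<String> deleted = new ArrayList<>();
    List<Callable<Void>> tasks =
        List.of(() -> null, () -> { throw new IllegalStateException("cannot read manifest list"); });
    try {
      expire(tasks, List.of("data-1.parquet"), deleted);
    } catch (ExecutionException e) {
      System.out.println("planning failed: " + e.getCause().getMessage());
    }
    System.out.println("deleted=" + deleted); // deleted=[]
  }
}
```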





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996248026


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -85,19 +79,60 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
   }
 
   private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
-    }
+    Set<ManifestFile> manifests = ConcurrentHashMap.newKeySet();
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFilesForSnapshot =
+                  readManifestFiles(snapshot)) {
+                for (ManifestFile manifestFile : manifestFilesForSnapshot) {
+                  manifests.add(manifestFile.copy());
+                }
+              } catch (IOException e) {
+                throw new RuntimeIOException(
+                    e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
+              }
+            });
+
+    return manifests;
+  }
+
+  private Set<ManifestFile> manifestFilesToDelete(
+      Set<ManifestFile> currentManifests, Set<Snapshot> expiredSnapshots) {

Review Comment:
   Updated 





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996389820


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I think this is ultimately a naming problem; I struggled to find a concise name because the function is really doing multiple things:
   
   1.) It reads the current manifests and prunes the candidate manifests to delete at the same time. It is done this way because this procedure optimizes for memory usage, and the set of current manifests is expected, on average, to be larger than the set of manifests to delete.
   
   2.) Following from 1, the procedure should aim to minimize how much of the current table state it reads. It can stop reading manifests in the current table state once all the candidate manifests have been pruned.
   
   >Generally speaking, that's not true to say that if there are no manifests to delete, there are no current manifests.
   
   3.) You're definitely correct here, but from a "deletion correctness" standpoint this state is still fine: if there are no manifests to delete, there are no data files we can actually delete, and the subsequent findFilesToDelete operation is guaranteed to be a no-op.
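Points 1.) and 2.) amount to a scan with an early exit. The hypothetical sketch below works over plain path strings and reports how many current manifests were examined, which shows the saving when the only candidate is referenced by the first manifest read. `EarlyExitPruning` and `prune` are made-up names; the real code reads `ManifestFile` objects per snapshot.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: prune deletion candidates and stop once none remain.
final class EarlyExitPruning {

  // Mutates `candidates`; returns how many current manifests were examined.
  static int prune(List<String> currentManifests, Set<String> candidates) {
    int examined = 0;
    for (String manifest : currentManifests) {
      examined++;
      candidates.remove(manifest);
      if (candidates.isEmpty()) {
        break; // nothing left that could be deleted; skip the rest of the table state
      }
    }
    return examined;
  }

  public static void main(String[] args) {
    Set<String> candidates = new HashSet<>(Set.of("m1"));
    int examined = prune(List.of("m1", "m2", "m3", "m4"), candidates);
    System.out.println("examined=" + examined + " toDelete=" + candidates); // examined=1 toDelete=[]
  }
}
```

When a candidate is never referenced, the scan reads every current manifest and the candidate survives as deletable.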





[GitHub] [iceberg] rdblue commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996532693


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +77,74 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  // Returns the current set of manifests while pruning the manifests to delete
+  // If all candidate manifests are pruned the returned set of current manifests will be empty
+  private Set<ManifestFile> findCurrentManifestsAndPruneCandidates(
+      Set<Snapshot> snapshots, Set<ManifestFile> deletionCandidates) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (deletionCandidates.isEmpty()) {

Review Comment:
   I think the caller should check for this. If the candidate set is empty before calling this, then the whole operation should short-circuit and return, not just this operation. That avoids the awkwardness of getting an empty set of `currentManifests` that is probably not correct.
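A minimal sketch of the suggested control flow, assuming manifests are identified by path strings: the caller returns before doing any other work when there are no candidates, so the helper is never asked to produce an empty and possibly misleading set. `ShortCircuitCleanup`, `cleanFiles`, and `pruneReferenced` are hypothetical simplifications, not the real `ReachableFileCleanup` methods, and the sketch returns the manifests it would delete instead of deleting them.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of a caller-side short circuit.
final class ShortCircuitCleanup {

  static Set<String> cleanFiles(Set<String> deletionCandidates, List<String> currentManifests) {
    if (deletionCandidates.isEmpty()) {
      return Set.of(); // short-circuit the whole operation; the helper is never called
    }
    return pruneReferenced(deletionCandidates, currentManifests);
  }

  // The helper may assume a non-empty candidate set.
  static Set<String> pruneReferenced(Set<String> candidates, List<String> currentManifests) {
    if (candidates.isEmpty()) {
      throw new IllegalArgumentException("caller must short-circuit on empty candidates");
    }
    Set<String> toDelete = new TreeSet<>(candidates);
    toDelete.removeAll(currentManifests); // still-referenced manifests survive
    return toDelete;
  }

  public static void main(String[] args) {
    System.out.println(cleanFiles(Set.of(), List.of("m1"))); // []
    System.out.println(cleanFiles(Set.of("m1", "m2"), List.of("m2", "m3"))); // [m1]
  }
}
```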





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996466631


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I renamed it to pruneManifestsToDelete and added some inline comments, although the naming is still not great since the returned set is the current manifests.
   
   Another way to make this more explicit, and to avoid mutating the state of the candidate manifests to delete, is to have the function return a Pair<Set<ManifestFile>, Set<ManifestFile>>, where the first entry is the current manifests and the second is the manifests that can safely be removed. Both sets can then be used later when determining the data files to delete. I initially avoided this since the Pair seemed a bit more complex than just mutating the state.
   
   Let me know which approach you prefer @kbendick @rdblue
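The non-mutating alternative could look roughly like this. It is a sketch under stated assumptions: a small named result class stands in for `Pair`, manifests are path strings, and the early exit is omitted so the result does not depend on visit order. `PruneResultExample`, `PruneResult`, and `prune` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: return both sets instead of mutating the caller's candidates.
final class PruneResultExample {

  // Named stand-in for Pair<Set<ManifestFile>, Set<ManifestFile>>.
  static final class PruneResult {
    final Set<String> currentManifests;
    final Set<String> manifestsToDelete;

    PruneResult(Set<String> currentManifests, Set<String> manifestsToDelete) {
      this.currentManifests = Collections.unmodifiableSet(currentManifests);
      this.manifestsToDelete = Collections.unmodifiableSet(manifestsToDelete);
    }
  }

  // Reads every current manifest; the caller's candidate set is never modified.
  static PruneResult prune(List<String> current, Set<String> candidates) {
    Set<String> remaining = new HashSet<>(candidates);
    Set<String> seen = new HashSet<>(current);
    remaining.removeAll(seen);
    return new PruneResult(seen, remaining);
  }

  public static void main(String[] args) {
    Set<String> candidates = Set.of("m1", "m2"); // immutable: any mutation would throw
    PruneResult result = prune(List.of("m2", "m3"), candidates);
    System.out.println(result.manifestsToDelete + " " + candidates.size()); // [m1] 2
  }
}
```

The cost of this shape is that it gives up the early exit, so it always reads the full current state; that is the memory trade-off discussed elsewhere in this thread.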
   





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996201966


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -85,19 +79,60 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
   }
 
   private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
-    }
+    Set<ManifestFile> manifests = ConcurrentHashMap.newKeySet();
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFilesForSnapshot =
+                  readManifestFiles(snapshot)) {
+                for (ManifestFile manifestFile : manifestFilesForSnapshot) {
+                  manifests.add(manifestFile.copy());
+                }
+              } catch (IOException e) {
+                throw new RuntimeIOException(
+                    e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
+              }
+            });
+
+    return manifests;
+  }
+
+  private Set<ManifestFile> manifestFilesToDelete(
+      Set<ManifestFile> currentManifests, Set<Snapshot> expiredSnapshots) {

Review Comment:
   Yeah, my thinking was that in the worst case we need to read the current manifests anyway for data file reachability, and structuring the code this way allows the set of current manifests to be reused during the reachable data file analysis. But it's important to weigh the average/common case more, and I think I can structure the code readably while still reusing the determined set of current manifests. Will update.
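Reusing the current manifests for data file reachability boils down to a set difference: data files listed by the manifests to delete, minus data files still listed by a current manifest. The sketch below models each manifest as a hypothetical map entry from manifest path to data file paths, and it ignores the per-entry status (added, existing, deleted) that real Iceberg manifests carry. `DataFileReachability` and this `findFilesToDelete` signature are illustrative only.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: reuse an already-computed set of current manifests.
final class DataFileReachability {

  // Data files listed by manifests being deleted, minus any still listed by a current manifest.
  static Set<String> findFilesToDelete(
      Set<String> manifestsToDelete,
      Set<String> currentManifests,
      Map<String, List<String>> dataFilesByManifest) {
    Set<String> candidates = new HashSet<>();
    for (String manifest : manifestsToDelete) {
      candidates.addAll(dataFilesByManifest.getOrDefault(manifest, List.of()));
    }
    // Reuse the current manifests instead of re-reading snapshot manifest lists.
    for (String manifest : currentManifests) {
      if (candidates.isEmpty()) {
        break; // every candidate data file is still reachable
      }
      candidates.removeAll(dataFilesByManifest.getOrDefault(manifest, List.of()));
    }
    return candidates;
  }

  public static void main(String[] args) {
    Map<String, List<String>> contents =
        Map.of(
            "old.avro", List.of("a.parquet", "b.parquet"),
            "cur.avro", List.of("b.parquet", "c.parquet"));
    // prints [a.parquet]
    System.out.println(findFilesToDelete(Set.of("old.avro"), Set.of("cur.avro"), contents));
  }
}
```

With an empty `manifestsToDelete` the result is empty no matter what `currentManifests` holds, which is the no-op property the correctness argument in this thread relies on.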





[GitHub] [iceberg] rdblue commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996151983


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -85,19 +79,60 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
   }
 
   private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
-    }
+    Set<ManifestFile> manifests = ConcurrentHashMap.newKeySet();
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFilesForSnapshot =

Review Comment:
   Less verbose variable names would help statements fit on one line. I think it's pretty clear where the manifest files come from, so you can probably use `manifests` rather than `manifestFilesForSnapshot`.





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996389820


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I think this is ultimately a naming problem; I struggled to find a concise name because the function is really doing multiple things:
   
   1.) It reads the current manifests and prunes the candidate manifests to delete at the same time. It is done this way because this procedure optimizes for memory usage, and the set of current manifests is expected, on average, to be larger than the set of manifests to delete.
   
   2.) Following from 1, the procedure should aim to minimize how much of the current table state it reads. It can stop reading manifests in the current table state once all the candidate manifests for deletion have been pruned, because at that point there are no data files to delete.
   
   >Generally speaking, that's not true to say that if there are no manifests to delete, there are no current manifests.
   
   3.) You're definitely correct here, but from a "deletion correctness" standpoint this state is still fine: if there are no manifests to delete, there are no data files we can actually delete, and the subsequent findFilesToDelete operation is guaranteed to be a no-op.
   
   Let me know if there are any remaining concerns about correctness. Right now TestRemoveSnapshots goes through both strategies, but I'll think about whether any cases are missing. I'm also definitely open to better names here, and I'll look into structuring and commenting the code so it's clearer. I think inline comments can go a long way here.
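The correctness argument in 3.) can be checked with a parallel sketch in which tasks share a concurrent candidate set and stop once it is empty. Because the set only shrinks, if any candidate survives then the set was never empty, no task exited early, and the collected current manifests are complete; if it did become empty, there is nothing to delete and a partial `currentManifests` is harmless. Snapshots are modeled as hypothetical lists of manifest paths, `ParallelPrune` is not an Iceberg class, and `ExecutorService` stands in for Iceberg's `Tasks`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical parallel prune with a shared, shrink-only candidate set.
final class ParallelPrune {

  // Returns candidates no snapshot references; adds visited manifests to currentOut,
  // which must be a thread-safe set.
  static Set<String> prune(List<List<String>> snapshots, Set<String> candidates, Set<String> currentOut)
      throws InterruptedException, ExecutionException {
    Set<String> remaining = ConcurrentHashMap.newKeySet();
    remaining.addAll(candidates);
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Callable<Void>> tasks = new ArrayList<>();
      for (List<String> manifests : snapshots) {
        tasks.add(
            () -> {
              for (String manifest : manifests) {
                remaining.remove(manifest);
                if (remaining.isEmpty()) {
                  return null; // nothing left to delete; currentOut may now be partial
                }
                currentOut.add(manifest);
              }
              return null;
            });
      }
      for (Future<Void> result : pool.invokeAll(tasks)) {
        result.get(); // surface any task failure before the caller deletes anything
      }
    } finally {
      pool.shutdown();
    }
    return remaining;
  }

  public static void main(String[] args) throws Exception {
    List<List<String>> snapshots = List.of(List.of("m1", "m2"), List.of("m2", "m3"));
    Set<String> current = ConcurrentHashMap.newKeySet();
    // m0 is never referenced, so the candidate set never empties and no task exits early.
    Set<String> toDelete = prune(snapshots, Set.of("m0", "m1"), current);
    System.out.println(toDelete + " " + current.size()); // [m0] 3
  }
}
```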



##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I think this is ultimately a naming problem; I struggled to find a concise name since the function is really doing multiple things:
   
   1.) It reads the current manifests and prunes the candidate manifests to delete at the same time. It is done this way because this procedure optimizes for memory usage, and the set of current manifests is expected, on average, to be larger than the set of manifests to delete.
   
   2.) Following from 1, the procedure should minimize how much of the current table state it reads. It can stop reading manifests in the current table state once all candidate manifests for deletion have been pruned, because at that point there are no data files to delete.
   
   >Generally speaking, that's not true to say that if there are no manifests to delete, there are no current manifests.
   
   3.) You're definitely correct here, but from a "deletion correctness" standpoint this state is still fine: if there are no manifests to delete, there are no data files we can actually delete, so subsequent findFilesToDelete operations are guaranteed to be no-ops.
   
   Let me know if there are still any concerns about correctness; right now TestRemoveSnapshots exercises both strategies, but I'll think about whether any cases are missing. I'm also definitely open to better names here, and I'll look into structuring and commenting the code so it's clearer. I think inline comments can probably go a long way here.



[GitHub] [iceberg] rdblue commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996534140


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   Ha, I just commented above that I'd prefer the Pair option. This is just an awkward case.
   
   What about adding a callback to do this? Like this:
   
   ```java
   private Set<ManifestFile> pruneReferencedManifests(
       Set<ManifestFile> deleteCandidates,
       Set<Snapshot> currentSnapshots,
       Consumer<ManifestFile> currentManifestCallback) {
     Set<ManifestFile> candidateSet = ConcurrentHashMap.newKeySet();
     candidateSet.addAll(deleteCandidates); // ensure the incoming set is thread safe!
     // manifestFiles: the manifests read from the manifest lists of currentSnapshots
     for (ManifestFile manifest : manifestFiles) {
       candidateSet.remove(manifest);
       currentManifestCallback.accept(manifest);
     }
     ...
     return candidateSet;
   }
   
   // when calling pruneReferencedManifests:
   Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
   Set<ManifestFile> manifestsToDelete =
       pruneReferencedManifests(deleteCandidates, currentSnapshots, currentManifests::add);
   ```
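
The callback shape suggested above can be sketched with only `java.util.concurrent`, as a minimal illustration under stated assumptions: manifests are modeled as path strings, each snapshot as the list of manifests in its manifest list, and `CallbackPruning` plus the plain `ExecutorService` wiring are hypothetical stand-ins for Iceberg's `ManifestFile`, `Snapshot`, and `Tasks.foreach` (retries and early exit are omitted).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical sketch; strings stand in for ManifestFile, lists for manifest lists.
final class CallbackPruning {
  private CallbackPruning() {}

  // Returns the delete candidates that no current snapshot references. Every manifest
  // seen along the way is passed to currentManifestCallback, which must be thread safe
  // because it is invoked from executor threads.
  static Set<String> pruneReferencedManifests(
      Set<String> deleteCandidates,
      List<List<String>> currentSnapshots,
      Consumer<String> currentManifestCallback,
      ExecutorService executor) {
    // Private concurrent copy: workers remove from it, the caller's set is never mutated.
    Set<String> candidateSet = ConcurrentHashMap.newKeySet();
    candidateSet.addAll(deleteCandidates);

    List<Future<?>> futures = new ArrayList<>();
    for (List<String> manifestList : currentSnapshots) {
      futures.add(
          executor.submit(
              () -> {
                for (String manifest : manifestList) {
                  candidateSet.remove(manifest); // still referenced, so not deletable
                  currentManifestCallback.accept(manifest);
                }
              }));
    }

    // Wait for every snapshot task and rethrow a failure unchecked.
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
      }
    }
    return candidateSet;
  }
}
```

As in the call site above, a caller would collect current manifests into `ConcurrentHashMap.newKeySet()` and pass `current::add`; copying the candidates into a concurrent set inside the helper is what makes the incoming set safe to share across threads.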



[GitHub] [iceberg] kbendick commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996380949


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;
     }
 
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  manifestsToDelete.remove(manifestFile);

Review Comment:
   Nit: mutating `manifestsToDelete` is somewhat odd to me; it isn't what I'd expect from the function name and signature.
   
   Maybe a more explicit method name, like `findCurrentManifestsAndRemoveFromManifestsToDelete`, would be more appropriate? Or maybe break this into multiple functions?
   
   If for performance reasons it's best to do it this way, then that makes sense to me.
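
For comparison, splitting the helper into multiple functions could look roughly like this hypothetical pair (`SplitHelpers`, with path strings standing in for `ManifestFile`). Neither function mutates its arguments, but step 1 must materialize every current manifest before anything is pruned, which is the memory and I/O cost the combined helper tries to avoid.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical non-mutating split; strings stand in for ManifestFile.
final class SplitHelpers {
  private SplitHelpers() {}

  // Step 1: read every current manifest list; no early exit is possible here.
  static Set<String> readCurrentManifests(List<List<String>> currentSnapshots) {
    Set<String> current = new HashSet<>();
    for (List<String> manifestList : currentSnapshots) {
      current.addAll(manifestList);
    }
    return current;
  }

  // Step 2: a pure set difference; the caller's candidate set is left untouched.
  static Set<String> unreferencedCandidates(Set<String> candidates, Set<String> current) {
    Set<String> result = new HashSet<>(candidates);
    result.removeAll(current);
    return result;
  }
}
```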



[GitHub] [iceberg] kbendick commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996380949


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;
     }
 
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  manifestsToDelete.remove(manifestFile);

Review Comment:
   Nit: mutating `manifestsToDelete` is somewhat odd to me; it isn't what I'd expect from the function name and signature.
   
   Maybe a more explicit method name, like `findCurrentManifestsAndRemoveFromManifestsToDelete`, would be more appropriate? Or maybe break this into multiple functions? That name is far too long, but `currentManifests` doesn't seem to capture what this function does given the mutation.
   
   If for performance reasons it's best to do it this way, then that makes sense to me.



[GitHub] [iceberg] rdblue commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996153635


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -85,19 +79,60 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
   }
 
   private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
-    }
+    Set<ManifestFile> manifests = ConcurrentHashMap.newKeySet();
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFilesForSnapshot =
+                  readManifestFiles(snapshot)) {
+                for (ManifestFile manifestFile : manifestFilesForSnapshot) {
+                  manifests.add(manifestFile.copy());
+                }
+              } catch (IOException e) {
+                throw new RuntimeIOException(
+                    e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
+              }
+            });
+
+    return manifests;
+  }
+
+  private Set<ManifestFile> manifestFilesToDelete(
+      Set<ManifestFile> currentManifests, Set<Snapshot> expiredSnapshots) {

Review Comment:
   Why not use the same idea as the file reachability method? We expect the number of current manifests to be much larger than the number of expired manifests, so keeping the expired set in memory is a better option. In addition, if we eliminate all of the expired manifests, we can exit early rather than reading all of the current manifests.
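
A minimal sequential sketch of this strategy, assuming manifests are modeled as path strings and each current snapshot as the list of manifests in its manifest list. `EarlyExitPruning` and its `Result` record are hypothetical names, not Iceberg APIs, and the PR itself runs the reads in parallel through `Tasks.foreach`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: only the small expired-candidate set is held in memory.
final class EarlyExitPruning {
  private EarlyExitPruning() {}

  record Result(Set<String> deletable, int manifestListsRead) {}

  // Streams the (large) current manifest lists against the (small) candidate set
  // and stops as soon as every candidate is known to still be referenced.
  static Result prune(Set<String> expiredCandidates, List<List<String>> currentManifestLists) {
    Set<String> remaining = new HashSet<>(expiredCandidates);
    int read = 0;
    for (List<String> manifestList : currentManifestLists) {
      if (remaining.isEmpty()) {
        break; // nothing left to delete, so the remaining lists are never read
      }
      read++;
      remaining.removeAll(manifestList);
    }
    return new Result(remaining, read);
  }
}
```

With candidates `{m1, m2}` and current manifest lists `[m1, m9]`, `[m2]`, `[m7]`, `[m8]`, only the first two lists are read before the loop stops and the deletable set is empty.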



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996334062


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -85,19 +79,60 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
   }
 
   private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
-    }
+    Set<ManifestFile> manifests = ConcurrentHashMap.newKeySet();
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFilesForSnapshot =

Review Comment:
   fixed



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996389820


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I think this is ultimately a naming problem; I struggled to find a concise name since the function is really doing multiple things:
   
   1.) It reads the current manifests and prunes the candidate manifests to delete at the same time. It is done this way because this procedure optimizes for memory usage, and the set of current manifests is expected, on average, to be larger than the set of manifests to delete.
   
   2.) Following from 1, the procedure should minimize how much of the current table state it reads. It can stop reading manifests in the current table state once all candidate manifests for deletion have been pruned, because at that point there are no data files to delete.
   
   >Generally speaking, that's not true to say that if there are no manifests to delete, there are no current manifests.
   
   3.) You're definitely correct here, but from a "deletion correctness" standpoint this state is still fine: if there are no manifests to delete, there are no data files we can actually delete, so subsequent findFilesToDelete operations are guaranteed to be no-ops.
   
   Let me know if there are still any concerns about correctness; right now TestRemoveSnapshots exercises both the incremental and reachable cleanup strategies for all the cases, but I'll think about whether any cases are missing. I'm also definitely open to better names here, and I'll look into structuring and commenting the code so it's clearer. I think inline comments can probably go a long way here.
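
Point 3 can be illustrated with a hypothetical data-file step. `DataFileCleanup` is an assumed name, not Iceberg's `findFilesToDelete`, and manifests are modeled as a map from manifest path to data file paths. Under the logic in the diff, the current-manifest set is only incomplete when the early exit fired, which happens only once no deletion candidates remain; with no deletable manifests there are no candidate data files, so the step returns an empty result without consulting the current set.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: `contents` maps a manifest path to the data files it tracks.
final class DataFileCleanup {
  private DataFileCleanup() {}

  // Data files referenced by manifests being deleted, minus any file that a
  // current manifest still references.
  static Set<String> findFilesToDelete(
      Set<String> manifestsToDelete,
      Set<String> currentManifests,
      Map<String, Set<String>> contents) {
    Set<String> candidates = new HashSet<>();
    for (String manifest : manifestsToDelete) {
      candidates.addAll(contents.getOrDefault(manifest, Set.of()));
    }
    if (candidates.isEmpty()) {
      return candidates; // no deletable manifests: the current set is never consulted
    }
    for (String manifest : currentManifests) {
      candidates.removeAll(contents.getOrDefault(manifest, Set.of()));
    }
    return candidates;
  }
}
```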



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996553275


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +77,74 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  // Returns the current set of manifests while pruning the manifests to delete
+  // If all candidate manifests are pruned the returned set of current manifests will be empty
+  private Set<ManifestFile> findCurrentManifestsAndPruneCandidates(
+      Set<Snapshot> snapshots, Set<ManifestFile> deletionCandidates) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (deletionCandidates.isEmpty()) {

Review Comment:
   Makes sense. I updated it so that finding the current manifests is only done if there are deletion candidates; I definitely think it's cleaner that way.



[GitHub] [iceberg] kbendick commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996380125


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   Correctness question / naming nit: this seems somewhat odd to me. Given the way the function is used, this might produce correct results.
   
   But looking at the function name / signature directly and the first few lines of code, it's strange to return an empty set as the "current manifests" if the manifests to delete are empty.
   
   Generally speaking, that's not true to say that if there are no manifests to delete, there are no current manifests.
   
   Maybe the function needs to be renamed or the local variable name needs to be renamed?
   
   Again, the usage is very possibly correct, but the name of the function and logic in the base case of empty manifests seems off.
   
   Is it supposed to return an empty set when `snapshots` is empty?
   



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996390379


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;
     }
 
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  manifestsToDelete.remove(manifestFile);
+                  if (manifestsToDelete.isEmpty()) {
+                    return;
+                  }
+
+                  currentManifests.add(manifestFile.copy());

Review Comment:
   I think [this](https://github.com/apache/iceberg/pull/5981/files#r996389820) should explain why this is correct, but please do let me know if you still have concerns.



[GitHub] [iceberg] kbendick commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996382670


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;
     }
 
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  manifestsToDelete.remove(manifestFile);
+                  if (manifestsToDelete.isEmpty()) {
+                    return;
+                  }
+
+                  currentManifests.add(manifestFile.copy());

Review Comment:
   This seems potentially incorrect to me. It seems like we could get different results in `currentManifests` based on whether or not there are more manifests left to delete.
   
   Specifically, if some manifest file M is the last remaining manifest to delete because the others were already seen in previously processed snapshots, then it _will not_ be included in `currentManifests`.
   
   However, if the snapshots are evaluated in an order such that M is considered while there are still manifests left to delete, then M _will be_ included in `currentManifests`.
   
   Do I have that correct or am I misunderstanding what this code is doing?
   
   Any behavior that depends on ordering is somewhat suspicious to me, though I was admittedly not involved in the previous PR review. 
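
This reading can be checked with a sequential transcription of the loop in the diff above. `OrderingDemo` is a hypothetical name, strings stand in for `ManifestFile`, and returning from the whole method once the candidate set is empty has, on a single thread, the same effect as each task in the diff returning early. With candidates `{m1, m2}` and two single-manifest snapshots, the returned "current" set is `{m1}` or `{m2}` depending on visit order, while the candidate set ends up empty either way; the asymmetry only arises once every candidate has been pruned, when nothing is left to delete.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sequential transcription of the diff's loop, for illustration only.
final class OrderingDemo {
  private OrderingDemo() {}

  static Set<String> currentManifests(List<List<String>> snapshots, Set<String> candidates) {
    Set<String> toDelete = new HashSet<>(candidates); // copy so the demo can be rerun
    Set<String> current = new HashSet<>();
    for (List<String> manifestList : snapshots) {
      for (String manifest : manifestList) {
        toDelete.remove(manifest);
        if (toDelete.isEmpty()) {
          return current; // the last pruned candidate is never added
        }
        current.add(manifest);
      }
    }
    return current;
  }
}
```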



[GitHub] [iceberg] rdblue commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996532450


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +77,74 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  // Returns the current set of manifests while pruning the manifests to delete
+  // If all candidate manifests are pruned the returned set of current manifests will be empty
+  private Set<ManifestFile> findCurrentManifestsAndPruneCandidates(

Review Comment:
   I'm not a big fan of doing multiple things in the same helper method. In this case, I'll admit that it makes sense because we don't want to read all the manifest lists twice. Still, I find this a bit awkward. The name is long and it modifies an argument (calling `deletionCandidates.remove`). I'd probably return both sets to avoid modifying the one passed in, but that's just me. I think this is okay.
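
Returning both sets from the single pass, rather than modifying the argument, might look like the following sequential sketch. A Java record stands in for the Pair option mentioned earlier in the thread; `ManifestPruning`, `PruneResult`, and the string modeling of manifests are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: one pass over the manifest lists, two results, no mutated inputs.
final class ManifestPruning {
  private ManifestPruning() {}

  record PruneResult(Set<String> currentManifests, Set<String> manifestsToDelete) {}

  static PruneResult findCurrentAndPrune(
      List<List<String>> currentSnapshots, Set<String> deletionCandidates) {
    Set<String> toDelete = new HashSet<>(deletionCandidates); // private copy; input untouched
    Set<String> current = new HashSet<>();
    for (List<String> manifestList : currentSnapshots) {
      if (toDelete.isEmpty()) {
        break; // nothing left to delete, so the remaining lists need not be read
      }
      for (String manifest : manifestList) {
        toDelete.remove(manifest);
        current.add(manifest);
      }
    }
    return new PruneResult(current, toDelete);
  }
}
```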



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996466631


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I renamed it to `findCurrentManifestsAndPruneCandidates` and added some inline comments.
   
   Another way to make this more explicit, and to avoid mutating the set of candidate manifests to delete, is to have the function return a `Pair<Set<ManifestFile>, Set<ManifestFile>>`, where the first entry is the current manifests and the second is the manifests that can safely be removed. Both sets can then be used later on when determining the data files to delete. I initially avoided this because the `Pair` seemed a bit more complex than just mutating the state.
   
   Let me know which approach you prefer, @kbendick @rdblue.
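A minimal sketch of the "return both sets" alternative, written in plain Java rather than against Iceberg's API. Manifests are modeled as path strings and each current snapshot as a list of manifest paths (assumptions for illustration), and a small named result class (the hypothetical `ManifestSplit`) stands in for the `Pair`. The caller's candidate set is copied, so nothing passed in is mutated.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical result holder used instead of a generic Pair, so call sites read
// split.currentManifests / split.manifestsToDelete rather than first() / second().
final class ManifestSplit {
  final Set<String> currentManifests;
  final Set<String> manifestsToDelete;

  ManifestSplit(Set<String> currentManifests, Set<String> manifestsToDelete) {
    this.currentManifests = Collections.unmodifiableSet(currentManifests);
    this.manifestsToDelete = Collections.unmodifiableSet(manifestsToDelete);
  }
}

final class ManifestSplitter {
  private ManifestSplitter() {}

  // currentSnapshots: snapshot id -> manifest paths from that snapshot's manifest list
  // (a stand-in for actually reading the manifest lists).
  static ManifestSplit split(Map<Long, List<String>> currentSnapshots, Set<String> candidates) {
    Set<String> toDelete = new HashSet<>(candidates); // copy: the caller's set is never modified
    Set<String> current = new HashSet<>();
    for (List<String> manifests : currentSnapshots.values()) {
      for (String manifest : manifests) {
        toDelete.remove(manifest); // still referenced, so it must not be deleted
        current.add(manifest);
      }
    }
    return new ManifestSplit(current, toDelete);
  }
}
```

This sketch is sequential and always reads every current manifest list; it deliberately leaves out the parallelism and the early exit discussed in this PR, so it only illustrates the shape of the return value.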
   



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996390379


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;
     }
 
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  manifestsToDelete.remove(manifestFile);
+                  if (manifestsToDelete.isEmpty()) {
+                    return;
+                  }
+
+                  currentManifests.add(manifestFile.copy());

Review Comment:
   > It seems like we could get different results in currentManifests based on whether or not there are more manifests left to delete.
   
   We would, but from a deletion-correctness standpoint this should be fine.
   I think [this](https://github.com/apache/iceberg/pull/5981/files#r996389820) explains why the result is still correct, but please let me know if you still have concerns. 
   
   
   



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996553076


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   Updated to use the callback approach. 



[GitHub] [iceberg] amogh-jahagirdar commented on pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#issuecomment-1284358526

   Thanks for the reviews @rdblue @kbendick @jackye1995 !


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996389820


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I think this is ultimately a naming problem. I struggled to find a concise name because the function is really doing multiple things:
   
   1.) It reads the current manifests and prunes the candidate manifests to delete at the same time. It is done this way because this procedure optimizes for memory usage, and the set of current manifests is expected, on average, to be larger than the set of manifests to delete.
   
   2.) Following from 1., the procedure should minimize how much of the current table state it reads. It can stop reading manifests in the current table state once all of the candidate manifests for deletion have been pruned: at that point there are no data files to delete, so there is no reason to keep reading the current manifests.
   
   >Generally speaking, that's not true to say that if there are no manifests to delete, there are no current manifests.
   
   3.) You're definitely correct here, but from a deletion-correctness standpoint this state is still fine: if there are no manifests to delete, there are no data files we can actually delete, and subsequent `findFilesToDelete` operations are guaranteed to be no-ops.
   
   Let me know if there are still any concerns about correctness. Right now `TestRemoveSnapshots` runs all of its cases against both the incremental and the reachable cleanup strategies, but I'll think about whether any cases are missing. I'm also definitely open to better names here, and I'll look into structuring and commenting the code so it's clearer. I think inline comments can go a long way here.
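To make points 2 and 3 concrete, here is a minimal sequential sketch under simplifying assumptions: manifests are path strings, and a hypothetical `manifestContents` map gives the data file paths listed in each manifest (none of these names come from the PR). It stops reading as soon as every candidate has been pruned, which leaves `current` partial, and it shows why that partial set is harmless: with no manifests left to delete, the data-file step has nothing to consider and never looks at `current`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class EarlyExitPruning {
  private EarlyExitPruning() {}

  // Current manifests seen so far, plus the candidates no current snapshot has referenced.
  static final class Result {
    final Set<String> current = new HashSet<>();
    final Set<String> remainingCandidates;

    Result(Set<String> candidates) {
      this.remainingCandidates = new HashSet<>(candidates);
    }
  }

  static Result prune(Map<Long, List<String>> currentSnapshots, Set<String> candidates) {
    Result result = new Result(candidates);
    if (result.remainingCandidates.isEmpty()) {
      return result; // nothing can be deleted, so skip reading the current state entirely
    }
    for (List<String> manifests : currentSnapshots.values()) {
      for (String manifest : manifests) {
        result.remainingCandidates.remove(manifest);
        if (result.remainingCandidates.isEmpty()) {
          return result; // early exit: `current` is now partial, see dataFilesToDelete
        }
        result.current.add(manifest);
      }
    }
    return result; // every current manifest was read, so `current` is complete
  }

  // Data files become deletion candidates only through manifests being deleted; any file
  // still listed in a current manifest is kept. With no manifests to delete this returns
  // an empty set without looking at `current`, which is why a partial `current` is safe.
  static Set<String> dataFilesToDelete(Result result, Map<String, List<String>> manifestContents) {
    Set<String> toDelete = new HashSet<>();
    for (String manifest : result.remainingCandidates) {
      toDelete.addAll(manifestContents.getOrDefault(manifest, List.of()));
    }
    if (toDelete.isEmpty()) {
      return toDelete;
    }
    for (String manifest : result.current) {
      toDelete.removeAll(manifestContents.getOrDefault(manifest, List.of()));
    }
    return toDelete;
  }
}
```

The early return is only taken when `remainingCandidates` is empty, so whenever there is something left to delete the loop has run to completion and `current` holds every current manifest.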



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996543457


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   Nice, yeah, the callback approach makes sense to me. That's a nifty trick; I'm going to apply it more in cases like this! 



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996543787


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -64,18 +64,11 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
       }
     }
 
-    Set<ManifestFile> candidateManifestFilesForDeletion = readManifests(expiredSnapshots);
-    Set<ManifestFile> manifestFilesAfterExpiration = readManifests(snapshotsAfterExpiration);
+    Set<ManifestFile> manifestsToDelete = readManifests(snapshotsBeforeExpiration);

Review Comment:
   Ooh, great catch. It was `expiredSnapshots` in the initial implementation, but I think I changed it to `snapshotsBeforeExpiration` while shuffling code. Will fix.
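A minimal sketch of deriving the expired snapshots as the difference between the snapshots before and after expiration, matched by snapshot ID. `Snap` is a hypothetical stand-in for Iceberg's `Snapshot`, keeping only an ID and a manifest list location. Reading deletion candidates from this set, rather than from `snapshotsBeforeExpiration`, keeps the manifests of retained snapshots out of the candidate set, so they are not read once as candidates only to be pruned again.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class ExpiredSnapshots {
  private ExpiredSnapshots() {}

  // Hypothetical stand-in for Iceberg's Snapshot: just an ID and a manifest list location.
  static final class Snap {
    final long id;
    final String manifestListLocation;

    Snap(long id, String manifestListLocation) {
      this.id = id;
      this.manifestListLocation = manifestListLocation;
    }
  }

  // Snapshots present before expiration but absent afterwards, matched by ID,
  // returned in the order they appear in `before`.
  static List<Snap> expired(List<Snap> before, List<Snap> after) {
    Set<Long> retainedIds = after.stream().map(s -> s.id).collect(Collectors.toSet());
    return before.stream()
        .filter(s -> !retainedIds.contains(s.id))
        .collect(Collectors.toList());
  }
}
```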



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996553599


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +77,74 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  // Returns the current set of manifests while pruning the manifests to delete
+  // If all candidate manifests are pruned the returned set of current manifests will be empty
+  private Set<ManifestFile> findCurrentManifestsAndPruneCandidates(

Review Comment:
   Yeah, since the method is private and we're passing in a concurrent hash set, I wasn't too worried about thread safety, because the input was in the method's control. But I agree that in this case it's safer to implement it in a way that makes no assumptions about the input. The callback approach should address this, thanks!
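The exact shape of the callback approach isn't shown in this part of the thread, so the following is only one plausible reading of it. The candidates are copied into a `ConcurrentHashMap.newKeySet()` that the method owns, current manifests are handed to a caller-supplied `Consumer` instead of being collected into a second returned set, and a plain `ExecutorService` stands in for Iceberg's `Tasks.foreach(...).executeWith(planExecutorService)`. The method name `pruneReferenced` and the string-based manifest model are hypothetical. Because the consumer is invoked from worker threads, it must itself be thread-safe.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

final class CallbackPruning {
  private CallbackPruning() {}

  // Returns the candidates that no current snapshot references. The caller's set is only
  // read (copied into a thread-safe set owned by this method); current manifests are
  // reported through the callback, which may be invoked concurrently.
  static Set<String> pruneReferenced(
      Map<Long, List<String>> currentSnapshots,
      Set<String> deletionCandidates,
      Consumer<String> currentManifestCallback,
      ExecutorService executor) {
    Set<String> remaining = ConcurrentHashMap.newKeySet();
    remaining.addAll(deletionCandidates);
    if (remaining.isEmpty()) {
      return remaining;
    }

    List<Future<?>> futures = new ArrayList<>();
    for (List<String> manifests : currentSnapshots.values()) {
      futures.add(
          executor.submit(
              () -> {
                for (String manifest : manifests) {
                  remaining.remove(manifest);
                  if (remaining.isEmpty()) {
                    return; // every candidate is referenced: stop reading this snapshot
                  }
                  currentManifestCallback.accept(manifest);
                }
              }));
    }

    try {
      for (Future<?> future : futures) {
        future.get(); // surfaces any failure thrown by a worker
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while pruning manifests", e);
    } catch (ExecutionException e) {
      throw new RuntimeException("Failed to read manifests", e.getCause());
    }
    return remaining;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      Set<String> candidates = new HashSet<>(Set.of("m1", "old"));
      Set<String> current = ConcurrentHashMap.newKeySet(); // thread-safe sink for the callback
      Set<String> toDelete =
          pruneReferenced(
              Map.of(1L, List.of("m1", "m2"), 2L, List.of("m3")), candidates, current::add, pool);
      System.out.println("delete=" + toDelete + " current=" + current);
    } finally {
      pool.shutdown();
    }
  }
}
```

Compared with returning two sets, the callback leaves it to the caller to decide how, or whether, to accumulate current manifests, while the method itself only ever mutates a set it created.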



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996389820


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I think this is ultimately a naming problem. I struggled to find a concise name because the function is really doing multiple things:
   
   1.) It reads the current manifests and prunes the candidate manifests to delete at the same time. It is done this way because this procedure optimizes for memory usage, and the set of current manifests is expected, on average, to be larger than the set of manifests to delete.
   
   2.) Following from 1., the procedure should minimize how much of the current table state it reads. It can stop reading manifests in the current table state once all of the candidate manifests have been pruned.
   
   >Generally speaking, that's not true to say that if there are no manifests to delete, there are no current manifests.
   
   3.) You're definitely correct here, but from a deletion-correctness standpoint this state is still fine: if there are no manifests to delete, there are no data files we can actually delete, and subsequent `findFilesToDelete` operations are guaranteed to be no-ops.
   
   Let me know if there are still any concerns about correctness. Right now `TestRemoveSnapshots` goes through both strategies, but I'll think about whether any cases are missing. I'm also definitely open to better names here, and I'll look into structuring and commenting the code so it's clearer. I think inline comments can go a long way here.



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996390379


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;
     }
 
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  manifestsToDelete.remove(manifestFile);
+                  if (manifestsToDelete.isEmpty()) {
+                    return;
+                  }
+
+                  currentManifests.add(manifestFile.copy());

Review Comment:
   > It seems like we could get different results in currentManifests based on whether or not there are more manifests left to delete.
   
   We would, but from a deletion-correctness standpoint this should be fine.
   I think [this](https://github.com/apache/iceberg/pull/5981/files#r996389820) explains why the result is still correct and why it's done this way, but please let me know if you still have concerns. 
   
   
   



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during snapshot expiration file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r995269029


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -85,19 +79,62 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
   }
 
   private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
-    }
+    Set<ManifestFile> manifests = ConcurrentHashMap.newKeySet();
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}",
+                    snapshot.snapshotId(),
+                    exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
+                for (ManifestFile manifestFile : manifestFilesForSnapshot) {
+                  manifests.add(manifestFile.copy());
+                }
+              } catch (IOException e) {
+                throw new RuntimeIOException(
+                    e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
+              }
+            });
+
+    return manifests;
+  }
+
+  private Set<ManifestFile> manifestFilesToDelete(
+      Set<ManifestFile> currentManifests, Set<Snapshot> expiredSnapshots) {
+    Set<ManifestFile> manifestFilesToDelete = ConcurrentHashMap.newKeySet();
 
-    return manifestFiles;
+    Tasks.foreach(expiredSnapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}",
+                    snapshot.snapshotId(),
+                    exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
+                for (ManifestFile manifestFile : manifestFilesForSnapshot) {
+                  if (!currentManifests.contains(manifestFile)) {
+                    manifestFilesToDelete.add(manifestFile.copy());
+                  }
+                }
+              } catch (IOException e) {
+                throw new RuntimeIOException(
+                    e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
+              }
+            });
+    return manifestFilesToDelete;

Review Comment:
   @rdblue regarding https://github.com/apache/iceberg/pull/5669#discussion_r993986400
   
   I don't think we can follow exactly the same pattern we use for determining reachable data files. Once the set of currently reachable manifests is determined, it should be reused when determining the reachable data files. So first the set of currently reachable manifests is built in parallel; then the candidate set is built by parallelizing across the expired snapshots.  
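A minimal sketch of the two-phase ordering described here, with Java parallel streams swapped in for Iceberg's `Tasks.foreach(...).executeWith(planExecutorService)` and manifests again modeled as path strings keyed by snapshot ID (assumptions for illustration; the class name is hypothetical). Phase 1 builds the full set of currently reachable manifests; phase 2 streams the expired snapshots' manifests and keeps only those absent from that set. The phase-1 set is only read during phase 2, so it can be shared across threads and reused afterwards for the data-file step.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class TwoPhaseManifests {
  private TwoPhaseManifests() {}

  // Phase 1: every manifest reachable from a retained snapshot, gathered in parallel.
  static Set<String> currentManifests(Map<Long, List<String>> retainedSnapshots) {
    return retainedSnapshots.values().parallelStream()
        .flatMap(List::stream)
        .collect(Collectors.toSet());
  }

  // Phase 2: manifests of expired snapshots that no retained snapshot references.
  // `current` is only read here, never modified, so concurrent lookups are safe.
  static Set<String> manifestsToDelete(
      Map<Long, List<String>> expiredSnapshots, Set<String> current) {
    return expiredSnapshots.values().parallelStream()
        .flatMap(List::stream)
        .filter(manifest -> !current.contains(manifest))
        .collect(Collectors.toSet());
  }
}
```

Unlike the prune-as-you-go variant discussed elsewhere in this thread, this ordering always reads every retained manifest list and keeps the complete current set in memory.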



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996390229


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;
     }
 
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  manifestsToDelete.remove(manifestFile);

Review Comment:
   I think my reply at https://github.com/apache/iceberg/pull/5981/files#r996389820 should cover this, but yeah, totally: we probably need a better name here, or at least inline comments.



[GitHub] [iceberg] rdblue commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996534013


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +77,74 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  // Returns the current set of manifests while pruning the manifests to delete
+  // If all candidate manifests are pruned the returned set of current manifests will be empty
+  private Set<ManifestFile> findCurrentManifestsAndPruneCandidates(

Review Comment:
   Actually, I think there's a concurrency bug here: you don't know whether the set that was passed in is thread-safe so you shouldn't modify it. I have another solution below to consider that also fixes this.
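A minimal sketch of the defensive-copy fix described above, under stated assumptions. Manifests are modeled as path strings, and retained snapshots are modeled as a hypothetical `Map<Long, List<String>>` of manifest paths. A plain `ExecutorService` stands in for Iceberg's `Tasks`/`planExecutorService`. The caller's set is copied into a `ConcurrentHashMap.newKeySet()` before any worker thread runs, so only the private copy is mutated concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class DefensiveCopyPrune {
  // Hypothetical helper: removes every manifest referenced by a retained snapshot from a
  // private, thread-safe copy of the candidates. The caller's set is only ever read.
  static Set<String> pruneReferenced(
      Set<String> deleteCandidates, Map<Long, List<String>> retainedManifestsBySnapshot) {
    Set<String> candidates = ConcurrentHashMap.newKeySet();
    candidates.addAll(deleteCandidates); // copied on the calling thread, before any task runs

    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (List<String> manifests : retainedManifestsBySnapshot.values()) {
        // one task per snapshot, loosely mirroring Tasks.foreach(...).executeWith(pool)
        futures.add(pool.submit(() -> manifests.forEach(candidates::remove)));
      }
      for (Future<?> future : futures) {
        future.get(); // wait for completion and surface any task failure
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while pruning candidates", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Failed to prune candidates", e.getCause());
    } finally {
      pool.shutdown();
    }
    return candidates;
  }
}
```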




[GitHub] [iceberg] rdblue commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996531195


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -64,18 +64,11 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
       }
     }
 
-    Set<ManifestFile> candidateManifestFilesForDeletion = readManifests(expiredSnapshots);
-    Set<ManifestFile> manifestFilesAfterExpiration = readManifests(snapshotsAfterExpiration);
+    Set<ManifestFile> manifestsToDelete = readManifests(snapshotsBeforeExpiration);

Review Comment:
   Why does this use `snapshotsBeforeExpiration` rather than `expiredSnapshots`? I think the ones referenced by expired snapshots are the only possible manifests to delete, and having a smaller initial set may save some work later if the new logic can short circuit.
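To make the point concrete, here is a minimal sequential sketch. The class and method names are hypothetical, manifests are path strings, and each snapshot is modeled as the list of manifests it references. Seeding the candidates from expired snapshots keeps the set small, and the loop stops reading retained snapshots once nothing is left to prune. Seeding from every snapshot before expiration would give the same final set. The larger starting set only makes the early exit less likely.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ExpiredCandidates {
  // Each snapshot is modeled as the list of manifest paths its manifest list references.
  static Set<String> manifestsToDelete(List<List<String>> expired, List<List<String>> retained) {
    // Only manifests referenced by expired snapshots can become unreachable, so start there
    // instead of with every snapshot that existed before expiration.
    Set<String> candidates = new HashSet<>();
    expired.forEach(candidates::addAll);

    for (List<String> snapshotManifests : retained) {
      if (candidates.isEmpty()) {
        break; // short circuit: the remaining retained snapshots need not be read
      }
      snapshotManifests.forEach(candidates::remove);
    }
    return candidates;
  }
}
```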




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996201966


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -85,19 +79,60 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
   }
 
   private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
-    }
+    Set<ManifestFile> manifests = ConcurrentHashMap.newKeySet();
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFilesForSnapshot =
+                  readManifestFiles(snapshot)) {
+                for (ManifestFile manifestFile : manifestFilesForSnapshot) {
+                  manifests.add(manifestFile.copy());
+                }
+              } catch (IOException e) {
+                throw new RuntimeIOException(
+                    e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
+              }
+            });
+
+    return manifests;
+  }
+
+  private Set<ManifestFile> manifestFilesToDelete(
+      Set<ManifestFile> currentManifests, Set<Snapshot> expiredSnapshots) {

Review Comment:
   Yeah, in my mind I was thinking that in the worst case we need to read the current manifests for the data files anyway, and structuring the code this way allows the set of current manifests to be reused during the reachable data file analysis. But it's more important to consider the average/common case, and I think I can structure the code in a readable way that still reuses the determined set of current manifests. Will update.
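A rough sketch of the reuse being described, assuming a hypothetical `filesByManifest` map that stands in for actually reading manifest entries. Once the current manifests are known, they can be scanned directly to rule out data files that are still referenced. This avoids re-reading the retained snapshots' manifest lists.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ReuseCurrentManifests {
  // Hypothetical: filesByManifest maps a manifest path to the data files it references.
  static Set<String> dataFilesToDelete(
      Set<String> manifestsToDelete,
      Set<String> currentManifests,
      Map<String, List<String>> filesByManifest) {
    // Data files referenced by deletable manifests are the only deletion candidates.
    Set<String> candidates = new HashSet<>();
    for (String manifest : manifestsToDelete) {
      candidates.addAll(filesByManifest.getOrDefault(manifest, List.of()));
    }
    // The already-computed current manifests are reused here instead of being
    // re-derived from the retained snapshots.
    for (String manifest : currentManifests) {
      if (candidates.isEmpty()) {
        break;
      }
      filesByManifest.getOrDefault(manifest, List.of()).forEach(candidates::remove);
    }
    return candidates;
  }
}
```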




[GitHub] [iceberg] kbendick commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996380125


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   Correctness question / naming nit: This seems somewhat odd to me. Given the way the function is used, this could very well produce correct results.
   
   But looking at the function name / signature directly and the first few lines of code, it's strange to return an empty set as the "current manifests" if the manifests to delete are empty.
   
   Generally speaking, it's not true that if there are no manifests to delete, there are no current manifests.
   
   Maybe the function needs to be renamed or the local variable name needs to be renamed?
   
   Again, the usage is very possibly correct, but the name of the function and the logic in the base case of empty manifests seem off.
   
   Is it maybe supposed to return an empty set when `snapshots` is empty?
   




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996543787


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -64,18 +64,11 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
       }
     }
 
-    Set<ManifestFile> candidateManifestFilesForDeletion = readManifests(expiredSnapshots);
-    Set<ManifestFile> manifestFilesAfterExpiration = readManifests(snapshotsAfterExpiration);
+    Set<ManifestFile> manifestsToDelete = readManifests(snapshotsBeforeExpiration);

Review Comment:
   Ooh, great catch. It was expiredSnapshots in the initial implementation, but I think I somehow changed it to snapshotsBeforeExpiration when refactoring the code. Will fix.




[GitHub] [iceberg] rdblue commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996534140


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   Ha, I just commented above that I'd prefer the Pair option. This is just an awkward case.
   
   What about adding a callback to do this? Like this:
   
   ```java
   private Set<ManifestFile> pruneReferencedManifests(
       Set<ManifestFile> deleteCandidates,
       Set<Snapshot> currentSnapshots,
       Consumer<ManifestFile> currentManifestCallback) {
     Set<ManifestFile> candidateSet = ConcurrentHashMap.newKeySet();
     candidateSet.addAll(deleteCandidates); // ensure the incoming set is thread safe!
     ...
     for (ManifestFile manifest : manifestFiles) {
       candidateSet.remove(manifest);
       currentManifestCallback.accept(manifest);
     }
     ...
     return candidateSet;
   }
   
   // when calling pruneReferencedManifests:
   Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
   Set<ManifestFile> manifestsToDelete =
       pruneReferencedManifests(deleteCandidates, currentSnapshots, currentManifests::add);
   ```
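A self-contained, runnable variant of the callback sketch above, under stated assumptions. Manifests are path strings, and a hypothetical `Map<Long, List<String>>` stands in for reading each current snapshot's manifest list. A parallel stream replaces Iceberg's `Tasks`. The callback may run on several threads at once, so the caller should pass a thread-safe sink, such as the `add` method of a set created with `ConcurrentHashMap.newKeySet()`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class CallbackPrune {
  // Returns the candidates not referenced by any current snapshot. Every manifest referenced
  // by a current snapshot is passed to currentManifestCallback, possibly from several threads.
  static Set<String> pruneReferencedManifests(
      Set<String> deleteCandidates,
      Map<Long, List<String>> currentSnapshots,
      Consumer<String> currentManifestCallback) {
    Set<String> candidateSet = ConcurrentHashMap.newKeySet();
    candidateSet.addAll(deleteCandidates); // never mutate the caller's set

    currentSnapshots.values().parallelStream()
        .forEach(
            manifests -> {
              for (String manifest : manifests) {
                candidateSet.remove(manifest);
                currentManifestCallback.accept(manifest);
              }
            });
    return candidateSet;
  }
}
```

Usage mirrors the calling code in the comment: create `Set<String> currentManifests = ConcurrentHashMap.newKeySet();` and pass `currentManifests::add` as the callback.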




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996390379


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;
     }
 
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  manifestsToDelete.remove(manifestFile);
+                  if (manifestsToDelete.isEmpty()) {
+                    return;
+                  }
+
+                  currentManifests.add(manifestFile.copy());

Review Comment:
   I think [this](https://github.com/apache/iceberg/pull/5981/files#r996389820) should explain why this is correct, but please do let me know if you still have concerns. This procedure should guarantee that we're deleting the expected files.




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996390229


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;
     }
 
+    Tasks.foreach(snapshots)
+        .retry(3)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(planExecutorService)
+        .onFailure(
+            (snapshot, exc) ->
+                LOG.warn(
+                    "Failed to determine manifests for snapshot {}", snapshot.snapshotId(), exc))
+        .run(
+            snapshot -> {
+              try (CloseableIterable<ManifestFile> manifestFiles = readManifests(snapshot)) {
+                for (ManifestFile manifestFile : manifestFiles) {
+                  manifestsToDelete.remove(manifestFile);

Review Comment:
   I think my reply [here](https://github.com/apache/iceberg/pull/5981/files#r996389820) should cover this, but yeah, totally agree that we probably need a better name here, or at least inline comments.




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996466631


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I renamed it to pruneManifestsToDelete and added some inline comments, although the naming is still not great since the returned set is the current manifests.
   
   I think another way to make this more explicit, and to avoid mutating the state of the candidate manifests to delete, is to have the function return a `Pair<Set<ManifestFile>, Set<ManifestFile>>`, where the first entry is the current manifests and the second is the manifests that can safely be removed. These 2 sets can then be used when determining the data files to delete later on. I initially avoided this since the Pair seemed a bit more complex than just mutating the state.
   
   Let me know which approach you find preferable @kbendick @rdblue 
   




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996466631


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I renamed it to findCurrentManifestsAndPruneCandidates and added some inline comments, although the naming is still not great since the returned set is the current manifests.
   
   I think another way to make this more explicit, and to avoid mutating the state of the candidate manifests to delete, is to have the function return a `Pair<Set<ManifestFile>, Set<ManifestFile>>`, where the first entry is the current manifests and the second is the manifests that can safely be removed. These 2 sets can then be used when determining the data files to delete later on. I initially avoided this since the Pair seemed a bit more complex than just mutating the state.
   
   Let me know which approach you find preferable @kbendick @rdblue 
   




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#discussion_r996466631


##########
core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java:
##########
@@ -84,19 +76,65 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
     deleteFiles(manifestListsToDelete, "manifest list");
   }
 
-  private Set<ManifestFile> readManifests(Set<Snapshot> snapshots) {
-    Set<ManifestFile> manifestFiles = Sets.newHashSet();
-    for (Snapshot snapshot : snapshots) {
-      try (CloseableIterable<ManifestFile> manifestFilesForSnapshot = readManifestFiles(snapshot)) {
-        for (ManifestFile manifestFile : manifestFilesForSnapshot) {
-          manifestFiles.add(manifestFile.copy());
-        }
-      } catch (IOException e) {
-        throw new RuntimeIOException(
-            e, "Failed to close manifest list: %s", snapshot.manifestListLocation());
-      }
+  private Set<ManifestFile> currentManifests(
+      Set<Snapshot> snapshots, Set<ManifestFile> manifestsToDelete) {
+    Set<ManifestFile> currentManifests = ConcurrentHashMap.newKeySet();
+    if (manifestsToDelete.isEmpty()) {
+      return currentManifests;

Review Comment:
   I renamed it to findCurrentManifestsAndPruneCandidates and added some inline comments. I've also updated it to avoid returning a partial set of current manifests if all candidates have been pruned; in that case we can just return an empty set. It ultimately does not matter what the current manifests are if all candidate manifests have been pruned, but I made the change so that the helper's return value is consistent.
   
   I think another way to make this more explicit, and to avoid mutating the state of the candidate manifests to delete, is to have the function return a `Pair<Set<ManifestFile>, Set<ManifestFile>>`, where the first entry is the current manifests and the second is the manifests that can safely be removed. These 2 sets can then be used when determining the data files to delete later on. I initially avoided this since the Pair seemed a bit more complex than just mutating the state.
   
   Let me know which approach you find preferable @kbendick @rdblue 
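For comparison, here is a minimal sketch of the Pair alternative, with hypothetical names. A small `PruneResult` holder stands in for a pair type, manifests are path strings, and the scan is sequential for brevity. Both sets come back from one call, and the caller's candidates are never mutated. As described above, the current-manifest set is returned empty when every candidate has been pruned, because otherwise it could be partial.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PairPrune {
  // Hypothetical holder for the two results (a stand-in for a Pair type).
  static final class PruneResult {
    final Set<String> currentManifests;
    final Set<String> manifestsToDelete;

    PruneResult(Set<String> currentManifests, Set<String> manifestsToDelete) {
      this.currentManifests = currentManifests;
      this.manifestsToDelete = manifestsToDelete;
    }
  }

  static PruneResult splitCurrentAndDeletable(
      List<List<String>> currentSnapshots, Set<String> candidates) {
    Set<String> remaining = new HashSet<>(candidates); // the caller's set is left untouched
    Set<String> current = new HashSet<>();
    for (List<String> snapshotManifests : currentSnapshots) {
      if (remaining.isEmpty()) {
        break; // every candidate is referenced; later snapshots cannot change the outcome
      }
      for (String manifest : snapshotManifests) {
        remaining.remove(manifest);
        current.add(manifest);
      }
    }
    // If every candidate was pruned, the scan may have stopped early and `current` could be
    // partial, so return an empty set to keep the result consistent.
    if (remaining.isEmpty()) {
      current.clear();
    }
    return new PruneResult(current, remaining);
  }
}
```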
   




[GitHub] [iceberg] rdblue commented on pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5981:
URL: https://github.com/apache/iceberg/pull/5981#issuecomment-1284353228

   Thanks, @amogh-jahagirdar! Looks great.



[GitHub] [iceberg] rdblue merged pull request #5981: Core: Parallelize the determining of reachable manifests during file cleanup

Posted by GitBox <gi...@apache.org>.
rdblue merged PR #5981:
URL: https://github.com/apache/iceberg/pull/5981

