Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/02/02 06:27:53 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6682: Bulk delete

aokolnychyi commented on code in PR #6682:
URL: https://github.com/apache/iceberg/pull/6682#discussion_r1094075265


##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -67,7 +67,11 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    *
    * @param deleteFunc a function that will be called to delete files
    * @return this for method chaining
+   * @deprecated Deletes are now performed in bulk see {@link #deleteBulkWith(Consumer)}. This

Review Comment:
   This will break the public API if a table is configured with a bulk IO and the user passes a custom delete function. In a lot of cases, a custom delete simply adds elements to a set. After this change, the custom delete function may be ignored and we will actually delete files. 
   
   Do we actually need to attempt bulk deletes if a custom function is provided? I'd consider just going through the non-bulk API in that case. That can simplify the implementation.
   
   ```
   private Consumer<String> deleteFunc = null;
   
   ...
   
   @Override
   public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
     this.deleteFunc = newDeleteFunc;
     return this;
   }
   
   ...
   
   private void deleteFiles(Iterable<String> paths) {
     if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
       SupportsBulkOperations bulkIO = (SupportsBulkOperations) table.io();
       if (deleteExecutorService != null) {
         bulkIO.deleteFiles(deleteExecutorService, paths);
       } else {
         bulkIO.deleteFiles(paths);
       }
   
     } else {
       Tasks.foreach(paths)
           .noRetry()
           .executeWith(deleteExecutorService)
           .suppressFailureWhenFinished()
           .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc))
           .run(deleteFunc != null ? deleteFunc::accept : table.io()::deleteFile);
     }
   }
   ```
   



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,9 +84,16 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    *
    * @param executorService the service to use
    * @return this for method chaining
+   * @deprecated All deletes should be performed using the bulk delete api if available. Use FileIO
+   *     specific parallelism controls to adjust bulk delete concurrency within that api.
    */
+  @Deprecated
   DeleteOrphanFiles executeDeleteWith(ExecutorService executorService);
 
+  default DeleteOrphanFiles deleteBulkWith(Consumer<Iterable<String>> deleteFunc) {

Review Comment:
   Do we have a use case for this? If not, I'd skip it and just support the old `deleteWith` method.
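   
   For context, a minimal sketch of how the two hooks differ from a caller's perspective (only the signatures come from the diff; the caller code here is hypothetical):
   
   ```
   // Old per-file hook: e.g. collect paths instead of deleting them.
   Set<String> collected = Sets.newConcurrentHashSet();
   action.deleteWith(collected::add);
   
   // Proposed bulk hook: receives whole batches of paths at once.
   action.deleteBulkWith(paths -> paths.forEach(collected::add));
   ```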



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +257,39 @@ protected DeleteSummary deleteFiles(
     return summary;
   }
 
+  protected DeleteSummary deleteFiles(
+      Consumer<Iterable<String>> bulkDeleteFunc, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> groupedIterator = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(groupedIterator)
+        .suppressFailureWhenFinished()
+        .run(
+            fileList -> {
+              Map<String, List<FileInfo>> filesByType =
+                  fileList.stream().collect(Collectors.groupingBy(FileInfo::getType));
+              filesByType.entrySet().stream()
+                  .forEach(
+                      entry -> {
+                        List<String> pathsToDelete =

Review Comment:
   What about using `ListMultimap`? 
   
   ```
   ListMultimap<String, String> pathsByType =
       Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
   
   for (FileInfo fileInfo : fileGroup) {
     pathsByType.put(fileInfo.getType(), fileInfo.getPath());
   }
   
   for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
     String type = entry.getKey();
     Collection<String> pathsToDelete = entry.getValue();
   
     ...
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -85,6 +88,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(BaseSparkAction.class);
   private static final AtomicInteger JOB_COUNTER = new AtomicInteger();
   private static final int DELETE_NUM_RETRIES = 3;
+  private static final int DELETE_GROUP_SIZE = 100000;

Review Comment:
   I don't think we would want to make it configurable, but can 100K be too many? This constant is used in the path that collects files chunk by chunk on the driver to avoid OOMs, which is supposed to help small drivers or scenarios with lots of files to delete. Do we think the records are tiny enough that 100K won't cause any issues?
   
   I think S3FileIO would send 250 files in a single bulk delete by default.
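   
   A back-of-the-envelope sketch of how the two batch sizes interact (the 250 default is an assumption worth double-checking):
   
   ```
   // Hypothetical sizing math, not code from the PR.
   int deleteGroupSize = 100_000; // FileInfo records buffered on the driver per group
   int s3BatchSize = 250;         // assumed S3FileIO bulk delete batch size
   int requestsPerGroup = deleteGroupSize / s3BatchSize; // 400 DeleteObjects calls per group
   // So the request fan-out looks fine; the real question is whether 100K path
   // strings (plus type strings) fit comfortably in a small driver's heap.
   ```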



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -261,6 +298,24 @@ static class DeleteSummary {
     private final AtomicLong manifestListsCount = new AtomicLong(0L);
     private final AtomicLong otherFilesCount = new AtomicLong(0L);
 
+    public void deletedFiles(String type, int numFiles) {
+      if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+        dataFilesCount.addAndGet(numFiles);

Review Comment:
   nit: What about empty lines after each if clause like in the method below? We do that quite frequently in large if-else statements.
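   
   For illustration, the suggested layout would look roughly like this (the non-`DATA` branches and constant names are assumptions based on the surrounding `DeleteSummary` fields):
   
   ```
   public void deletedFiles(String type, int numFiles) {
     if (FileContent.DATA.name().equalsIgnoreCase(type)) {
       dataFilesCount.addAndGet(numFiles);
   
     } else if (MANIFEST.equalsIgnoreCase(type)) {
       manifestsCount.addAndGet(numFiles);
   
     } else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
       manifestListsCount.addAndGet(numFiles);
   
     } else {
       otherFilesCount.addAndGet(numFiles);
     }
   }
   ```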



##########
core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java:
##########
@@ -18,27 +18,42 @@
  */
 package org.apache.iceberg.hadoop;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileInfo;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.io.SupportsPrefixOperations;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class HadoopFileIO implements FileIO, HadoopConfigurable, SupportsPrefixOperations {
+public class HadoopFileIO
+    implements FileIO, HadoopConfigurable, SupportsPrefixOperations, SupportsBulkOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
+  private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete_file_parallelism";

Review Comment:
   Do we have any other Hadoop configs? Which naming style did we use there? I thought we were using `-` to separate words. 
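   
   For comparison, the existing worker pool property (if memory serves) is dot- and dash-separated, while this diff introduces underscores:
   
   ```
   // org.apache.iceberg.util.ThreadPools (existing)
   public static final String WORKER_THREAD_POOL_SIZE_PROP = "iceberg.worker.num-threads";
   
   // this diff (new)
   private static final String DELETE_FILE_PARALLELISM = "iceberg.hadoop.delete_file_parallelism";
   ```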



##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -80,9 +84,16 @@ public interface DeleteOrphanFiles extends Action<DeleteOrphanFiles, DeleteOrpha
    *
    * @param executorService the service to use
    * @return this for method chaining
+   * @deprecated All deletes should be performed using the bulk delete api if available. Use FileIO
+   *     specific parallelism controls to adjust bulk delete concurrency within that api.
    */
+  @Deprecated

Review Comment:
   I am not sure the deprecation is worth it. Instead of deprecating a bunch of widely used methods in actions and arguments in procedures, can we consider supporting external executor services with bulk deletes so that these arguments can still be supported?
   
   I like the idea of bulk deletes in `HadoopFileIO`, but I don't see anything fundamentally bad in the ability to override the delete parallelism in actions or procedures, especially because it can make the implementation simpler. We already discussed passing executor services in the original PR that added the bulk API.
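   
   One possible shape for that, sketched as an assumption rather than an agreed API, is an executor-aware overload on `SupportsBulkOperations` (the sketch in the first comment above already calls such an overload):
   
   ```
   public interface SupportsBulkOperations {
     void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException;
   
     // Hypothetical overload so actions can keep honoring executeDeleteWith(...).
     default void deleteFiles(ExecutorService executorService, Iterable<String> pathsToDelete)
         throws BulkDeletionFailureException {
       deleteFiles(pathsToDelete); // by default, fall back to the IO's own parallelism
     }
   }
   ```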



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +257,39 @@ protected DeleteSummary deleteFiles(
     return summary;
   }
 
+  protected DeleteSummary deleteFiles(
+      Consumer<Iterable<String>> bulkDeleteFunc, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> groupedIterator = Iterators.partition(files, DELETE_GROUP_SIZE);
+
+    Tasks.foreach(groupedIterator)
+        .suppressFailureWhenFinished()
+        .run(
+            fileList -> {
+              Map<String, List<FileInfo>> filesByType =

Review Comment:
   What about adding a private method to handle this? I feel it would help with formatting (if the call fits on a single line). Spotless formats these closures in a weird way.
   
   ```
   Iterator<List<FileInfo>> fileGroups = Iterators.partition(files, DELETE_GROUP_SIZE);
   
   Tasks.foreach(fileGroups)
       .noRetry()
       .suppressFailureWhenFinished()
       .run(fileGroup -> deleteFileGroup(...));
   
   LOG.info("Deleted {} total files with bulk deletes", summary.totalFilesCount());
   
   return summary;
   ```
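   
   And a hypothetical `deleteFileGroup` to go with it, folding in the `ListMultimap` suggestion from the earlier comment (the parameter list and the summary call are assumptions):
   
   ```
   private void deleteFileGroup(
       List<FileInfo> fileGroup, Consumer<Iterable<String>> bulkDeleteFunc, DeleteSummary summary) {
     ListMultimap<String, String> pathsByType =
         Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
   
     for (FileInfo fileInfo : fileGroup) {
       pathsByType.put(fileInfo.getType(), fileInfo.getPath());
     }
   
     for (Map.Entry<String, Collection<String>> entry : pathsByType.asMap().entrySet()) {
       String type = entry.getKey();
       Collection<String> pathsToDelete = entry.getValue();
       bulkDeleteFunc.accept(pathsToDelete);
       summary.deletedFiles(type, pathsToDelete.size());
     }
   }
   ```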



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -253,6 +257,39 @@ protected DeleteSummary deleteFiles(
     return summary;
   }
 
+  protected DeleteSummary deleteFiles(
+      Consumer<Iterable<String>> bulkDeleteFunc, Iterator<FileInfo> files) {
+    DeleteSummary summary = new DeleteSummary();
+    Iterator<List<FileInfo>> groupedIterator = Iterators.partition(files, DELETE_GROUP_SIZE);

Review Comment:
   nit: Is there a more descriptive name? `fileGroups`, `fileLists`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org