You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/03/14 18:57:52 UTC

[hudi] branch master updated: [HUDI-5927] Improve parallelism of deleting invalid files (#8172)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d6f656e5bb [HUDI-5927] Improve parallelism of deleting invalid files (#8172)
0d6f656e5bb is described below

commit 0d6f656e5bb8a8acf75e7d7e8c3485e3748e06be
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Tue Mar 14 11:57:45 2023 -0700

    [HUDI-5927] Improve parallelism of deleting invalid files (#8172)
    
    This commit improves the parallelism of deleting invalid files when finalizing a write: deletion is now parallelized per file rather than per partition.
---
 .../java/org/apache/hudi/table/HoodieTable.java    | 32 ++++++++++------------
 1 file changed, 15 insertions(+), 17 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8b1056bca6c..9800cf268ac 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -94,6 +94,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -660,23 +661,20 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
   private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
     // Now delete partially written files
     context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation: " + config.getTableName());
-    context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> {
-      final FileSystem fileSystem = metaClient.getFs();
-      LOG.info("Deleting invalid data files=" + partitionWithFileList);
-      if (partitionWithFileList.isEmpty()) {
-        return true;
-      }
-      // Delete
-      partitionWithFileList.stream().map(Pair::getValue).forEach(file -> {
-        try {
-          fileSystem.delete(new Path(file), false);
-        } catch (IOException e) {
-          throw new HoodieIOException(e.getMessage(), e);
-        }
-      });
-
-      return true;
-    }, config.getFinalizeWriteParallelism());
+    context.map(invalidFilesByPartition.values().stream()
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList()),
+        partitionFilePair -> {
+          final FileSystem fileSystem = metaClient.getFs();
+          LOG.info("Deleting invalid data file=" + partitionFilePair);
+          // Delete
+          try {
+            fileSystem.delete(new Path(partitionFilePair.getValue()), false);
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+          return true;
+        }, config.getFinalizeWriteParallelism());
   }
 
   /**