You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/03/14 18:57:52 UTC
[hudi] branch master updated: [HUDI-5927] Improve parallelism of deleting invalid files (#8172)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0d6f656e5bb [HUDI-5927] Improve parallelism of deleting invalid files (#8172)
0d6f656e5bb is described below
commit 0d6f656e5bb8a8acf75e7d7e8c3485e3748e06be
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Tue Mar 14 11:57:45 2023 -0700
[HUDI-5927] Improve parallelism of deleting invalid files (#8172)
This commit improves the parallelism of deleting invalid files when finalizing the write, so that the file deletion is parallelized at the file level instead of the partition level.
---
.../java/org/apache/hudi/table/HoodieTable.java | 32 ++++++++++------------
1 file changed, 15 insertions(+), 17 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8b1056bca6c..9800cf268ac 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -94,6 +94,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -660,23 +661,20 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
// Now delete partially written files
context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation: " + config.getTableName());
- context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> {
- final FileSystem fileSystem = metaClient.getFs();
- LOG.info("Deleting invalid data files=" + partitionWithFileList);
- if (partitionWithFileList.isEmpty()) {
- return true;
- }
- // Delete
- partitionWithFileList.stream().map(Pair::getValue).forEach(file -> {
- try {
- fileSystem.delete(new Path(file), false);
- } catch (IOException e) {
- throw new HoodieIOException(e.getMessage(), e);
- }
- });
-
- return true;
- }, config.getFinalizeWriteParallelism());
+ context.map(invalidFilesByPartition.values().stream()
+ .flatMap(Collection::stream) // flatten every partition's list into one list of pairs so each file becomes its own map task
+ .collect(Collectors.toList()),
+ partitionFilePair -> {
+ final FileSystem fileSystem = metaClient.getFs();
+ LOG.info("Deleting invalid data file=" + partitionFilePair);
+ // Delete the single file at the pair's value (recursive=false); the delete's boolean result is not checked, IOExceptions are rethrown as HoodieIOException
+ try {
+ fileSystem.delete(new Path(partitionFilePair.getValue()), false);
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ return true;
+ }, config.getFinalizeWriteParallelism());
}
/**