Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/15 21:25:27 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #6580: [HUDI-4792] Batch clean files to delete

nsivabalan commented on code in PR #6580:
URL: https://github.com/apache/hudi/pull/6580#discussion_r972428841


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -110,9 +112,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
+          .parallelize(partitionsToClean, cleanerParallelism)
+          .mapPartitions((Iterator<String> it) -> {
+            List<String> list = new ArrayList<>();

Review Comment:
   minor: `list` -> `partitionList`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -110,9 +112,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
+          .parallelize(partitionsToClean, cleanerParallelism)
+          .mapPartitions((Iterator<String> it) -> {
+            List<String> list = new ArrayList<>();
+            it.forEachRemaining(list::add);
+            Map<String, Pair<Boolean, List<CleanFileInfo>>> res = planner.getDeletePaths(list);

Review Comment:
   minor: `res` -> `cleanResult`
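A minimal sketch of the `mapPartitions` lambda body with the suggested names applied (`partitionList` for the drained iterator, `cleanResult` for the batched result). It uses no Hudi classes: `getDeletePaths` here is a hypothetical stub returning `Map<String, List<String>>` in place of `Map<String, Pair<Boolean, List<CleanFileInfo>>>`, and `planBatch` is a made-up name for the lambda.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchCleanPartitionSketch {

  // Hypothetical stub for a batched CleanPlanner#getDeletePaths(List<String>):
  // returns, per partition path, the files that would be deleted.
  static Map<String, List<String>> getDeletePaths(List<String> partitionList) {
    Map<String, List<String>> cleanResult = new LinkedHashMap<>();
    for (String partitionPath : partitionList) {
      cleanResult.put(partitionPath, Arrays.asList(partitionPath + "/old-file.parquet"));
    }
    return cleanResult;
  }

  // Body of the function passed to mapPartitions: drain this chunk's iterator,
  // plan the whole chunk with one batched call, and emit one entry per partition.
  static Iterator<Map.Entry<String, List<String>>> planBatch(Iterator<String> it) {
    List<String> partitionList = new ArrayList<>();
    it.forEachRemaining(partitionList::add);
    Map<String, List<String>> cleanResult = getDeletePaths(partitionList);
    return cleanResult.entrySet().iterator();
  }

  public static void main(String[] args) {
    Iterator<Map.Entry<String, List<String>>> entries =
        planBatch(Arrays.asList("2022/09/14", "2022/09/15").iterator());
    entries.forEachRemaining(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
  }
}
```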



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -735,6 +735,34 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
     return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
   }
 
+  @Override
+  public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionStr) {
+    return getAllFileGroupsIncludingReplaced(partitionStr)
+        .map(pair -> Pair.of(pair.getLeft(), pair.getRight().stream().filter(fg -> !isFileGroupReplaced(fg)).collect(Collectors.toList())));
+  }
+
+  private Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroupsIncludingReplaced(final List<String> partitionStrList) {
+    try {

Review Comment:
   Shouldn't we be calling the existing method here,
   ```
   getAllFileGroupsIncludingReplaced(final String partitionStr)
   ```
   and then union the outputs for the multiple partition paths?
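A minimal sketch of the reuse described above, assuming a hypothetical `FileGroup` class in place of `HoodieFileGroup` and a stubbed single-partition `getAllFileGroupsIncludingReplaced(String)`. The list overload calls the existing method once per partition and unions the resulting streams with `flatMap`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnionPerPartitionSketch {

  // Hypothetical stand-in for HoodieFileGroup: just a partition path and a file id.
  static final class FileGroup {
    final String partitionPath;
    final String fileId;

    FileGroup(String partitionPath, String fileId) {
      this.partitionPath = partitionPath;
      this.fileId = fileId;
    }

    @Override
    public String toString() {
      return partitionPath + "/" + fileId;
    }
  }

  // Stub for the existing single-partition method.
  static Stream<FileGroup> getAllFileGroupsIncludingReplaced(String partitionStr) {
    return Stream.of(new FileGroup(partitionStr, "fg-1"), new FileGroup(partitionStr, "fg-2"));
  }

  // List overload that reuses the method above and unions the per-partition outputs.
  static Stream<FileGroup> getAllFileGroupsIncludingReplaced(List<String> partitionStrList) {
    return partitionStrList.stream().flatMap(p -> getAllFileGroupsIncludingReplaced(p));
  }

  public static void main(String[] args) {
    List<FileGroup> all = getAllFileGroupsIncludingReplaced(Arrays.asList("2022/09/14", "2022/09/15"))
        .collect(Collectors.toList());
    System.out.println(all);
  }
}
```

One thing to weigh: a flat union drops partitions that have no file groups, whereas the `Stream<Pair<String, List<HoodieFileGroup>>>` shape in the diff keeps one entry per requested partition. Mapping each partition to its collected list (instead of using `flatMap`) preserves that shape while still reusing the existing method.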



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -735,6 +735,34 @@ public final Stream<HoodieFileGroup> getAllFileGroups(String partitionStr) {
     return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
   }
 
+  @Override
+  public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionStr) {
+    return getAllFileGroupsIncludingReplaced(partitionStr)

Review Comment:
   Same here. Let's see if we can reuse the existing methods and avoid code duplication.
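Applied to `getAllFileGroups`, the same idea means the batched overload can delegate to the existing `getAllFileGroups(String)`, so the `!isFileGroupReplaced(fg)` filter is written only once. A sketch under simplifying assumptions: file groups are plain id strings, `REPLACED` is a hypothetical set standing in for the view's replace-commit state, and `Map.Entry` stands in for Hudi's `Pair`.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReuseFilterSketch {

  // Hypothetical set of replaced file-group ids.
  static final Set<String> REPLACED = new HashSet<>(Arrays.asList("fg-2"));

  static boolean isFileGroupReplaced(String fileGroupId) {
    return REPLACED.contains(fileGroupId);
  }

  // Stub for getAllFileGroupsIncludingReplaced(String); file groups are plain ids here.
  static Stream<String> getAllFileGroupsIncludingReplaced(String partitionStr) {
    return Stream.of("fg-1", "fg-2", "fg-3");
  }

  // Existing single-partition method: the only place the "not replaced" filter lives.
  static Stream<String> getAllFileGroups(String partitionStr) {
    return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
  }

  // Batched overload that delegates instead of re-implementing the filter.
  static Stream<Map.Entry<String, List<String>>> getAllFileGroups(List<String> partitionStrs) {
    return partitionStrs.stream()
        .map(p -> entry(p, getAllFileGroups(p).collect(Collectors.toList())));
  }

  static <K, V> Map.Entry<K, V> entry(K key, V value) {
    return new AbstractMap.SimpleImmutableEntry<>(key, value);
  }

  public static void main(String[] args) {
    getAllFileGroups(Arrays.asList("2022/09/14", "2022/09/15"))
        .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
  }
}
```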



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -110,9 +112,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
-          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
+          .parallelize(partitionsToClean, cleanerParallelism)
+          .mapPartitions((Iterator<String> it) -> {
+            List<String> list = new ArrayList<>();
+            it.forEachRemaining(list::add);
+            Map<String, Pair<Boolean, List<CleanFileInfo>>> res = planner.getDeletePaths(list);
+            return res.entrySet().iterator();
+          }, false).collectAsList()
           .stream()
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+          .collect(Collectors.toMap(it -> it.getKey(), it -> it.getValue()));

Review Comment:
   Why this change? We can leave it as `Pair::getKey` and `Pair::getValue`.
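For reference, a small sketch of the method-reference form of the `Collectors.toMap` call. A method reference has to match the static element type: the lambda in the diff returns `res.entrySet().iterator()`, so the collected elements appear to be `Map.Entry` values, and `Map.Entry::getKey` / `Map.Entry::getValue` is the form that type-checks in that case. `Integer` values here are a placeholder for `Pair<Boolean, List<CleanFileInfo>>`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class EntryToMapSketch {

  // Collects (partition -> value) entries into a map using method references
  // instead of the it -> it.getKey() / it -> it.getValue() lambdas.
  static <K, V> Map<K, V> collectEntries(List<Map.Entry<K, V>> entries) {
    return entries.stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  public static void main(String[] args) {
    Map<String, Integer> perPartition = new LinkedHashMap<>();
    perPartition.put("2022/09/14", 3);
    perPartition.put("2022/09/15", 1);
    // Simulates collectAsList() over the entries emitted by mapPartitions.
    List<Map.Entry<String, Integer>> collected = new ArrayList<>(perPartition.entrySet());
    System.out.println(collectEntries(collected));
  }
}
```

`Collectors.toMap` throws `IllegalStateException` on duplicate keys; that is not a concern here as long as each partition path appears only once in `partitionsToClean`.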



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -233,43 +235,47 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(
 
     // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
     // In other words, the file versions only apply to the active file groups.
-    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
-    boolean toDeletePartition = false;
-    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
-    for (HoodieFileGroup fileGroup : fileGroups) {
-      int keepVersions = config.getCleanerFileVersionsRetained();
-      // do not cleanup slice required for pending compaction
-      Iterator<FileSlice> fileSliceIterator =
-          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
-      if (isFileGroupInPendingCompaction(fileGroup)) {
-        // We have already saved the last version of file-groups for pending compaction Id
-        keepVersions--;
-      }
+    List<Pair<String, List<HoodieFileGroup>>> fileGroups = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
+    for (Pair<String, List<HoodieFileGroup>> pairFileGroup : fileGroups) {
+
+      deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, pairFileGroup.getLeft(), Option.empty()));

Review Comment:
   I guess this is the actual change in this class, right? i.e. moving `getReplacedFilesEligibleToClean()` from outside the for loop to inside it.
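A sketch of why the call moves into the loop once the planner receives several partitions: each partition has to contribute its own replaced file groups, keyed by the pair's left side (the partition path). Assumptions: file groups are strings, `getReplacedFilesEligibleToClean(String)` is a hypothetical one-argument stub for the three-argument method in the diff, results are keyed by partition as the `Map<String, Pair<Boolean, List<CleanFileInfo>>>` in `CleanPlanActionExecutor` suggests, and the version-retention logic is omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionReplacedSketch {

  // Hypothetical one-argument stub for
  // getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()).
  static List<String> getReplacedFilesEligibleToClean(String partitionPath) {
    return Arrays.asList(partitionPath + "/replaced-fg.parquet");
  }

  // With a batch of partitions, the replaced-file lookup runs once per partition,
  // inside the loop, keyed by that partition's path.
  static Map<String, List<String>> collectDeletePaths(Map<String, List<String>> fileGroupsByPartition) {
    Map<String, List<String>> deletePathsByPartition = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> pairFileGroup : fileGroupsByPartition.entrySet()) {
      String partitionPath = pairFileGroup.getKey();
      List<String> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(partitionPath));
      // Version-retention logic over pairFileGroup.getValue() is omitted in this sketch.
      deletePathsByPartition.put(partitionPath, deletePaths);
    }
    return deletePathsByPartition;
  }

  public static void main(String[] args) {
    Map<String, List<String>> fileGroups = new LinkedHashMap<>();
    fileGroups.put("2022/09/14", Arrays.asList("fg-1"));
    fileGroups.put("2022/09/15", Arrays.asList("fg-2"));
    System.out.println(collectDeletePaths(fileGroups));
  }
}
```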



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -916,6 +944,8 @@ protected abstract Option<Pair<String, CompactionOperation>> getPendingCompactio
    */
   abstract Stream<HoodieFileGroup> fetchAllStoredFileGroups(String partitionPath);
 
+  abstract Stream<Pair<String, List<HoodieFileGroup>>> fetchAllStoredFileGroups(List<String> partitionPath);

Review Comment:
   We can probably avoid some of these additional methods if the suggestion above is followed.
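One way to follow that suggestion, sketched with hypothetical names: keep a single abstract per-partition hook and implement the batched variant once, as a concrete method in the base class, so subclasses need no new abstract method. `AbstractView` and `InMemoryView` are illustrative stand-ins, not Hudi's `AbstractTableFileSystemView` hierarchy.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BaseViewSketch {

  // Hypothetical base class, loosely modeled on the abstract-view pattern in the diff.
  abstract static class AbstractView {

    // The one abstract per-partition hook that concrete views implement.
    abstract Stream<String> fetchAllStoredFileGroups(String partitionPath);

    // Batched variant implemented once here by looping over the hook,
    // so no second abstract method is needed.
    final Stream<Map.Entry<String, List<String>>> fetchAllStoredFileGroups(List<String> partitionPaths) {
      return partitionPaths.stream()
          .map(p -> entry(p, fetchAllStoredFileGroups(p).collect(Collectors.toList())));
    }
  }

  // Hypothetical concrete view that only implements the per-partition hook.
  static final class InMemoryView extends AbstractView {
    @Override
    Stream<String> fetchAllStoredFileGroups(String partitionPath) {
      return Stream.of(partitionPath + ":fg-1");
    }
  }

  static <K, V> Map.Entry<K, V> entry(K key, V value) {
    return new AbstractMap.SimpleImmutableEntry<>(key, value);
  }

  public static void main(String[] args) {
    AbstractView view = new InMemoryView();
    view.fetchAllStoredFileGroups(Arrays.asList("2022/09/14", "2022/09/15"))
        .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
  }
}
```

A subclass that can fetch several partitions more cheaply at once could still be given its own override if the batched method is not `final`; the sketch marks it `final` only to show that no subclass needs to touch it.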



-- 
This is an automated message from the Apache Git Service.