Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/08/26 14:11:46 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3529: [HUDI-2351] Extract common FS and IO utils for marker mechanism

nsivabalan commented on a change in pull request #3529:
URL: https://github.com/apache/hudi/pull/3529#discussion_r696668520



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/BaseTwoToOneDowngradeHandler.java
##########
@@ -117,26 +117,26 @@ private void convertToDirectMarkers(final String commitInstantTime,
     } else {
       // In case of partial failures during downgrade, there is a chance that marker type file was deleted,
       // but timeline server based marker files are left.  So deletes them if any
-      deleteTimelineBasedMarkerFiles(markerDir, fileSystem);
+      deleteTimelineBasedMarkerFiles(
+          context, markerDir, fileSystem, table.getConfig().getMarkersDeleteParallelism());
     }
   }
 
-  private void deleteTimelineBasedMarkerFiles(String markerDir, FileSystem fileSystem) throws IOException {
+  private void deleteTimelineBasedMarkerFiles(HoodieEngineContext context, String markerDir,
+                                              FileSystem fileSystem, int parallelism) throws IOException {
     // Deletes timeline based marker files if any.
-    Path dirPath = new Path(markerDir);
-    FileStatus[] fileStatuses = fileSystem.listStatus(dirPath);
     Predicate<FileStatus> prefixFilter = fileStatus ->
         fileStatus.getPath().getName().startsWith(MARKERS_FILENAME_PREFIX);
-    List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
-        .filter(prefixFilter)
-        .map(fileStatus -> fileStatus.getPath().toString())
-        .collect(Collectors.toList());
-    markerDirSubPaths.forEach(fileToDelete -> {
-      try {
-        fileSystem.delete(new Path(fileToDelete), false);
-      } catch (IOException e) {
-        Log.warn("Deleting Timeline based marker files failed ", e);
-      }
-    });
+    FSUtils.parallelizeSubPathProcess(context, fileSystem, new Path(markerDir), parallelism,
+        prefixFilter, pairOfSubPathAndConf -> {

Review comment:
   Sorry, shouldn't we be calling the new util method here instead of inlining the delete logic in the lambda? Something like:
   ```
   pairOfSubPathAndConf -> deleteSubPath(pairOfSubPathAndConf.getKey(), pairOfSubPathAndConf.getValue())
   ```
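
   To illustrate the shape of that suggestion outside of Hudi, here is a minimal, self-contained sketch using only `java.nio.file`. It is not Hudi's API: `MarkerCleanupSketch`, `processSubPaths`, the single-argument `deleteSubPath`, and the `"MARKERS"` prefix value are hypothetical stand-ins for `FSUtils.parallelizeSubPathProcess`, the shared delete helper, and `MARKERS_FILENAME_PREFIX`. The point is only that the per-path lambda can delegate to one shared helper rather than repeating the try/catch at each call site.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for the pattern discussed in the review; not Hudi code.
class MarkerCleanupSketch {

  // Assumed value, for illustration only.
  static final String MARKERS_FILENAME_PREFIX = "MARKERS";

  // Lists the direct children of dir, keeps those accepted by the filter,
  // and applies fn to each one in parallel. Returns sub-path -> result.
  static <T> Map<String, T> processSubPaths(
      Path dir, Predicate<Path> filter, Function<Path, T> fn) throws IOException {
    List<Path> subPaths;
    try (Stream<Path> children = Files.list(dir)) {
      subPaths = children.filter(filter).collect(Collectors.toList());
    }
    return subPaths.parallelStream()
        .collect(Collectors.toMap(Path::toString, fn));
  }

  // Shared delete helper, so callers do not repeat the error handling.
  // Returns true if the file existed and was deleted.
  static boolean deleteSubPath(Path subPath) {
    try {
      return Files.deleteIfExists(subPath);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path markerDir = Files.createTempDirectory("markers");
    Files.createFile(markerDir.resolve("MARKERS0"));
    Files.createFile(markerDir.resolve("other.marker"));

    Predicate<Path> prefixFilter =
        p -> p.getFileName().toString().startsWith(MARKERS_FILENAME_PREFIX);

    // The lambda body is just a call to the shared helper.
    Map<String, Boolean> deleted =
        processSubPaths(markerDir, prefixFilter, MarkerCleanupSketch::deleteSubPath);

    System.out.println("Deleted entries: " + deleted.size());
  }
}
```

   In the PR itself the equivalent change would keep the existing `prefixFilter` and pass the two halves of `pairOfSubPathAndConf` to the shared helper, as in the snippet above the sketch.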



