Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/11 13:11:15 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4564: [HUDI-3007] Fix issues in HoodieRepairTool

nsivabalan commented on a change in pull request #4564:
URL: https://github.com/apache/hudi/pull/4564#discussion_r782122923



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
##########
@@ -276,39 +272,71 @@ static boolean copyFiles(
             }
           });
           return results.iterator();
-        })
-        .collect();
+        }, true)
+        .collectAsList();
     return allResults.stream().reduce((r1, r2) -> r1 && r2).orElse(false);
   }
 
   /**
    * Lists all Hoodie files from the table base path.
    *
-   * @param basePathStr Table base path.
-   * @param conf        {@link Configuration} instance.
-   * @return An array of {@link FileStatus} of all Hoodie files.
+   * @param context       {@link HoodieEngineContext} instance.
+   * @param basePathStr   Table base path.
+   * @param expectedLevel Expected level in the directory hierarchy to include the file status.
+   * @param parallelism   Parallelism for the file listing.
+   * @return A list of absolute file paths of all Hoodie files.
    * @throws IOException upon errors.
    */
-  static FileStatus[] listFilesFromBasePath(String basePathStr, Configuration conf) throws IOException {
+  static List<String> listFilesFromBasePath(
+      HoodieEngineContext context, String basePathStr, int expectedLevel, int parallelism) {
     final Set<String> validFileExtensions = Arrays.stream(HoodieFileFormat.values())
         .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
     final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension();
-    FileSystem fs = FSUtils.getFs(basePathStr, conf);
+    FileSystem fs = FSUtils.getFs(basePathStr, context.getHadoopConf().get());
     Path basePath = new Path(basePathStr);
+    return FSUtils.getFileStatusAtLevel(
+            context, fs, basePath, expectedLevel, parallelism).stream()
+        .filter(fileStatus -> {
+          if (!fileStatus.isFile()) {
+            return false;
+          }
+          Path path = fileStatus.getPath();
+          String extension = FSUtils.getFileExtension(path.getName());
+          return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension);

Review comment:
       We have FSUtils.isDataFile(Path); it may assist you here.
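
       For illustration, the filter might then collapse to the sketch below. This assumes FSUtils.isDataFile(Path) recognizes both base-file extensions and log files; that coverage is an assumption here, not verified against the utility.

           // Hypothetical simplification of the stream pipeline in
           // listFilesFromBasePath, assuming FSUtils.isDataFile(Path)
           // matches base files (e.g., .parquet) and log files alike.
           return FSUtils.getFileStatusAtLevel(
                   context, fs, basePath, expectedLevel, parallelism).stream()
               .filter(FileStatus::isFile)
               // Replaces the manual checks against validFileExtensions
               // and logFileExtension with the shared utility.
               .filter(fileStatus -> FSUtils.isDataFile(fileStatus.getPath()))
               .map(fileStatus -> fileStatus.getPath().toString())
               .collect(Collectors.toList());

       If isDataFile covers only base files, the log-file check would still need to remain as a second condition in the filter.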

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java
##########
@@ -276,39 +272,71 @@ static boolean copyFiles(
             }
           });
           return results.iterator();
-        })
-        .collect();
+        }, true)
+        .collectAsList();
     return allResults.stream().reduce((r1, r2) -> r1 && r2).orElse(false);
   }
 
   /**
    * Lists all Hoodie files from the table base path.
    *
-   * @param basePathStr Table base path.
-   * @param conf        {@link Configuration} instance.
-   * @return An array of {@link FileStatus} of all Hoodie files.
+   * @param context       {@link HoodieEngineContext} instance.
+   * @param basePathStr   Table base path.
+   * @param expectedLevel Expected level in the directory hierarchy to include the file status.
+   * @param parallelism   Parallelism for the file listing.
+   * @return A list of absolute file paths of all Hoodie files.
    * @throws IOException upon errors.
    */
-  static FileStatus[] listFilesFromBasePath(String basePathStr, Configuration conf) throws IOException {
+  static List<String> listFilesFromBasePath(
+      HoodieEngineContext context, String basePathStr, int expectedLevel, int parallelism) {
     final Set<String> validFileExtensions = Arrays.stream(HoodieFileFormat.values())
         .map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
     final String logFileExtension = HoodieFileFormat.HOODIE_LOG.getFileExtension();
-    FileSystem fs = FSUtils.getFs(basePathStr, conf);
+    FileSystem fs = FSUtils.getFs(basePathStr, context.getHadoopConf().get());
     Path basePath = new Path(basePathStr);
+    return FSUtils.getFileStatusAtLevel(
+            context, fs, basePath, expectedLevel, parallelism).stream()
+        .filter(fileStatus -> {
+          if (!fileStatus.isFile()) {
+            return false;
+          }
+          Path path = fileStatus.getPath();
+          String extension = FSUtils.getFileExtension(path.getName());
+          return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension);
+        })
+        .map(fileStatus -> fileStatus.getPath().toString())
+        .collect(Collectors.toList());
+  }
 
-    try {
-      return Arrays.stream(fs.listStatus(basePath, path -> {
-        String extension = FSUtils.getFileExtension(path.getName());
-        return validFileExtensions.contains(extension) || path.getName().contains(logFileExtension);
-      })).filter(FileStatus::isFile).toArray(FileStatus[]::new);
-    } catch (IOException e) {
-      // return empty FileStatus if partition does not exist already
-      if (!fs.exists(basePath)) {
-        return new FileStatus[0];
-      } else {
-        throw e;
-      }
-    }
+  /**
+   * Deletes files from table base path.
+   *
+   * @param context           {@link HoodieEngineContext} instance.
+   * @param basePath          Base path of the table.
+   * @param relativeFilePaths A {@link List} of relative file paths for deleting.
+   */
+  static boolean deleteFiles(
+      HoodieEngineContext context, String basePath, List<String> relativeFilePaths) {
+    SerializableConfiguration conf = context.getHadoopConf();
+    return context.parallelize(relativeFilePaths)
+        .mapPartitions(iterator -> {

Review comment:
       We can use just map instead of mapPartitions. Any particular reason? I am fine either way, just curious.
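
       For comparison, a per-element variant might look like the sketch below. It assumes the parallelized collection exposes a map(...) operation alongside mapPartitions, and that returning false on an IOException is an acceptable policy; both are assumptions, not the PR's actual code.

           // Sketch of a map-based deleteFiles under the assumptions above.
           static boolean deleteFiles(
               HoodieEngineContext context, String basePath, List<String> relativeFilePaths) {
             SerializableConfiguration conf = context.getHadoopConf();
             List<Boolean> results = context.parallelize(relativeFilePaths)
                 .map(relativeFilePath -> {
                   Path fullPath = new Path(basePath, relativeFilePath);
                   // map() resolves the FileSystem once per file;
                   // mapPartitions() can hoist this to once per partition,
                   // which may be the reason it was chosen in the PR.
                   FileSystem fs = fullPath.getFileSystem(conf.get());
                   try {
                     return fs.delete(fullPath, false);
                   } catch (IOException e) {
                     return false;
                   }
                 })
                 .collectAsList();
             // Treat an empty input as success (a policy choice in this sketch).
             return results.stream().reduce((r1, r2) -> r1 && r2).orElse(true);
           }

       The per-partition FileSystem construction is the usual argument for mapPartitions when the per-element work is a cheap filesystem call.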




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org