Posted to commits@hudi.apache.org by yi...@apache.org on 2022/06/11 20:49:05 UTC

[hudi] 06/08: [HUDI-4221] Fixing getAllPartitionPaths perf hit w/ FileSystemBackedMetadata (#5829)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 58e25ba8563bb45b629fd534eb12423aeb948e6e
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Sat Jun 11 16:17:42 2022 -0400

    [HUDI-4221] Fixing getAllPartitionPaths perf hit w/ FileSystemBackedMetadata (#5829)
---
 .../metadata/FileSystemBackedTableMetadata.java    | 39 ++++++++++++----------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index b77bb12c49..f029995ba0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -68,13 +68,14 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
 
   @Override
   public List<String> getAllPartitionPaths() throws IOException {
-    FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
+    Path basePath = new Path(datasetBasePath);
+    FileSystem fs = basePath.getFileSystem(hadoopConf.get());
     if (assumeDatePartitioning) {
       return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
     }
 
     List<Path> pathsToList = new CopyOnWriteArrayList<>();
-    pathsToList.add(new Path(datasetBasePath));
+    pathsToList.add(basePath);
     List<String> partitionPaths = new CopyOnWriteArrayList<>();
 
     while (!pathsToList.isEmpty()) {
@@ -82,27 +83,31 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
       int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
 
       // List all directories in parallel
-      List<FileStatus[]> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
-        return fileSystem.listStatus(path);
+        return Pair.of(path, fileSystem.listStatus(path));
       }, listingParallelism);
       pathsToList.clear();
 
       // if current directory contains PartitionMetadata, add it to result
       // if current directory does not contain PartitionMetadata, add it to queue
-      dirToFileListing.stream().flatMap(Arrays::stream).parallel()
-          .forEach(fileStatus -> {
-            if (fileStatus.isDirectory()) {
-              if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) {
-                partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath()));
-              } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
-                pathsToList.add(fileStatus.getPath());
-              }
-            } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
-              String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
-              partitionPaths.add(partitionName);
-            }
-          });
+      dirToFileListing.forEach(p -> {
+        Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
+            .filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
+            .findFirst());
+
+        if (partitionMetaFile.isPresent()) {
+          // Is a partition.
+          String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
+          partitionPaths.add(partitionName);
+        } else {
+          // Add sub-dirs to the queue
+          pathsToList.addAll(Arrays.stream(p.getRight())
+              .filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
+              .map(fileStatus -> fileStatus.getPath())
+              .collect(Collectors.toList()));
+        }
+      });
     }
     return partitionPaths;
   }
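
The gist of the change, for readers skimming the diff: the previous loop flattened every FileStatus across all listed directories and then called HoodiePartitionMetadata.hasPartitionMetadata(fs, path) for each child directory, which issues an extra filesystem round trip per candidate and appears to be the source of the perf hit. The new loop keeps each listing paired with its directory (Pair<Path, FileStatus[]>) and decides whether that directory is a partition by scanning the listing it already holds for the partition metafile prefix, only descending into sub-directories when no metafile is found. Below is a minimal, single-threaded sketch of that strategy written against the plain Hadoop FileSystem API; it is not the Hudi code itself, and the metafile prefix, the ".hoodie" folder constant, and the relativize() helper are illustrative stand-ins for HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX, HoodieTableMetaClient.METAFOLDER_NAME, and FSUtils.getRelativePartitionPath.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Single-threaded sketch of the listing strategy used by the patch:
 * walk the table's base path breadth-first and decide per directory
 * listing whether it is a partition, instead of issuing an extra
 * metadata check for every sub-directory encountered.
 */
public class PartitionPathLister {

  // Assumed values; the real constants live in HoodiePartitionMetadata
  // and HoodieTableMetaClient.
  private static final String PARTITION_METAFILE_PREFIX = ".hoodie_partition_metadata";
  private static final String METAFOLDER_NAME = ".hoodie";

  public static List<String> listPartitionPaths(Configuration conf, String basePathStr) throws IOException {
    Path basePath = new Path(basePathStr);
    FileSystem fs = basePath.getFileSystem(conf);

    List<String> partitionPaths = new ArrayList<>();
    Deque<Path> pathsToList = new ArrayDeque<>();
    pathsToList.add(basePath);

    while (!pathsToList.isEmpty()) {
      Path dir = pathsToList.poll();
      FileStatus[] entries = fs.listStatus(dir);  // one listing per directory

      // A directory is a partition iff its own listing already contains the
      // partition metafile -- no second round trip per child directory.
      boolean isPartition = false;
      for (FileStatus entry : entries) {
        if (entry.getPath().getName().startsWith(PARTITION_METAFILE_PREFIX)) {
          isPartition = true;
          break;
        }
      }

      if (isPartition) {
        partitionPaths.add(relativize(basePath, dir));
      } else {
        // Not a partition: queue its sub-directories, skipping the .hoodie metafolder.
        for (FileStatus entry : entries) {
          if (entry.isDirectory() && !entry.getPath().getName().equals(METAFOLDER_NAME)) {
            pathsToList.add(entry.getPath());
          }
        }
      }
    }
    return partitionPaths;
  }

  // Hypothetical helper standing in for FSUtils.getRelativePartitionPath.
  private static String relativize(Path basePath, Path partitionPath) {
    String base = Path.getPathWithoutSchemeAndAuthority(basePath).toString();
    String full = Path.getPathWithoutSchemeAndAuthority(partitionPath).toString();
    return full.equals(base) ? "" : full.substring(base.length() + 1);
  }
}

The actual patch additionally fans the listStatus calls out through engineContext.map with bounded parallelism and collects results into CopyOnWriteArrayLists; the sketch drops that to keep the traversal logic readable.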