You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/07/29 07:50:08 UTC

[hudi] branch master updated: [HUDI-4221] Optimizing getAllPartitionPaths (#6234)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 765dd2eae6 [HUDI-4221] Optimizing getAllPartitionPaths  (#6234)
765dd2eae6 is described below

commit 765dd2eae6937eae12e2bf0b9141b7bd388be73d
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Fri Jul 29 03:49:56 2022 -0400

    [HUDI-4221] Optimizing getAllPartitionPaths  (#6234)
    
    - Leveraging Spark parallelism for directory processing
---
 .../metadata/FileSystemBackedTableMetadata.java    | 50 +++++++++++++---------
 1 file changed, 29 insertions(+), 21 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 9877755b3c..bcfd891711 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -29,11 +29,11 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieMetadataException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.exception.HoodieMetadataException;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -83,31 +83,39 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
       int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
 
       // List all directories in parallel
-      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
-        return Pair.of(path, fileSystem.listStatus(path));
+        return Arrays.stream(fileSystem.listStatus(path));
       }, listingParallelism);
       pathsToList.clear();
 
       // if current dictionary contains PartitionMetadata, add it to result
-      // if current dictionary does not contain PartitionMetadata, add it to queue
-      dirToFileListing.forEach(p -> {
-        Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
-            .filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
-            .findFirst());
-
-        if (partitionMetaFile.isPresent()) {
-          // Is a partition.
-          String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
-          partitionPaths.add(partitionName);
-        } else {
-          // Add sub-dirs to the queue
-          pathsToList.addAll(Arrays.stream(p.getRight())
-              .filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
-              .map(fileStatus -> fileStatus.getPath())
-              .collect(Collectors.toList()));
-        }
-      });
+      // if current dictionary does not contain PartitionMetadata, add it to queue to be processed.
+      int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
+      if (!dirToFileListing.isEmpty()) {
+        // result below holds a list of pair. first entry in the pair optionally holds the deduced list of partitions.
+        // and second entry holds optionally a directory path to be processed further.
+        List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
+          FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
+          if (fileStatus.isDirectory()) {
+            if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
+              return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())), Option.empty());
+            } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+              return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
+            }
+          } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
+            String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
+            return Pair.of(Option.of(partitionName), Option.empty());
+          }
+          return Pair.of(Option.empty(), Option.empty());
+        }, fileListingParallelism);
+
+        partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()).map(entry -> entry.getKey().get())
+            .collect(Collectors.toList()));
+
+        pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+            .collect(Collectors.toList()));
+      }
     }
     return partitionPaths;
   }