You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/07/29 07:50:08 UTC
[hudi] branch master updated: [HUDI-4221] Optimzing getAllPartitionPaths (#6234)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 765dd2eae6 [HUDI-4221] Optimzing getAllPartitionPaths (#6234)
765dd2eae6 is described below
commit 765dd2eae6937eae12e2bf0b9141b7bd388be73d
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Fri Jul 29 03:49:56 2022 -0400
[HUDI-4221] Optimzing getAllPartitionPaths (#6234)
- Leveraging Spark parallelism for directory processing
---
.../metadata/FileSystemBackedTableMetadata.java | 50 +++++++++++++---------
1 file changed, 29 insertions(+), 21 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 9877755b3c..bcfd891711 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -29,11 +29,11 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.exception.HoodieMetadataException;
import java.io.IOException;
import java.util.Arrays;
@@ -83,31 +83,39 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
// List all directories in parallel
- List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
+ List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
- return Pair.of(path, fileSystem.listStatus(path));
+ return Arrays.stream(fileSystem.listStatus(path));
}, listingParallelism);
pathsToList.clear();
// if current dictionary contains PartitionMetadata, add it to result
- // if current dictionary does not contain PartitionMetadata, add it to queue
- dirToFileListing.forEach(p -> {
- Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
- .filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
- .findFirst());
-
- if (partitionMetaFile.isPresent()) {
- // Is a partition.
- String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
- partitionPaths.add(partitionName);
- } else {
- // Add sub-dirs to the queue
- pathsToList.addAll(Arrays.stream(p.getRight())
- .filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
- .map(fileStatus -> fileStatus.getPath())
- .collect(Collectors.toList()));
- }
- });
+ // if current directory does not contain PartitionMetadata, add it to queue to be processed.
+ int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
+ if (!dirToFileListing.isEmpty()) {
+ // result below holds a list of pairs. The first entry of each pair optionally holds a deduced partition path,
+ // and the second entry optionally holds a directory path to be processed further.
+ List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
+ FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
+ if (fileStatus.isDirectory()) {
+ if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
+ return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())), Option.empty());
+ } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
+ return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
+ }
+ } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
+ String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
+ return Pair.of(Option.of(partitionName), Option.empty());
+ }
+ return Pair.of(Option.empty(), Option.empty());
+ }, fileListingParallelism);
+
+ partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()).map(entry -> entry.getKey().get())
+ .collect(Collectors.toList()));
+
+ pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get())
+ .collect(Collectors.toList()));
+ }
}
return partitionPaths;
}