You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/06/11 20:49:05 UTC
[hudi] 06/08: [HUDI-4221] Fixing getAllPartitionPaths perf hit w/ FileSystemBackedMetadata (#5829)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 58e25ba8563bb45b629fd534eb12423aeb948e6e
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Sat Jun 11 16:17:42 2022 -0400
[HUDI-4221] Fixing getAllPartitionPaths perf hit w/ FileSystemBackedMetadata (#5829)
---
.../metadata/FileSystemBackedTableMetadata.java | 39 ++++++++++++----------
1 file changed, 22 insertions(+), 17 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index b77bb12c49..f029995ba0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -68,13 +68,14 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
@Override
public List<String> getAllPartitionPaths() throws IOException {
- FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
+ Path basePath = new Path(datasetBasePath);
+ FileSystem fs = basePath.getFileSystem(hadoopConf.get());
if (assumeDatePartitioning) {
return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
}
List<Path> pathsToList = new CopyOnWriteArrayList<>();
- pathsToList.add(new Path(datasetBasePath));
+ pathsToList.add(basePath);
List<String> partitionPaths = new CopyOnWriteArrayList<>();
while (!pathsToList.isEmpty()) {
@@ -82,27 +83,31 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
// List all directories in parallel
- List<FileStatus[]> dirToFileListing = engineContext.map(pathsToList, path -> {
+ List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
- return fileSystem.listStatus(path);
+ return Pair.of(path, fileSystem.listStatus(path));
}, listingParallelism);
pathsToList.clear();
// if current directory contains PartitionMetadata, add it to result
// if current directory does not contain PartitionMetadata, add it to queue
- dirToFileListing.stream().flatMap(Arrays::stream).parallel()
- .forEach(fileStatus -> {
- if (fileStatus.isDirectory()) {
- if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) {
- partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath()));
- } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
- pathsToList.add(fileStatus.getPath());
- }
- } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
- String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
- partitionPaths.add(partitionName);
- }
- });
+ dirToFileListing.forEach(p -> {
+ Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
+ .filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
+ .findFirst());
+
+ if (partitionMetaFile.isPresent()) {
+ // Is a partition.
+ String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
+ partitionPaths.add(partitionName);
+ } else {
+ // Add sub-dirs to the queue
+ pathsToList.addAll(Arrays.stream(p.getRight())
+ .filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
+ .map(fileStatus -> fileStatus.getPath())
+ .collect(Collectors.toList()));
+ }
+ });
}
return partitionPaths;
}