Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2022/06/13 14:30:00 UTC

[jira] [Created] (HUDI-4242) Follow up on getAllPartitionPaths perf enhancement

sivabalan narayanan created HUDI-4242:
-----------------------------------------

             Summary: Follow up on getAllPartitionPaths perf enhancement
                 Key: HUDI-4242
                 URL: https://issues.apache.org/jira/browse/HUDI-4242
             Project: Apache Hudi
          Issue Type: Improvement
          Components: reader-core
            Reporter: sivabalan narayanan


getAllPartitionPaths showed a perf degradation from 0.9.0 to 0.10.0, so we reverted the change for now. The change itself was good, so we want to follow up and see whether we can fix or enhance the new code. The old code does not leverage the Spark engine to parallelize listing across different folders, so there could be scope for improvement. But going by the perf numbers, it is not straightforward, hence this follow-up ticket.

 

Excerpt from the findings:

For one of my test tables in S3 (10k partitions), on an EMR cluster:
 1. With 0.11.0: 147 secs.
 2. With this patch as is (where engine context is not used for the 2nd phase): 5.7 secs.
 3. Latest master + adding engineContext for the 2nd phase: 16 secs.
 4. I also tried completely rewriting the DAG: 12 secs.

      while (!pathsToList.isEmpty()) {
        // TODO: Get the parallelism from HoodieWriteConfig
        int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());

        // List all directories in parallel
        List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
          FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
          return Arrays.stream(fileSystem.listStatus(path));
        }, listingParallelism);
        pathsToList.clear();

        // If the current directory contains PartitionMetadata, add it to the result.
        // If the current directory does not contain PartitionMetadata, add it to the queue.
        int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
        List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
          FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
          if (fileStatus.isDirectory()) {
            if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
              return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())), Option.empty());
            } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
              return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
            }
          } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
            String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
            return Pair.of(Option.of(partitionName), Option.empty());
          }
          return Pair.of(Option.empty(), Option.empty());
        }, fileListingParallelism);

        partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()).map(entry -> entry.getKey().get())
            .collect(Collectors.toList()));

        pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get())
            .collect(Collectors.toList()));
      }
So, based on the above findings, I will go with what we have in this patch in its current state. I will address only Raymond's and Alexey's feedback and unblock 0.11.1.
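
For anyone who wants to experiment with the listing loop outside of Hudi, here is a rough, self-contained sketch of the same level-by-level idea. It is not the code from the patch. It assumes a local filesystem via java.nio instead of Hadoop's FileSystem, uses a parallel stream as a stand-in for engineContext, and classifies the listing results on the driver (similar in spirit to not using the engine context for the 2nd phase). The class name, the helper names and the constant values are illustrative; the two string constants are assumed to mirror HOODIE_PARTITION_METAFILE_PREFIX and METAFOLDER_NAME. Unlike the excerpt above, it does not stop descending once a partition is found; it simply detects a partition when its marker file shows up in a listing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PartitionListingSketch {
    // Assumed values, chosen to mirror the Hudi constants referenced in the excerpt.
    static final String PARTITION_METAFILE_PREFIX = ".hoodie_partition_metadata";
    static final String META_FOLDER_NAME = ".hoodie";

    // Lists the direct children of one directory (the per-path unit of work).
    static List<Path> listChildren(Path dir) {
        try (Stream<Path> children = Files.list(dir)) {
            return children.collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Breadth-first discovery of partitions: each level is listed in parallel,
    // then the results are classified sequentially on the caller's thread.
    static List<String> listPartitions(Path basePath) {
        List<String> partitions = new ArrayList<>();
        List<Path> pathsToList = new ArrayList<>();
        pathsToList.add(basePath);

        while (!pathsToList.isEmpty()) {
            // Phase 1: list every directory of the current level in parallel.
            List<Path> children = pathsToList.parallelStream()
                .flatMap(dir -> listChildren(dir).stream())
                .collect(Collectors.toList());
            pathsToList.clear();

            // Phase 2: classify results without any further parallelism.
            for (Path child : children) {
                String name = child.getFileName().toString();
                if (Files.isDirectory(child)) {
                    // Skip the table's metadata folder, queue everything else.
                    if (!name.equals(META_FOLDER_NAME)) {
                        pathsToList.add(child);
                    }
                } else if (name.startsWith(PARTITION_METAFILE_PREFIX)) {
                    // A marker file means its parent directory is a partition.
                    String relative = basePath.relativize(child.getParent()).toString();
                    partitions.add(relative.replace('\\', '/'));
                }
            }
        }
        Collections.sort(partitions);
        return partitions;
    }

    // Builds a tiny throwaway table layout in a temp directory (demo only).
    static Path createDemoTable() {
        try {
            Path base = Files.createTempDirectory("hudi-listing-sketch");
            Files.createDirectories(base.resolve(META_FOLDER_NAME));
            for (String partition : new String[] {"2022/06/13", "2022/06/14"}) {
                Path dir = Files.createDirectories(base.resolve(partition));
                Files.createFile(dir.resolve(PARTITION_METAFILE_PREFIX));
                Files.createFile(dir.resolve("data.parquet"));
            }
            return base;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(listPartitions(createDemoTable()));
    }
}
```

Keeping the classification step sequential trades some parallelism for fewer scheduled tasks per level. Whether that helps on a real table depends on the storage layer and cluster, so the numbers above should be taken from the actual Hudi runs, not from this sketch.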

 

Ref patch: https://github.com/apache/hudi/pull/5829


