Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2022/04/12 15:07:00 UTC

[jira] [Updated] (HUDI-3864) Avoid fetching all files for all partitions on the read/query path for flink

     [ https://issues.apache.org/jira/browse/HUDI-3864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-3864:
--------------------------------------
    Fix Version/s: 0.12.0

> Avoid fetching all files for all partitions on the read/query path for flink
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-3864
>                 URL: https://issues.apache.org/jira/browse/HUDI-3864
>             Project: Apache Hudi
>          Issue Type: Task
>          Components: flink
>            Reporter: sivabalan narayanan
>            Assignee: Danny Chen
>            Priority: Major
>             Fix For: 0.12.0
>
>
> Fetching all files across all partitions should be avoided in the hot path, especially on the query side. I inspected HoodieFileIndex for spark and things look to be ok: we only load files for the partitions involved in the query. 
>  
> {code:java}
> public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
>                                 HoodieTableMetaClient metaClient,
>                                 TypedProperties configProperties,
>                                 HoodieTableQueryType queryType,
>                                 List<Path> queryPaths,
>                                 ...
> {code}
> The queryPaths argument above contains only the partitions involved in the split. 
>  
> Later, when we load the files, we load only the matched partitions. 
>  
> {code:java}
> private Map<PartitionPath, FileStatus[]> loadPartitionPathFiles() {
>   // List files in all partition paths
>   List<PartitionPath> pathToFetch = new ArrayList<>();
>   Map<PartitionPath, FileStatus[]> cachedPartitionToFiles = new HashMap<>();
>   // Fetch from the FileStatusCache
>   List<PartitionPath> partitionPaths = getAllQueryPartitionPaths();
>   partitionPaths.forEach(partitionPath -> {
>     Option<FileStatus[]> filesInPartition = fileStatusCache.get(partitionPath.fullPartitionPath(basePath));
>     if (filesInPartition.isPresent()) {
>       cachedPartitionToFiles.put(partitionPath, filesInPartition.get());
>     } else {
>       pathToFetch.add(partitionPath);
>     }
>   });
>   Map<PartitionPath, FileStatus[]> fetchedPartitionToFiles;
>   if (pathToFetch.isEmpty()) {
>     fetchedPartitionToFiles = Collections.emptyMap();
>   } else {
>     Map<String, PartitionPath> fullPartitionPathsMapToFetch = pathToFetch.stream()
>         .collect(Collectors.toMap(
>             partitionPath -> partitionPath.fullPartitionPath(basePath).toString(),
>             Function.identity())
>         );
>     fetchedPartitionToFiles =
>         FSUtils.getFilesInPartitions(
>                 engineContext,
>                 metadataConfig,
>                 basePath,
>                 fullPartitionPathsMapToFetch.keySet().toArray(new String[0]),
>                 fileSystemStorageConfig.getSpillableDir())
>             .entrySet()
>             .stream()
>             .collect(Collectors.toMap(e -> fullPartitionPathsMapToFetch.get(e.getKey()), e -> e.getValue()));
>   }
>   // Update the fileStatusCache
>   fetchedPartitionToFiles.forEach((partitionPath, filesInPartition) -> {
>     fileStatusCache.put(partitionPath.fullPartitionPath(basePath), filesInPartition);
>   });
>   return CollectionUtils.combine(cachedPartitionToFiles, fetchedPartitionToFiles);
> } {code}
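> As a side note, the cache handling above follows a generic two-phase pattern: serve hits from the cache, batch-fetch only the misses in a single call, then backfill the cache. A self-contained sketch of that pattern (hypothetical types, not the Hudi classes):
>  
> {code:java}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> import java.util.function.Function;
> 
> // Illustrative cache-then-batch-fetch pattern (hypothetical, not Hudi code):
> // look up each key in the cache, batch-fetch the misses in one call,
> // then write the fetched entries back into the cache.
> public class CachingBatchLoader<K, V> {
>   private final Map<K, V> cache = new HashMap<>();
>   private final Function<List<K>, Map<K, V>> batchFetcher;
>   private int fetchCalls = 0;
> 
>   public CachingBatchLoader(Function<List<K>, Map<K, V>> batchFetcher) {
>     this.batchFetcher = batchFetcher;
>   }
> 
>   public Map<K, V> load(List<K> keys) {
>     Map<K, V> result = new HashMap<>();
>     List<K> misses = new ArrayList<>();
>     for (K key : keys) {
>       V cached = cache.get(key);
>       if (cached != null) {
>         result.put(key, cached);   // served from cache
>       } else {
>         misses.add(key);
>       }
>     }
>     if (!misses.isEmpty()) {
>       fetchCalls++;
>       // One batch call for all misses, like FSUtils.getFilesInPartitions above
>       Map<K, V> fetched = batchFetcher.apply(misses);
>       cache.putAll(fetched);       // backfill the cache
>       result.putAll(fetched);
>     }
>     return result;
>   }
> 
>   public int getFetchCalls() { return fetchCalls; }
> 
>   public static void main(String[] args) {
>     CachingBatchLoader<String, Integer> loader = new CachingBatchLoader<>(keys -> {
>       Map<String, Integer> m = new HashMap<>();
>       for (String k : keys) m.put(k, k.length()); // stand-in for the file listing
>       return m;
>     });
>     loader.load(Arrays.asList("p1", "p22"));
>     loader.load(Arrays.asList("p1", "p22")); // second call is fully served from cache
>     System.out.println(loader.getFetchCalls()); // prints 1
>   }
> } {code}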
>  
>  
> But I do see that in flink, we are loading across all partitions. Let's try to see if this can be avoided. 
>  
> IncrementalInputSplits [L180|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L180]
> fileStatuses = fileIndex.getFilesInPartitions();
>  
> HoodieTableSource [L298|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java#L298]
> FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
>  
> I do see that we pass in the required partition paths in both places, but I will leave it to flink experts to inspect the code once and close out the ticket if no action is required. 
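> If a fix is needed, it would presumably be the same kind of pruning the spark index does: filter the candidate partitions down to the required ones before listing files. A minimal standalone sketch of that idea (hypothetical names, simple prefix matching for illustration; not the actual Hudi Flink API):
>  
> {code:java}
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> 
> // Hypothetical sketch of partition pruning: of all partitions under the
> // table base path, keep only those covered by the query paths, so that
> // file listing runs against the pruned set only.
> public class PartitionPruner {
>   // Returns the subset of allPartitions matching one of the query paths
>   // (prefix match stands in for real partition-path resolution).
>   public static List<String> prune(List<String> allPartitions, List<String> queryPaths) {
>     List<String> matched = new ArrayList<>();
>     for (String partition : allPartitions) {
>       for (String queryPath : queryPaths) {
>         if (partition.startsWith(queryPath)) {
>           matched.add(partition);
>           break;
>         }
>       }
>     }
>     return matched;
>   }
> 
>   public static void main(String[] args) {
>     List<String> all = Arrays.asList("dt=2022-04-10", "dt=2022-04-11", "dt=2022-04-12");
>     List<String> query = Arrays.asList("dt=2022-04-12");
>     System.out.println(prune(all, query)); // prints [dt=2022-04-12]
>   }
> } {code}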
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)