Posted to commits@hudi.apache.org by "Danny Chen (Jira)" <ji...@apache.org> on 2023/03/24 07:15:00 UTC

[jira] [Closed] (HUDI-5967) Add partition ordering for full table scans

     [ https://issues.apache.org/jira/browse/HUDI-5967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danny Chen closed HUDI-5967.
----------------------------
    Fix Version/s: 0.14.0
                       (was: 0.13.1)
       Resolution: Fixed

Fixed via master branch: a36c8e0f9732217198b8a2b425ea1bd16287f9b5

> Add partition ordering for full table scans
> -------------------------------------------
>
>                 Key: HUDI-5967
>                 URL: https://issues.apache.org/jira/browse/HUDI-5967
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: flink
>            Reporter: Alex Guo
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 0.14.0
>
>
> I am running a streaming read query on an hourly-partitioned COW table via [StreamReadMonitoringFunction|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java#L206], with the following settings:
> {code:java}
> config.set(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_SNAPSHOT);
> config.setBoolean(FlinkOptions.READ_AS_STREAMING, true);
> config.set(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);{code}
> Since I am reading from the earliest commit, the query starts with a [full table scan|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L284], which collects all of the partitions in the table. It then [maps|https://github.com/apache/hudi/blob/eca57b51866f2dff98437ec60ac935cdc6c18d91/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L378] each readPartition in the readPartitions set to a list of {{MergeOnReadInputSplits}}, e.g. [[partition1split1, partition1split2], [partition2split1, partition2split2], …], and flat-maps those lists so that all splits belonging to a single partition are adjacent in the resulting list. These splits are then distributed to the read subtasks. This is why I see splits numbered e.g. 1-55 all corresponding to one partition, 56-115 corresponding to another, 116-175 to another, and so on.
> {code:java}
> FileIndex fileIndex = getFileIndex();
> readPartitions = new HashSet<>(fileIndex.getOrBuildPartitionPaths());{code}
> The problem is that since readPartitions is initialized as a HashSet, the iteration order of the partitions is effectively random, so a subtask could read partition 11, advance the watermarks, then read partition 8 afterwards and consider everything in it late.
>  
> So, within a subtask there is no partition-level order; even with parallelism = 1, partitions are read out of order, as illustrated below.
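>  
> For illustration only (this is not Hudi code and the partition paths are made up), a HashSet of hourly partition paths iterates in hash order, whereas a TreeSet iterates in sorted order:
> {code:java}
> import java.util.HashSet;
> import java.util.List;
> import java.util.Set;
> import java.util.TreeSet;
> 
> public class PartitionOrderDemo {
>   public static void main(String[] args) {
>     List<String> partitions = List.of(
>         "2023/03/24/08", "2023/03/24/11", "2023/03/24/09", "2023/03/24/10");
> 
>     // HashSet iteration order depends on the hash codes of the paths,
>     // so a later hour can come out before an earlier one.
>     Set<String> hashOrder = new HashSet<>(partitions);
>     System.out.println("HashSet order: " + hashOrder);
> 
>     // TreeSet iterates in natural (lexicographic) order, which for
>     // date-formatted partition paths matches chronological order.
>     Set<String> sortedOrder = new TreeSet<>(partitions);
>     System.out.println("TreeSet order: " + sortedOrder);
>   }
> }{code}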
>  
> The goal is to use a sorted set (e.g. TreeSet) so that the initial full table scan has this partition-level ordering.
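>  
> A minimal sketch of the proposed change (the surrounding code is the snippet quoted above; only the set implementation changes, and this is not the actual patch):
> {code:java}
> FileIndex fileIndex = getFileIndex();
> // TreeSet keeps the partition paths in natural (lexicographic) order, so the
> // initial full table scan emits splits partition by partition, earliest first.
> readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());{code}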



--
This message was sent by Atlassian Jira
(v8.20.10#820010)