Posted to dev@flink.apache.org by "ming li (Jira)" <ji...@apache.org> on 2023/02/09 08:06:00 UTC

[jira] [Created] (FLINK-30985) [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.

ming li created FLINK-30985:
-------------------------------

             Summary: [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
                 Key: FLINK-30985
                 URL: https://issues.apache.org/jira/browse/FLINK-30985
             Project: Flink
          Issue Type: Improvement
          Components: Table Store
            Reporter: ming li


Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in {{TableStore}} iterates over the {{bucketSplits}} {{HashMap}}. Because the set of buckets is fixed, the iteration order is also fixed.
{code:java}
private void assignSplits() {
    bucketSplits.forEach(
            (bucket, splits) -> {
                if (splits.size() > 0) {
                    // To ensure the order of consumption, the data of the same bucket is given
                    // to a task to be consumed.
                    int task = bucket % context.currentParallelism();
                    if (readersAwaitingSplit.remove(task)) {
                        // If the reader that requested another split has failed in the
                        // meantime, remove it from the list of waiting readers.
                        if (!context.registeredReaders().containsKey(task)) {
                            return;
                        }
                        context.assignSplit(splits.poll(), task);
                    }
                }
            });
}
{code}
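To make the effect of the fixed traversal order concrete, here is a simplified, self-contained model of the loop above. It is a hypothetical sketch, not Flink code: {{StarvationDemo}} and {{simulate}} are made-up names, every reader is assumed to be registered, and in each round every task requests exactly one split.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Hypothetical, simplified model of the assignSplits loop quoted above (not Flink code). */
public class StarvationDemo {

    /** Runs `rounds` request/assign cycles and returns the bucket that served each split given to task 0. */
    static List<Integer> simulate(int parallelism, int numBuckets, int splitsPerBucket, int rounds) {
        Map<Integer, Queue<String>> bucketSplits = new HashMap<>();
        for (int b = 0; b < numBuckets; b++) {
            Queue<String> queue = new ArrayDeque<>();
            for (int i = 0; i < splitsPerBucket; i++) {
                queue.add("bucket-" + b + "-split-" + i);
            }
            bucketSplits.put(b, queue);
        }
        Set<Integer> readersAwaitingSplit = new HashSet<>();
        List<Integer> servedForTask0 = new ArrayList<>();
        for (int r = 0; r < rounds; r++) {
            // Assumption: every reader asks for exactly one more split per round.
            for (int t = 0; t < parallelism; t++) {
                readersAwaitingSplit.add(t);
            }
            // Same shape as the quoted loop: fixed HashMap order, one split per pending request.
            bucketSplits.forEach(
                    (bucket, splits) -> {
                        if (!splits.isEmpty()) {
                            int task = bucket % parallelism;
                            if (readersAwaitingSplit.remove(task)) {
                                splits.poll();
                                if (task == 0) {
                                    servedForTask0.add(bucket);
                                }
                            }
                        }
                    });
        }
        return servedForTask0;
    }

    public static void main(String[] args) {
        // With parallelism 2 and 4 buckets, task 0 owns buckets 0 and 2.
        System.out.println(simulate(2, 4, 10, 5)); // prints [0, 0, 0, 0, 0]
    }
}
```

Although task 0 owns buckets 0 and 2, every one of its assignments in this model comes from bucket 0, because bucket 0 is always reached first and consumes the task's single pending request.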
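Assume that a {{task}} consumes multiple {{buckets}} and every {{bucket}} has plenty of pending splits. Each time the task requests a split, the request is taken out of {{readersAwaitingSplit}} by the first of its buckets reached in the traversal, so only that bucket receives an assignment. Because the traversal order never changes, the first {{bucket}} is always the one assigned to the task, while the task's other buckets may go unconsumed for a long time. This results in uneven consumption and makes it difficult for the {{watermark}} to advance. I therefore propose changing the split assignment algorithm to a fair one, so that all buckets owned by the same task are consumed evenly.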
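One possible fair scheme, sketched below under stated assumptions, is to visit buckets in least-recently-served order: after a bucket receives a split it moves to the back of the iteration order, so the task's other buckets get the next turns. This is a hypothetical illustration, not the fix adopted for this issue; {{FairAssignmentSketch}}, {{addSplit}} and {{requestSplit}} are made-up names, and the registered-reader check from the original code is omitted. It keeps the existing {{bucket % parallelism}} mapping, so each bucket is still read by a single task in FIFO order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/**
 * Hypothetical sketch of a "fair" assignSplits: buckets are visited in
 * least-recently-served order, so a bucket that just received a split moves
 * to the back and its sibling buckets on the same task get the next turns.
 */
public class FairAssignmentSketch {

    // The map's insertion order is the visiting order; served buckets are re-inserted at the end.
    private final LinkedHashMap<Integer, Queue<String>> bucketSplits = new LinkedHashMap<>();
    private final Set<Integer> readersAwaitingSplit = new HashSet<>();
    private final int parallelism;

    FairAssignmentSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    void addSplit(int bucket, String split) {
        bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>()).add(split);
    }

    void requestSplit(int task) {
        readersAwaitingSplit.add(task);
    }

    /** Assigns at most one split per waiting task and returns task -> assigned split. */
    Map<Integer, String> assignSplits() {
        Map<Integer, String> assigned = new LinkedHashMap<>();
        List<Integer> served = new ArrayList<>();
        // Iterate over a snapshot so the map can be safely reordered afterwards.
        for (Map.Entry<Integer, Queue<String>> entry : new ArrayList<>(bucketSplits.entrySet())) {
            int bucket = entry.getKey();
            Queue<String> splits = entry.getValue();
            int task = bucket % parallelism; // same bucket-to-task mapping as the current code
            if (!splits.isEmpty() && readersAwaitingSplit.remove(task)) {
                assigned.put(task, splits.poll());
                served.add(bucket);
            }
        }
        // Move served buckets to the back so they are visited last next time.
        for (int bucket : served) {
            bucketSplits.put(bucket, bucketSplits.remove(bucket));
        }
        return assigned;
    }

    public static void main(String[] args) {
        FairAssignmentSketch enumerator = new FairAssignmentSketch(2);
        for (int b = 0; b < 4; b++) {
            for (int i = 0; i < 10; i++) {
                enumerator.addSplit(b, "bucket-" + b + "-split-" + i);
            }
        }
        for (int round = 0; round < 4; round++) {
            enumerator.requestSplit(0);
            enumerator.requestSplit(1);
            System.out.println(enumerator.assignSplits());
        }
    }
}
```

With two tasks and four buckets, task 0 now alternates between buckets 0 and 2 (and task 1 between buckets 1 and 3) instead of always reading from the same bucket. Re-inserting into a {{LinkedHashMap}} is just one cheap way to rotate the order; a per-task cursor over that task's buckets would give the same fairness.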



--
This message was sent by Atlassian Jira
(v8.20.10#820010)