Posted to dev@flink.apache.org by "ming li (Jira)" <ji...@apache.org> on 2023/02/09 08:06:00 UTC
[jira] [Created] (FLINK-30985) [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
ming li created FLINK-30985:
-------------------------------
Summary: [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
Key: FLINK-30985
URL: https://issues.apache.org/jira/browse/FLINK-30985
Project: Flink
Issue Type: Improvement
Components: Table Store
Reporter: ming li
Currently, {{assignSplits}} in {{ContinuousFileSplitEnumerator}} of {{TableStore}} assigns splits by traversing a {{HashMap}}. Because the number of buckets is fixed, the traversal order is also fixed.
{code:java}
private void assignSplits() {
    bucketSplits.forEach(
            (bucket, splits) -> {
                if (splits.size() > 0) {
                    // To ensure the order of consumption, the data of the same bucket is given
                    // to a task to be consumed.
                    int task = bucket % context.currentParallelism();
                    if (readersAwaitingSplit.remove(task)) {
                        // if the reader that requested another split has failed in the
                        // meantime, remove it from the list of waiting readers
                        if (!context.registeredReaders().containsKey(task)) {
                            return;
                        }
                        context.assignSplit(splits.poll(), task);
                    }
                }
            });
}{code}
Assume that a {{task}} consumes multiple {{buckets}} and that every {{bucket}} has enough pending splits. The first {{bucket}} in traversal order will then always be the one assigned to the task, and the other buckets may not be consumed for a long time. This results in uneven consumption and makes it difficult to advance the {{watermark}}. So I think we should change the split allocation algorithm to a fair one.
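One possible fair strategy is a per-task round robin over the buckets that task owns. The following is only a minimal, Flink-free sketch of that idea, not the actual TableStore change: the class {{FairSplitAssigner}} and its methods {{addSplit}} and {{nextSplit}} are hypothetical names, splits are modelled as plain strings, and reader registration is left out. It keeps the existing {{bucket % parallelism}} mapping, so all data of one bucket still goes to the same task in order.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical model of a round-robin split assigner (not Flink API). */
class FairSplitAssigner {
    private final int parallelism;
    // bucket -> pending splits in arrival order (splits modelled as strings)
    private final Map<Integer, Queue<String>> bucketSplits = new HashMap<>();
    // task -> rotation of the buckets owned by that task
    private final Map<Integer, ArrayDeque<Integer>> rotation = new HashMap<>();

    FairSplitAssigner(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Registers a pending split; a bucket joins its task's rotation on first use. */
    void addSplit(int bucket, String split) {
        Queue<String> pending = bucketSplits.get(bucket);
        if (pending == null) {
            pending = new ArrayDeque<>();
            bucketSplits.put(bucket, pending);
            // Same bucket-to-task mapping as the current code.
            rotation.computeIfAbsent(bucket % parallelism, t -> new ArrayDeque<>())
                    .addLast(bucket);
        }
        pending.add(split);
    }

    /** Returns the next split for the task, or null if none is pending. */
    String nextSplit(int task) {
        ArrayDeque<Integer> buckets = rotation.get(task);
        if (buckets == null) {
            return null;
        }
        // Visit each owned bucket at most once per call.
        for (int i = 0, n = buckets.size(); i < n; i++) {
            int bucket = buckets.pollFirst();
            // Served or empty buckets move to the back of the rotation.
            buckets.addLast(bucket);
            String split = bucketSplits.get(bucket).poll();
            if (split != null) {
                return split;
            }
        }
        return null;
    }
}
```

With a parallelism of 2 and two pending splits in each of buckets 0 and 2, task 0 would receive one split from bucket 0, then one from bucket 2, then the second split of bucket 0, and so on, instead of draining bucket 0 first.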