Posted to dev@flink.apache.org by "Piotr Nowojski (Jira)" <ji...@apache.org> on 2021/12/13 13:09:00 UTC

[jira] [Created] (FLINK-25275) Weighted KeyGroup assignment

Piotr Nowojski created FLINK-25275:
--------------------------------------

             Summary: Weighted KeyGroup assignment
                 Key: FLINK-25275
                 URL: https://issues.apache.org/jira/browse/FLINK-25275
             Project: Flink
          Issue Type: New Feature
          Components: Runtime / Network
    Affects Versions: 1.14.0
            Reporter: Piotr Nowojski


Currently, key groups are split into key group ranges in the simplest possible way: equally sized contiguous ranges (number of ranges = parallelism = number of key groups / size of a single key group range). Flink could compensate for data skew between key groups by assigning them to tasks based on their "weight". "Weight" could be defined as the access frequency of a given key group.
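
For illustration, the equal-size split described above can be sketched as follows. This is a minimal sketch and is not copied from the Flink code base: the function name equal_ranges is hypothetical, key groups are numbered from 0, and each range is an inclusive (start, end) pair.

```python
def equal_ranges(num_key_groups, parallelism):
    """Split key groups 0..num_key_groups-1 into `parallelism` contiguous,
    near-equal ranges. Hypothetical helper, for illustration only."""
    if not 1 <= parallelism <= num_key_groups:
        raise ValueError("need 1 <= parallelism <= num_key_groups")
    ranges = []
    for task in range(parallelism):
        # Integer arithmetic: first and last key group owned by this task.
        start = (task * num_key_groups + parallelism - 1) // parallelism
        end = ((task + 1) * num_key_groups - 1) // parallelism
        ranges.append((start, end))
    return ranges


# 128 key groups over 4 tasks: every task gets 32 key groups,
# regardless of how often each key group is accessed.
print(equal_ranges(128, 4))  # [(0, 31), (32, 63), (64, 95), (96, 127)]
```

The split depends only on the key group count and the parallelism, so a few hot key groups that land in the same range overload one task.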

Arbitrary, non-contiguous key group assignment (for example, TM1 processing kg1 and kg3 while TM2 processes only kg2) would require extensive changes, for example to the state backends. However, the data skew could be mitigated to some extent by creating key group ranges in a smarter way while keeping each range contiguous. For example, TM1 processes the range [kg1, kg9], while TM2 processes just [kg10, kg11].
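
One possible way to build such weighted but still contiguous ranges is a greedy cut on the cumulative weight. The sketch below is only an illustration under assumptions: it is not the algorithm from the PoC branch, the name weighted_ranges is hypothetical, the per-key-group weights are assumed to be already measured, and key groups are numbered from 0.

```python
def weighted_ranges(weights, parallelism):
    """Cut key groups 0..len(weights)-1 into `parallelism` contiguous ranges
    of roughly equal total weight. Greedy sketch, for illustration only."""
    n = len(weights)
    if not 1 <= parallelism <= n:
        raise ValueError("need 1 <= parallelism <= number of key groups")
    total = sum(weights)
    ranges, start, acc = [], 0, 0
    for kg, weight in enumerate(weights):
        acc += weight
        tasks_left = parallelism - len(ranges)  # includes the range being built
        if tasks_left == 1:
            continue  # the last range takes every remaining key group
        groups_left = n - kg - 1  # key groups after this one
        # Cut once the cumulative weight reaches this task's share, or when
        # the remaining key groups are needed to give each later task one.
        reached_share = acc * parallelism >= total * (len(ranges) + 1)
        if reached_share or groups_left == tasks_left - 1:
            ranges.append((start, kg))
            start = kg + 1
    ranges.append((start, n - 1))
    return ranges


# Nine light key groups followed by two hot ones (made-up weights):
# zero-based (0, 8) and (9, 10) correspond to [kg1, kg9] and [kg10, kg11].
print(weighted_ranges([1] * 9 + [5, 4], 2))  # [(0, 8), (9, 10)]
```

Because every range stays contiguous, a single very hot key group still cannot be split, which is why this approach mitigates the skew only to some extent.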

[This branch shows a PoC of such an approach.|https://github.com/pnowojski/flink/commits/antiskew]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)