Posted to dev@flink.apache.org by "Theo Diefenthal (Jira)" <ji...@apache.org> on 2020/09/23 15:54:00 UTC

[jira] [Created] (FLINK-19383) Per Partition State

Theo Diefenthal created FLINK-19383:
---------------------------------------

             Summary: Per Partition State
                 Key: FLINK-19383
                 URL: https://issues.apache.org/jira/browse/FLINK-19383
             Project: Flink
          Issue Type: Improvement
            Reporter: Theo Diefenthal


With Kafka possibly being the most widely used data source in Flink, I'd like to propose a new "per-partition state".

Right now, Flink only knows about operator state (redistributed either evenly or as a union) and keyed state.

Kafka has multiple partitions per topic, and Flink already exploits that nicely. The most widely used feature is that one can produce data with ascending timestamps per Kafka partition (e.g. server logs, with each server sending data to one partition). In Flink, this enables a huge optimization: one can use an AscendingTimestampWatermarkAssigner, and windows can be closed quickly.
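
To illustrate why ascending timestamps per partition allow windows to close quickly, here is a minimal, self-contained sketch in plain Java (no Flink dependency). The class and method names are hypothetical, and the "minimum over partitions, minus one" rule is an assumption about how per-partition watermarks are typically combined, not a copy of Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not a Flink class.
final class PerPartitionWatermarkSketch {
    // Highest timestamp seen so far on each partition.
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();
    private final int partitionCount;

    PerPartitionWatermarkSketch(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    // Record an event; timestamps are assumed to be ascending within a partition.
    void onRecord(int partition, long timestamp) {
        maxTimestampPerPartition.merge(partition, timestamp, Math::max);
    }

    // Assumed rule: the overall watermark is the minimum of the per-partition
    // maxima, minus one. Until every partition has produced data, no progress
    // can be claimed.
    long currentWatermark() {
        if (maxTimestampPerPartition.size() < partitionCount) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long ts : maxTimestampPerPartition.values()) {
            min = Math.min(min, ts);
        }
        return min - 1;
    }

    public static void main(String[] args) {
        PerPartitionWatermarkSketch sketch = new PerPartitionWatermarkSketch(2);
        sketch.onRecord(0, 100L);
        sketch.onRecord(1, 50L);
        System.out.println("watermark = " + sketch.currentWatermark());
    }
}
```

Because no out-of-orderness bound has to be added per partition, the watermark can follow the slowest partition directly.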

Making use of this performance optimization leads me to think that we could go a step further and introduce a per-Kafka-partition state. In my current scenario, I need to buffer the data per server (and thus per Kafka partition) for 1 minute in event time, waiting to see whether certain other events arrive during that time.

A state per Kafka partition is currently hard to implement. The best one can do is to keyBy the DataStream by Kafka partition. However, the KafkaAssigner has different assignment characteristics than the KeyGroupRangeAssignment, leading to an unnecessary shuffle step. Even worse, the KeyGroupRangeAssignment is kind of random, whereas the Kafka partition assignment in the source works round robin. Even with similarly loaded Kafka partitions, the load can be skewed across the TaskManagers after keying. For a simple pipeline with parallelism 3 and 3 partitions, this can lead to e.g. one TaskManager processing 2 partitions, one TaskManager processing 1 partition, and one TaskManager being idle.
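
The mismatch between the two assignment schemes can be sketched in plain Java without any Flink dependency. Everything here is illustrative: the class and method names are hypothetical, standInHash is NOT Flink's actual hash function, the start index 0 for the round-robin source assignment is an assumption, and 128 is simply an assumed max parallelism. Only the general shape (key -> key group -> subtask) follows my understanding of how KeyGroupRangeAssignment works.

```java
import java.util.Arrays;

// Hypothetical illustration only; this is not Flink code.
final class PartitionAssignmentSketch {
    static final int ASSUMED_MAX_PARALLELISM = 128;

    // Source side: partitions are dealt out round robin across subtasks.
    static int sourceSubtask(int partition, int startIndex, int parallelism) {
        return (startIndex + partition) % parallelism;
    }

    // Stand-in hash (an assumption); Flink uses its own hash function.
    static int standInHash(int key) {
        int h = key * 0x9E3779B9;
        h ^= h >>> 16;
        return h & 0x7FFFFFFF; // force a non-negative value
    }

    // Keyed side: key -> key group -> subtask index.
    static int keyedSubtask(int key, int maxParallelism, int parallelism) {
        int keyGroup = standInHash(key) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    // Count how many partitions land on each subtask under either scheme.
    static int[] countPerSubtask(int partitions, int parallelism, boolean keyed) {
        int[] counts = new int[parallelism];
        for (int p = 0; p < partitions; p++) {
            int subtask = keyed
                    ? keyedSubtask(p, ASSUMED_MAX_PARALLELISM, parallelism)
                    : sourceSubtask(p, 0, parallelism);
            counts[subtask]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("source (round robin): "
                + Arrays.toString(countPerSubtask(3, 3, false)));
        System.out.println("after keyBy (hashed):  "
                + Arrays.toString(countPerSubtask(3, 3, true)));
    }
}
```

The round-robin counts are always even when the partition count is a multiple of the parallelism, while the hashed counts depend entirely on where the few distinct keys happen to fall, which is why a handful of partition keys can leave a subtask idle.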



--
This message was sent by Atlassian Jira
(v8.3.4#803005)