Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2022/04/13 06:28:05 UTC
[jira] [Updated] (FLINK-21317) Downstream keyed state not work after FlinkKafkaShuffle
[ https://issues.apache.org/jira/browse/FLINK-21317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Gao updated FLINK-21317:
----------------------------
Fix Version/s: 1.16.0
> Downstream keyed state not work after FlinkKafkaShuffle
> -------------------------------------------------------
>
> Key: FLINK-21317
> URL: https://issues.apache.org/jira/browse/FLINK-21317
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.0
> Reporter: Kezhu Wang
> Priority: Minor
> Labels: auto-deprioritized-major
> Fix For: 1.15.0, 1.16.0
>
>
> {{FlinkKafkaShuffle}} uses {{KeyGroupRangeAssignment.assignKeyToParallelOperator}} to assign records to Kafka topic partitions. The assignment works as follows:
> # {{KeyGroupRangeAssignment.assignToKeyGroup(Object key, int maxParallelism)}} assigns the key to a key group.
> # {{KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId)}} assigns that key group to an operator/subtask index.
> When the Kafka topic partitions are consumed, they are redistributed across subtasks by {{KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numParallelSubtasks)}}. I have copied the code of this redistribution here.
> {code:java}
> public class KafkaTopicPartitionAssigner {
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         int startIndex =
>                 ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + partition.getPartition()) % numParallelSubtasks;
>     }
> }
> {code}
> This partition redistribution breaks the prerequisite of {{DataStreamUtils.reinterpretAsKeyedStream}}: because {{startIndex}} depends on the topic name, a partition is generally not consumed by the subtask that owns its key groups. The consequence is unusable keyed state. The deepest stack trace I captured is listed here:
> {noformat}
> Caused by: java.lang.NullPointerException
> at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:205)
> at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:100)
> {noformat}
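The mismatch described above can be reproduced without any Flink dependency. The following is a minimal sketch, not Flink code: it copies only the arithmetic of the quoted {{assign}} method, takes the topic name and partition id as plain arguments, and assumes that the producer side wrote the records destined for subtask {{i}} to Kafka partition {{i}}. The class name, method name, and the topic name "a" are hypothetical.

```java
public class ShuffleMismatchDemo {

    // Same arithmetic as the quoted KafkaTopicPartitionAssigner.assign, but with
    // the topic name and partition id passed in directly (illustration only).
    static int consumerSubtask(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        String topic = "a";   // hypothetical topic name; "a".hashCode() is 97
        int parallelism = 4;  // assumed equal to the number of Kafka partitions

        for (int partition = 0; partition < parallelism; partition++) {
            // Assumption: partition i holds the records meant for subtask i.
            int expected = partition;
            int actual = consumerSubtask(topic, partition, parallelism);
            System.out.printf(
                    "partition %d: expected subtask %d, assigned subtask %d%n",
                    partition, expected, actual);
        }
    }
}
```

For this topic name the start index is 3, so every partition is consumed by a subtask other than the one that owns its key groups. A topic whose start index happens to be 0 would not show the problem, which may explain why it is easy to miss.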
> cc [~ym] [~sewen] [~AHeise] [~pnowojski]
> Below are my proposed changes:
> * Make assignment between partition and subtask customizable.
> * Provide a 0-based round-robin assignment. (This amounts to setting {{startIndex}} to 0 in the existing assignment algorithm.)
> I also saw FLINK-8570; the above changes could be helpful if we eventually decide to deliver FLINK-8570.
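The two proposed changes could look roughly like the sketch below. It is only an illustration of the idea, not an existing Flink API: the {{PartitionAssigner}} interface and both constants are hypothetical names, and the topic and partition are passed as plain values to keep the example self-contained. It also assumes the number of Kafka partitions equals the consumer parallelism.

```java
public class ZeroBasedAssignmentSketch {

    /** Hypothetical extension point; not an existing Flink interface. */
    interface PartitionAssigner {
        int assign(String topic, int partition, int numParallelSubtasks);
    }

    // Existing behaviour: the start index is derived from the topic name.
    static final PartitionAssigner HASH_OFFSET = (topic, partition, n) -> {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % n;
        return (startIndex + partition) % n;
    };

    // Proposed behaviour: same round-robin, but with the start index fixed at 0.
    static final PartitionAssigner ZERO_BASED = (topic, partition, n) -> partition % n;

    public static void main(String[] args) {
        String topic = "a";  // hypothetical topic name
        int parallelism = 4; // assumed equal to the number of Kafka partitions

        for (int partition = 0; partition < parallelism; partition++) {
            System.out.printf(
                    "partition %d: hash-offset -> subtask %d, zero-based -> subtask %d%n",
                    partition,
                    HASH_OFFSET.assign(topic, partition, parallelism),
                    ZERO_BASED.assign(topic, partition, parallelism));
        }
    }
}
```

With the zero-based variant, partition {{i}} is always consumed by subtask {{i}} under the stated assumption, independent of the topic name. Keeping the assigner pluggable would let the existing hash-offset behaviour remain the default for regular Kafka sources.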
--
This message was sent by Atlassian Jira
(v8.20.1#820001)