Posted to dev@flink.apache.org by "Kezhu Wang (Jira)" <ji...@apache.org> on 2021/02/08 03:48:00 UTC

[jira] [Created] (FLINK-21317) Downstream keyed state not work after FlinkKafkaShuffle

Kezhu Wang created FLINK-21317:
----------------------------------

             Summary: Downstream keyed state not work after FlinkKafkaShuffle
                 Key: FLINK-21317
                 URL: https://issues.apache.org/jira/browse/FLINK-21317
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.13.0
            Reporter: Kezhu Wang


{{FlinkKafkaShuffle}} uses {{KeyGroupRangeAssignment.assignKeyToParallelOperator}} to assign records to Kafka topic partitions. The assignment works as follows:
 # {{KeyGroupRangeAssignment.assignToKeyGroup(Object key, int maxParallelism)}} assigns the key to a key group.
 # {{KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId)}} maps that key group to an operator (subtask) index.
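Step 2 can be sketched without Flink dependencies. This is a restatement of the key-group range arithmetic as I understand it from {{KeyGroupRangeAssignment}}. The class name {{KeyGroupMathSketch}} is hypothetical. The murmur hashing in step 1 is left out because only the key-group-to-subtask mapping matters for this issue. The sketch shows that each subtask owns one contiguous range of key groups. Downstream keyed state only works if every record reaching a subtask falls inside that subtask's range.

{code:java}
/**
 * Hypothetical, dependency-free sketch of the key-group arithmetic behind
 * KeyGroupRangeAssignment (step 2 above). Not Flink's actual class.
 */
final class KeyGroupMathSketch {

    /** Maps a key group to the index of the subtask that owns it. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /** First key group (inclusive) owned by the given subtask. */
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the given subtask. */
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        // Print the contiguous key-group range owned by each subtask.
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.printf("subtask %d owns key groups [%d, %d]%n",
                    subtask,
                    rangeStart(maxParallelism, parallelism, subtask),
                    rangeEnd(maxParallelism, parallelism, subtask));
        }
    }
}
{code}

With {{maxParallelism = 128}} and {{parallelism = 4}}, subtask 0 owns key groups 0 to 31, subtask 1 owns 32 to 63, and so on.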

When the Kafka topic partitions are consumed, they are redistributed among the source subtasks by {{KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numParallelSubtasks)}}. The redistribution code is copied here:
{code:java}
public class KafkaTopicPartitionAssigner {
    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        int startIndex =
                ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the
        // start index
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }
}
{code}
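The following sketch makes the mismatch concrete. It assumes that the producer writes the records keyed for subtask {{i}} into partition {{i}}, so the number of partitions equals the consumer parallelism. It reuses the arithmetic of the quoted formula, but takes a plain topic name and partition id instead of a {{KafkaTopicPartition}}. The class name {{ShuffleMismatchSketch}} and the topic name are hypothetical. Because {{(startIndex + i) % n == i}} holds only when {{startIndex}} is 0, either no partition is misrouted or all of them are.

{code:java}
/** Hypothetical sketch: which subtask reads each shuffle partition. */
final class ShuffleMismatchSketch {

    /** Same arithmetic as KafkaTopicPartitionAssigner.assign, without Flink types. */
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    /** Counts partitions read by a subtask other than the one their records were keyed for. */
    static int misroutedPartitions(String topic, int parallelism) {
        int misrouted = 0;
        for (int partition = 0; partition < parallelism; partition++) {
            // Assumption: records destined for subtask i were written to partition i.
            int intendedSubtask = partition;
            int actualSubtask = assign(topic, partition, parallelism);
            if (intendedSubtask != actualSubtask) {
                misrouted++;
            }
        }
        return misrouted;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        String topic = "shuffle-topic"; // hypothetical topic name
        for (int partition = 0; partition < parallelism; partition++) {
            System.out.printf("partition %d: keyed for subtask %d, read by subtask %d%n",
                    partition, partition, assign(topic, partition, parallelism));
        }
        System.out.println("misrouted partitions: " + misroutedPartitions(topic, parallelism));
    }
}
{code}

For example, a topic named {{t1}} gets {{startIndex = 3}} with 4 subtasks. Partition 0, which holds key groups 0 to 31 (with {{maxParallelism = 128}}), is then read by subtask 3, which owns only key groups 96 to 127.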

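This partition redistribution breaks the prerequisite of {{DataStreamUtils.reinterpretAsKeyedStream}}, which requires the stream to be partitioned exactly as {{keyBy}} would partition it. Unless {{startIndex}} happens to be 0, the partition holding the records keyed for subtask {{i}} is read by a different subtask. Subtasks therefore receive key groups outside their own key-group ranges, and keyed state becomes unusable. The deepest part of the captured stack trace is: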
{noformat}
Caused by: java.lang.NullPointerException
	at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:205)
	at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:100)
{noformat}
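The next sketch is a simplified model of why the failure surfaces as a {{NullPointerException}} rather than a clearer error. It assumes, based on my understanding of the heap backend, that a subtask's state table keeps one state map per key group in its local range and returns {{null}} for any other key group. {{LocalKeyGroupStateSketch}} is a hypothetical stand-in, not Flink's {{StateTable}}.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a heap state table holding one map per local key group. */
final class LocalKeyGroupStateSketch {
    private final int firstKeyGroup;
    private final List<Map<Object, Long>> mapsPerKeyGroup = new ArrayList<>();

    LocalKeyGroupStateSketch(int firstKeyGroup, int lastKeyGroup) {
        this.firstKeyGroup = firstKeyGroup;
        // One state map per key group in the local range [firstKeyGroup, lastKeyGroup].
        for (int kg = firstKeyGroup; kg <= lastKeyGroup; kg++) {
            mapsPerKeyGroup.add(new HashMap<>());
        }
    }

    /** Returns the map for a key group, or null if the key group is not local. */
    Map<Object, Long> mapForKeyGroup(int keyGroup) {
        int offset = keyGroup - firstKeyGroup;
        if (offset >= 0 && offset < mapsPerKeyGroup.size()) {
            return mapsPerKeyGroup.get(offset);
        }
        return null;
    }

    /** Reducing "add" (sum per key). A non-local key group dereferences null here. */
    void add(int keyGroup, Object key, long value) {
        mapForKeyGroup(keyGroup).merge(key, value, Long::sum);
    }
}
{code}

In this model, a subtask owning key groups 0 to 31 handles {{add(5, ...)}} normally. A record in key group 40, delivered by the misrouted partition, throws {{NullPointerException}} inside {{add}}.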

cc [~ym]  [~sewen] [~AHeise]  [~pnowojski]

Below are my proposed changes:
* Make the assignment of partitions to subtasks customizable.
* Provide a 0-based round-robin assignment, i.e. the existing algorithm with {{startIndex}} fixed to 0.
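Here is a minimal sketch of what the two proposals could look like. All type names are hypothetical and do not exist in Flink. The sketch takes a plain topic name and partition id instead of {{KafkaTopicPartition}}. The existing behaviour becomes one strategy, and the proposed 0-based round-robin becomes another. With the second strategy and one partition per subtask, partition {{i}} is read by subtask {{i}}, which matches the layout {{FlinkKafkaShuffle}} writes.

{code:java}
/** Hypothetical pluggable strategy for mapping a Kafka partition to a source subtask. */
interface TopicPartitionAssignmentStrategy {
    int assign(String topic, int partition, int numParallelSubtasks);
}

/** Current behaviour: round-robin starting at a topic-dependent index. */
final class HashedStartRoundRobin implements TopicPartitionAssignmentStrategy {
    @Override
    public int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }
}

/** Proposed: 0-based round-robin, i.e. the same algorithm with startIndex fixed to 0. */
final class ZeroBasedRoundRobin implements TopicPartitionAssignmentStrategy {
    @Override
    public int assign(String topic, int partition, int numParallelSubtasks) {
        return partition % numParallelSubtasks;
    }
}
{code}

Keeping the hashed start as the default preserves today's behaviour for regular Kafka sources, where spreading the partitions of many topics across subtasks is useful. The shuffle consumer would opt into the 0-based strategy.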

I saw FLINK-8570. The changes above could also be helpful if we eventually decide to deliver FLINK-8570.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)