Posted to issues@flink.apache.org by "Steven Zhen Wu (JIRA)" <ji...@apache.org> on 2017/07/10 23:05:00 UTC

[jira] [Updated] (FLINK-7143) Partition assignment for Kafka consumer is not stable

     [ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steven Zhen Wu updated FLINK-7143:
----------------------------------
    Description: 
While deploying the Flink 1.3 release to hundreds of routing jobs, we found issues with partition assignment for the Kafka consumer: some partitions weren't assigned, and some partitions were assigned more than once.

Here is the bug introduced in Flink 1.3. 
{code}
protected static void initializeSubscribedPartitionsToStartOffsets(...) {
	...
	for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
		if (i % numParallelSubtasks == indexOfThisSubtask) {
			if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
				subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
			}
	...
}
{code}

The bug is using the array index {code}i{code} to mod against {code}numParallelSubtasks{code}. If {code}kafkaTopicPartitions{code} has a different order on different subtasks, the assignment is not stable across subtasks, which creates the assignment issue mentioned earlier. The fix is also very simple: we should use the partition id to do the mod, i.e. {code}if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks == indexOfThisSubtask){code}. That results in a stable assignment across subtasks that is independent of the ordering in the array.
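
Applied to the loop above, the fixed check would look roughly like this (a sketch only, keeping the same elisions as the snippet above):

{code}
protected static void initializeSubscribedPartitionsToStartOffsets(...) {
	...
	for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
		// mod on the partition id instead of the loop index, so the result
		// does not depend on the order of kafkaTopicPartitions
		if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks == indexOfThisSubtask) {
			if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
				subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
			}
	...
}
{code}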

Marking this as a blocker because of its impact.
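
To make the impact concrete, here is a small standalone demo (hypothetical code, not from the Flink codebase; the partition ids and orderings are made up) that reproduces the duplicate/missing assignment under the index-based mod and shows that the id-based mod is stable:

{code}
import java.util.Arrays;
import java.util.List;

// Two subtasks see the same three partitions (ids 0, 1, 2) in different orders.
public class AssignmentDemo {

	public static void main(String[] args) {
		int numParallelSubtasks = 2;
		List<Integer> seenBySubtask0 = Arrays.asList(0, 1, 2);
		List<Integer> seenBySubtask1 = Arrays.asList(2, 0, 1);

		// Buggy index-based mod: subtask 0 takes indices 0 and 2 -> partitions {0, 2},
		// subtask 1 takes index 1 -> partition {0}. Partition 0 is consumed twice,
		// partition 1 is never consumed.
		printAssignment("index-based, subtask 0", seenBySubtask0, 0, numParallelSubtasks, false);
		printAssignment("index-based, subtask 1", seenBySubtask1, 1, numParallelSubtasks, false);

		// Fixed id-based mod: subtask 0 takes partitions {0, 2} and subtask 1
		// takes partition {1}, regardless of the order each subtask sees.
		printAssignment("id-based, subtask 0", seenBySubtask0, 0, numParallelSubtasks, true);
		printAssignment("id-based, subtask 1", seenBySubtask1, 1, numParallelSubtasks, true);
	}

	static void printAssignment(String label, List<Integer> partitions,
			int indexOfThisSubtask, int numParallelSubtasks, boolean usePartitionId) {
		for (int i = 0; i < partitions.size(); i++) {
			int key = usePartitionId ? partitions.get(i) : i;
			if (key % numParallelSubtasks == indexOfThisSubtask) {
				System.out.println(label + " takes partition " + partitions.get(i));
			}
		}
	}
}
{code}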

> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Priority: Blocker
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)