Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/08/03 09:59:00 UTC

[jira] [Updated] (FLINK-7143) Partition assignment for Kafka consumer is not stable

     [ https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-7143:
------------------------------------
    Description: 
h3. Important Notice: 

Upgrading jobs from 1.2.x exhibits no known problems. Jobs from 1.3.0 and 1.3.1 with incorrect partition assignments cannot be automatically fixed by upgrading to Flink 1.3.2 via a savepoint, because the upgraded version would resume the wrong partition assignment from the savepoint. A workaround is to assign a different uid to the Kafka source (so its offsets won't be restored from the savepoint) and let it start from the latest offsets committed to Kafka instead. Note that this may violate exactly-once semantics and introduce some duplicates, because the offsets committed to Kafka are not guaranteed to be 100% up to date with Flink's internal offset tracking. To maximize the alignment between the offsets in Kafka and those tracked by Flink, we suggest stopping the 1.3.x job with the "cancel with savepoint" command (https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html#cancel-job-with-savepoint) during the upgrade process.
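
As a rough sketch of that workaround (assuming Flink 1.3.x with the Kafka 0.10 connector; the topic name, group id, and uid below are made up for illustration), the consumer is told to start from the group offsets committed to Kafka, and the source gets a new uid so the source state in the savepoint is not matched:
{code}
// Sketch only: topic, group id, and uid are hypothetical.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "routing-job");

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
// Start from the offsets last committed to Kafka for this group,
// instead of offsets restored from the savepoint.
consumer.setStartFromGroupOffsets();

DataStream<String> stream = env
        .addSource(consumer)
        // A different uid means the old source state in the savepoint is not restored.
        .uid("kafka-source-v2");
{code}
If the rest of the job's state should still be restored from the savepoint, the restore also has to tolerate the now-unmatched source state (in recent Flink versions, the CLI's {{--allowNonRestoredState}} option).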

h3. Original Issue Description
While deploying the Flink 1.3 release to hundreds of routing jobs, we found issues with partition assignment for the Kafka consumer: some partitions weren't assigned at all, and some partitions were assigned more than once.

Here is the bug introduced in Flink 1.3. 
{code}
protected static void initializeSubscribedPartitionsToStartOffsets(...) {
    ...
    for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
        if (i % numParallelSubtasks == indexOfThisSubtask) {
            if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
                subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
            }
            ...
        }
    }
    ...
}
{code}

The bug is that the array index {{i}} is used for the modulo against {{numParallelSubtasks}}. If {{kafkaTopicPartitions}} has a different order on different subtasks, the assignment is not stable across subtasks, which creates the assignment issues mentioned earlier.
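
To make the order dependence concrete, here is a small standalone sketch (plain Java, not the connector code; the two partition orderings and the parallelism of 2 are made up) showing two subtasks that happen to see the same two partitions in opposite orders:
{code}
import java.util.Arrays;
import java.util.List;

public class IndexBasedAssignmentDemo {
    public static void main(String[] args) {
        int numParallelSubtasks = 2;
        // Subtask 0 happens to list the partitions as [0, 1] ...
        List<Integer> partitionsSeenBySubtask0 = Arrays.asList(0, 1);
        // ... while subtask 1 lists the same partitions as [1, 0].
        List<Integer> partitionsSeenBySubtask1 = Arrays.asList(1, 0);

        for (int i = 0; i < partitionsSeenBySubtask0.size(); i++) {
            if (i % numParallelSubtasks == 0) { // index-based check, as in 1.3.0/1.3.1
                System.out.println("subtask 0 takes partition " + partitionsSeenBySubtask0.get(i));
            }
        }
        for (int i = 0; i < partitionsSeenBySubtask1.size(); i++) {
            if (i % numParallelSubtasks == 1) {
                System.out.println("subtask 1 takes partition " + partitionsSeenBySubtask1.get(i));
            }
        }
        // Prints: both subtasks take partition 0, and partition 1 is never assigned.
    }
}
{code}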

The fix is also very simple: use the partition id for the modulo, i.e. {{if (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks == indexOfThisSubtask)}}. That results in a stable assignment across subtasks, independent of the ordering in the array.
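
For comparison, the same toy setup keyed on the partition id (again only an illustration, not the actual connector code) produces the same owner for each partition regardless of the ordering:
{code}
import java.util.Arrays;
import java.util.List;

public class PartitionIdBasedAssignmentDemo {
    public static void main(String[] args) {
        int numParallelSubtasks = 2;
        // Two different orderings of the same two partitions.
        for (List<Integer> ordering : Arrays.asList(Arrays.asList(0, 1), Arrays.asList(1, 0))) {
            for (int i = 0; i < ordering.size(); i++) {
                int partition = ordering.get(i);
                // Keyed on the partition id, so the owner does not depend on i.
                int owner = partition % numParallelSubtasks;
                System.out.println("partition " + partition + " -> subtask " + owner);
            }
        }
        // Both orderings yield the same mapping: 0 -> subtask 0, 1 -> subtask 1.
    }
}
{code}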

Marking this as a blocker because of its impact.


> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
>                 Key: FLINK-7143
>                 URL: https://issues.apache.org/jira/browse/FLINK-7143
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Steven Zhen Wu
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.2
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)