Posted to users@kafka.apache.org by Nicholas Harezga <n....@apexxs.com> on 2016/08/25 00:58:31 UTC

Multiple Consumers From Same Group Assigned Same Partition

I am currently attempting to upgrade my software from Kafka 0.8.2 to 0.9. I am trying to switch over to the new Consumer API to allow for rebalancing as machines are added to or removed from our cluster. I am running into an issue where the same partition of a topic is assigned to multiple consumers in the same group for a short period of time when a machine is added to the group. This results in some messages being processed more than once, whereas I am aiming for exactly-once processing. I followed the setup instructions in the Javadocs and use an external data store for saving the offsets, both while consuming and when rebalancing.

In my test cluster I start with 2 machines consuming and a single producer. Everything works fine at the start and each consumer gets half of the partitions. When I add a third machine it is assigned a portion of the partitions, but these partitions aren't revoked from the two initial machines at that point. Below are some log statements from my program; hopefully they help illustrate my situation.

Partition 14 is initially assigned to machine 1. Machine 1 reads a number of messages before machine 3 is added. When machine 3 starts, partition 14 is assigned to it, but partition 14 has not yet been revoked from machine 1. Both machines then read the same message at offset 3 before the system rebalances and partition 14 is revoked from both of them. Machine 2 is assigned partition 14 after it is revoked from machine 1, but while it is still assigned to machine 3. Only after it is revoked from machine 3 is machine 2 the sole owner of partition 14.

Machine 1 (Turned on at start)
2016-08-24 14:17:08 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 14 for topic assignments to worker with offset 0
2016-08-24 14:18:48 DEBUG KafkaStreamReader:312 - Committing topic assignments partition 14 offset 3
2016-08-24 14:19:38 DEBUG KafkaStreamReader:200 - partition = 14, offset = 3 (Message read from kafka)
2016-08-24 14:19:38 DEBUG KafkaStreamReader:312 - Committing topic assignments partition 14 offset 4
2016-08-24 14:19:39 DEBUG KafkaStreamReader:338 - REVOKED: Committing for partition 14 of topic assignments offset 4

Machine 2 (Turned on at start)
2016-08-24 14:19:51 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 14 for topic assignments to worker with offset 4

Machine 3 (Turned on a few minutes later)
2016-08-24 14:19:21 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 14 for topic assignments to worker with offset 3
2016-08-24 14:19:48 DEBUG KafkaStreamReader:200 - partition = 14, offset = 3 (Message read from kafka - already read by machine 1)
2016-08-24 14:20:00 DEBUG KafkaStreamReader:338 - REVOKED: Committing for partition 14 of topic assignments offset 4
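
To make the overlap in the logs above easier to see, here is a minimal Java sketch that treats each machine's ASSIGNED and REVOKED timestamps for partition 14 as a hold interval and reports which holds overlap. The class and method names (OwnershipOverlap, Hold, overlaps, fromLogs) are hypothetical and exist only for this illustration; it does not use the Kafka API. It also assumes the clocks on the three machines are roughly in sync, which the logs alone cannot confirm.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class OwnershipOverlap {
    // Timestamp layout used by the log lines above.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** One machine's hold on partition 14, from its ASSIGNED line to its REVOKED line. */
    public static final class Hold {
        final String machine;
        final LocalDateTime from;
        final LocalDateTime to; // null: no REVOKED line appears in the logs

        public Hold(String machine, String assigned, String revoked) {
            this.machine = machine;
            this.from = LocalDateTime.parse(assigned, FMT);
            this.to = revoked == null ? null : LocalDateTime.parse(revoked, FMT);
        }
    }

    /** Returns one line per pair of machines whose holds overlap in time. */
    public static List<String> overlaps(List<Hold> holds) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < holds.size(); i++) {
            for (int j = i + 1; j < holds.size(); j++) {
                Hold a = holds.get(i);
                Hold b = holds.get(j);
                // The overlap starts at the later ASSIGNED and ends at the earlier REVOKED.
                LocalDateTime start = a.from.isAfter(b.from) ? a.from : b.from;
                LocalDateTime end = earliest(a.to, b.to);
                if (end == null) {
                    result.add(a.machine + " + " + b.machine + ": open-ended");
                } else if (start.isBefore(end)) {
                    long seconds = Duration.between(start, end).getSeconds();
                    result.add(a.machine + " + " + b.machine + ": " + seconds + "s");
                }
            }
        }
        return result;
    }

    // Earlier of two end times, treating null as "never revoked".
    private static LocalDateTime earliest(LocalDateTime x, LocalDateTime y) {
        if (x == null) return y;
        if (y == null) return x;
        return x.isBefore(y) ? x : y;
    }

    /** Hold intervals copied from the ASSIGNED/REVOKED log lines above. */
    public static List<Hold> fromLogs() {
        List<Hold> holds = new ArrayList<>();
        holds.add(new Hold("Machine 1", "2016-08-24 14:17:08", "2016-08-24 14:19:39"));
        holds.add(new Hold("Machine 2", "2016-08-24 14:19:51", null));
        holds.add(new Hold("Machine 3", "2016-08-24 14:19:21", "2016-08-24 14:20:00"));
        return holds;
    }

    public static void main(String[] args) {
        // Prints:
        //   Machine 1 + Machine 3: 18s
        //   Machine 2 + Machine 3: 9s
        overlaps(fromLogs()).forEach(System.out::println);
    }
}
```

Under those assumptions, machines 1 and 3 both hold partition 14 for about 18 seconds, and machines 2 and 3 for about 9 seconds. Machine 1's read of offset 3 at 14:19:38 falls inside the first window.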

My cluster is running Cloudera 5.7.0 with Kafka version 2.0.1-1.2.0.1.p0.5 which corresponds to Kafka version 0.9.0.0+kafka2.0.1+283. (https://www.cloudera.com/documentation/kafka/latest/topics/kafka_packaging.html)

Can anyone help explain what I'm doing wrong here? If there is any further information I can provide to help this along please let me know and I will be happy to provide it if I can.