Posted to users@kafka.apache.org by "Sean Morris (semorris)" <se...@cisco.com> on 2016/08/10 19:45:44 UTC

Kafka 0.9.0.1 allows offset commits for offsets already committed

I ran into a strange scenario today, and after looking more closely I see that Kafka does not reject offset commits in several situations where I would expect them to fail. I have two partitions and two processes in the same consumer group, so each process owns one partition. Process 1 retrieved a set of records from partition 0 (say offsets 1 to 10) and, as it processed them one by one, committed each offset individually. Process 2 was consuming from partition 1 at the time. Meanwhile a rebalance occurred, and I see these messages in the Kafka logs:


[2016-08-10 11:00:33,702] INFO [kafka.coordinator.GroupCoordinator] [GroupCoordinator 1]: Preparing to restabilize group casemonGatewayNotifier with old generation 2

[2016-08-10 11:00:35,503] INFO [kafka.coordinator.GroupCoordinator] [GroupCoordinator 1]: Group casemonGatewayNotifier generation 2 is dead and removed

[2016-08-10 11:00:39,700] INFO [kafka.coordinator.GroupCoordinator] [GroupCoordinator 1]: Preparing to restabilize group casemonGatewayNotifier with old generation 0

[2016-08-10 11:00:39,700] INFO [kafka.coordinator.GroupCoordinator] [GroupCoordinator 1]: Stabilized group casemonGatewayNotifier generation 1

[2016-08-10 11:00:39,701] INFO [kafka.coordinator.GroupCoordinator] [GroupCoordinator 1]: Assignment received from leader for group casemonGatewayNotifier for generation 1

[2016-08-10 11:00:39,751] INFO [kafka.coordinator.GroupCoordinator] [GroupCoordinator 1]: Preparing to restabilize group casemonGatewayNotifier with old generation 1

[2016-08-10 11:00:43,699] INFO [kafka.coordinator.GroupCoordinator] [GroupCoordinator 1]: Stabilized group casemonGatewayNotifier generation 2

[2016-08-10 11:00:43,701] INFO [kafka.coordinator.GroupCoordinator] [GroupCoordinator 1]: Assignment received from leader for group casemonGatewayNotifier for generation 2

At this point the two processes swapped partitions, but process 1 was still processing and committing records from partition 0. Some of these commits failed, but then they started to succeed again. While this was happening, process 2 called "poll" and received records with offsets 7 to 10 from partition 0. So both processes now hold some of the same records from partition 0 and are both processing and committing them. The strange part is that none of these later commits fail: both processes are able to commit the same records, and process 1 is able to commit offsets for a partition it no longer owns.
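A minimal sketch of the per-record commit pattern described above, written as a toy Python model rather than real client code: `process_and_commit_each`, `handle` and `commit` are hypothetical names. In the Java client the commit step would correspond to calling `commitSync(Map<TopicPartition, OffsetAndMetadata>)` after each record, and by Kafka convention the value committed is the offset of the next record to read, i.e. the processed offset plus one.

```python
def process_and_commit_each(partition, records, handle, commit):
    """Toy model of per-record commits; `handle` and `commit` are hypothetical callbacks."""
    for offset, value in records:
        handle(value)
        # Kafka convention: the committed offset is the *next* offset to read.
        commit(partition, offset + 1)


# Process 1's batch from partition 0, offsets 1 to 10.
commits = []
process_and_commit_each(
    0,
    [(o, f"v{o}") for o in range(1, 11)],
    handle=lambda value: None,                      # stand-in for real processing
    commit=lambda p, o: commits.append((p, o)),     # records each commit request
)
```

Each processed record produces one commit request, so a rebalance in the middle of the batch leaves the remaining records of the batch still being processed and committed by the old owner.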

I wrote some test code to verify this further. I repeatedly commit the same offset, and "poll" keeps returning new records (records with higher offsets than the one I committed). kafka.admin.ConsumerGroupCommand keeps showing the old offset and a lag. When I restart the process it correctly starts again from the old offset, but then continues to get new records beyond that offset. It seems that invalid commits confuse the consumer logic.
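One possible reading of this test is that the consumer tracks two separate offsets per partition: the fetch position, which each poll advances, and the committed offset, which only a commit changes and which a freshly started consumer resumes from. The toy Python model below illustrates that distinction; `ToyPartitionConsumer` and its methods are hypothetical and are not the Kafka client API, and lag is modeled as log end offset minus committed offset.

```python
class ToyPartitionConsumer:
    """Hypothetical in-memory model of one partition; not the Kafka client API."""

    def __init__(self, log, committed=0):
        self.log = log              # record values; list index == offset
        self.committed = committed  # offset stored for the group
        self.position = committed   # next offset to fetch, seeded from the commit on startup

    def poll(self, max_records):
        start = self.position
        end = min(start + max_records, len(self.log))
        self.position = end         # fetching advances the position, not the commit
        return [(off, self.log[off]) for off in range(start, end)]

    def commit(self, offset):
        self.committed = offset     # only the stored offset changes

    def lag(self):
        return len(self.log) - self.committed


log = [f"r{i}" for i in range(20)]
consumer = ToyPartitionConsumer(log)
first = consumer.poll(5)            # offsets 0..4
consumer.commit(1)
consumer.commit(1)                  # re-committing the same offset does not rewind the position
second = consumer.poll(5)           # offsets 5..9
restarted = ToyPartitionConsumer(log, committed=consumer.committed)
after_restart = restarted.poll(3)   # a new consumer resumes at the committed offset
```

In this model the repeated commit leaves the reported lag unchanged while polling keeps moving forward, and only a restart brings the consumer back to the committed offset.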

Since the commits don't fail after the rebalance, I don't seem to have any way to know that I should stop processing these records, so I end up processing them in both processes. I know that in 0.10 the number of records returned by "poll" can be limited (the max.poll.records setting), but it seems the only way to avoid this kind of duplicate processing is to only ever process the first record returned by "poll", which seems inefficient. I know there is a rebalance callback, but if I read the documentation correctly it is only invoked from within "poll", so while working through my in-memory buffer I wouldn't know that a rebalance had happened. How are people handling this currently?
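One commonly used mitigation is to key the in-memory buffer by partition and, whenever the revocation callback fires, drop any buffered records for the revoked partitions so they are not processed after another consumer has taken them over. The toy Python model below sketches this; `PartitionAwareBuffer` is hypothetical, and its `on_partitions_revoked`/`on_partitions_assigned` methods only loosely mirror the Java `ConsumerRebalanceListener`. Because the real callback only runs inside poll, this narrows the window for duplicates without closing it; the model simulates the callback firing from a poll made after offset 6 was processed.

```python
from collections import defaultdict, deque


class PartitionAwareBuffer:
    """Hypothetical helper; not part of the Kafka client API."""

    def __init__(self):
        self.owned = set()
        self.pending = defaultdict(deque)   # partition -> buffered (offset, value) pairs

    def on_partitions_assigned(self, partitions):
        self.owned.update(partitions)

    def on_partitions_revoked(self, partitions):
        for p in partitions:
            self.owned.discard(p)
            self.pending.pop(p, None)       # drop records the new owner will receive anyway

    def add(self, partition, records):
        if partition in self.owned:
            self.pending[partition].extend(records)

    def drain(self):
        """Yield buffered records, stopping for any partition that is no longer owned."""
        for p in sorted(self.pending):
            while p in self.owned and self.pending[p]:
                yield p, self.pending[p].popleft()


buf = PartitionAwareBuffer()
buf.on_partitions_assigned({0})
buf.add(0, [(o, f"v{o}") for o in range(1, 11)])

processed = []
for partition, (offset, value) in buf.drain():
    processed.append(offset)
    if offset == 6:
        # Simulate a rebalance reported by a poll() made between records.
        buf.on_partitions_revoked({0})
        buf.on_partitions_assigned({1})
```

Offsets 7 to 10 are never processed by the old owner here, but only because the model learned of the revocation before reaching them; in the real client that still requires polling between records.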

Thanks,
Sean