Posted to users@kafka.apache.org by "Zhang, Chenyuan (WorldQuant)" <Ch...@worldquant.com> on 2018/03/07 02:28:16 UTC

Kafka message replay / consumer offset corruption

Hi all,

I'm experiencing a message replay problem in Kafka. I suspect it is caused by a corrupted consumer offset, which in turn is caused by corrupted group metadata.

Background:

* Kafka cluster of 3 brokers, version 0.11.0.0.

* ZooKeeper cluster of 3 nodes, version 3.4.8.

* Group xxx consumes only 1 topic, which has 1 partition and a replication factor of 3.

The issue occurred when one of the brokers (broker 0) disconnected from ZooKeeper, which triggered the group coordinator to migrate from broker 0 to broker 1. During the migration, broker 1 loaded group xxx's metadata 9 times and ended up with an old version of the group metadata instead of the latest one. I believe this is because of the following line in GroupMetadataManager.scala, which appears to keep whichever entry reached the cache first:

    val currentGroup = groupMetadataCache.putIfNotExists(group.groupId, group)

https://github.com/apache/kafka/blob/1cabef0d3dc7a3c245f260b8d34a60d7d044bb9c/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

The stale metadata then caused the consumer offset corruption and the message replay.
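To make the suspected mechanism concrete, here is a minimal sketch of put-if-absent semantics. It is a simplified model and not Kafka's actual implementation: the function put_if_not_exists and the dict-based cache are hypothetical stand-ins for groupMetadataCache.putIfNotExists. The generations are the ones from the broker 1 log lines in this message.

```python
# Simplified model of a put-if-absent cache. This is NOT Kafka's code;
# put_if_not_exists and the dict cache are hypothetical stand-ins.
def put_if_not_exists(cache, group_id, metadata):
    """Store metadata only if group_id is absent.

    Returns the pre-existing entry, or None if metadata was just inserted.
    """
    existing = cache.get(group_id)
    if existing is None:
        cache[group_id] = metadata
    return existing


# Generations in the order broker 1 logged "Loading group metadata".
logged_generations = [
    338355, 340494, 342313, 344311, 346157,
    348051, 350699, 352762, 352992,
]

cache = {}
for generation in logged_generations:
    put_if_not_exists(cache, "xxx", {"generation": generation})

# The first entry wins, so the cache keeps the oldest generation.
print(cache["xxx"]["generation"])  # 338355
```

Under this model, the cached generation is 338355, which matches the "old generation 338355" reported when broker 1 prepared to rebalance the group.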

Looking at the Kafka source code, groupCoordinator.handleGroupImmigration(partition.partitionId) should be executed once per partition, which means group xxx's metadata should be loaded only once during the group coordinator migration. According to the Kafka server logs, however, this group's metadata was loaded 9 times.

Kafka logs:
[Broker 0] [2018-02-02 09:51:29,599] INFO [GroupCoordinator 0]: Stabilized group xxx generation 352992 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator)
[Broker 0] [2018-02-02 09:51:29,667] INFO zookeeper state changed (Disconnected)
[Broker 1] [2018-02-02 09:51:30,000] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 338355 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,117] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 340494 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,248] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 342313 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,366] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 344311 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,506] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 346157 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,615] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 348051 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,735] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 350699 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,835] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 352762 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,849] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 352992 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:31,526] INFO [GroupCoordinator 1]: Preparing to rebalance group xxx with old generation 338355 (__consumer_offsets-1)
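
For anyone who wants to check their own broker logs for the same pattern, the following is a rough sketch that collects the generations loaded for one group. The function name loaded_generations and the regular expression are my own assumptions, based only on the log line format shown above; other Kafka versions may format these lines differently.

```python
import re

# Pattern based on the "Loading group metadata" lines quoted above;
# the exact format in other Kafka versions is an assumption.
LOAD_PATTERN = re.compile(
    r"Loading group metadata for (\S+) with generation (\d+)"
)


def loaded_generations(log_lines, group_id):
    """Return the generations loaded for group_id, in log order."""
    generations = []
    for line in log_lines:
        match = LOAD_PATTERN.search(line)
        if match and match.group(1) == group_id:
            generations.append(int(match.group(2)))
    return generations


# Two of the lines from the logs above, used as sample input.
sample = [
    "[Broker 1] [2018-02-02 09:51:30,000] INFO [GroupCoordinator 1]: "
    "Loading group metadata for xxx with generation 338355 "
    "(kafka.coordinator.group.GroupCoordinator)",
    "[Broker 1] [2018-02-02 09:51:30,849] INFO [GroupCoordinator 1]: "
    "Loading group metadata for xxx with generation 352992 "
    "(kafka.coordinator.group.GroupCoordinator)",
]

generations = loaded_generations(sample, "xxx")
if len(generations) > 1:
    # More than one load for the same group during one migration
    # is the symptom described in this message.
    print(f"group xxx loaded {len(generations)} times: {generations}")
```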

Do you know why there are so many "Loading group metadata" actions on broker 1 for group xxx? Could this be caused by a configuration or operational issue? How can I prevent it from happening again?

Any help is appreciated.

Thanks,
Chenyuan


