Posted to dev@kafka.apache.org by "Zhanxiang (Patrick) Huang (JIRA)" <ji...@apache.org> on 2019/03/08 05:09:00 UTC

[jira] [Created] (KAFKA-8069) Committed offsets get cleaned up right after the coordinator loading them back from __consumer_offsets in broker with old inter-broker protocol version (< 2.2)

Zhanxiang (Patrick) Huang created KAFKA-8069:
------------------------------------------------

             Summary: Committed offsets get cleaned up right after the coordinator loading them back from __consumer_offsets in broker with old inter-broker protocol version (< 2.2)
                 Key: KAFKA-8069
                 URL: https://issues.apache.org/jira/browse/KAFKA-8069
             Project: Kafka
          Issue Type: Bug
            Reporter: Zhanxiang (Patrick) Huang
            Assignee: Zhanxiang (Patrick) Huang


Starting with the 2.1 release, if the broker has not yet been upgraded to the latest inter-broker protocol version, the committed offsets stored in the __consumer_offsets topic get cleaned up much earlier than they should when the offsets are loaded back from the __consumer_offsets topic by the GroupCoordinator, which happens during a leadership transition or after a broker bounce.

TL;DR
The V1 on-disk format for __consumer_offsets has an *expireTimestamp* field. For a Kafka 2.1 broker whose inter-broker protocol (IBP) version is prior to 2.1 (i.e. prior to [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets]), the logic for finding the expired offsets looks like this:
{code:scala}
def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
  offsets.filter {
    case (topicPartition, commitRecordMetadataAndOffset) =>
      ... && {
        commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
          case None =>
            // current version with no per partition retention
            currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
          case Some(expireTimestamp) =>
            // older versions with explicit expire_timestamp field => old expiration semantics is used
            currentTimestamp >= expireTimestamp
        }
      }
  }....
}
{code}
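To make the two branches concrete, the following is a minimal, self-contained sketch of the same decision applied to a single committed offset. OffsetEntry, ExpirationSketch and isExpired are hypothetical simplifications for illustration, not Kafka's actual classes; the sketch keeps only the fields the check reads.

{code:scala}
// Hypothetical, simplified stand-in for an in-memory committed offset:
// only the fields the expiration check reads.
final case class OffsetEntry(offset: Long, commitTimestamp: Long, expireTimestamp: Option[Long])

object ExpirationSketch {
  // The two branches of getExpiredOffsets(...), applied to a single entry.
  // baseTimestamp is a function, as in the original signature.
  def isExpired(entry: OffsetEntry,
                currentTimestamp: Long,
                offsetRetentionMs: Long,
                baseTimestamp: OffsetEntry => Long): Boolean =
    entry.expireTimestamp match {
      case None =>
        // KIP-211 semantics: expired once the retention period has elapsed
        // since the base timestamp
        currentTimestamp - baseTimestamp(entry) >= offsetRetentionMs
      case Some(expireTimestamp) =>
        // Old semantics: expired at or after the explicit per-offset timestamp
        currentTimestamp >= expireTimestamp
    }
}
{code}

For example, with a 7-day retention and _.commitTimestamp as the base timestamp, an entry committed at t = 1000 with expireTimestamp = None is not expired at t = 2000, while an entry with expireTimestamp = Some(1500) is.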
The expireTimestamp in the on-disk offset record can only be set when the committed offset is stored in the __consumer_offsets topic. However, the GroupCoordinator also keeps an in-memory representation of the expireTimestamp (see getExpiredOffsets(...) above), which can be set in the following two cases:
 # When the GroupCoordinator receives an OffsetCommitRequest, the expireTimestamp is set using the following logic:
{code:scala}
expireTimestamp = offsetCommitRequest.retentionTime match {
  case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
  case retentionTime => Some(currentTimestamp + retentionTime)
}
{code}
In all the latest client versions, the consumer sends the OffsetCommitRequest with DEFAULT_RETENTION_TIME, so the expireTimestamp is always None in this case. *This means any committed offset set in this case will always hit "case None" in "getExpiredOffsets(...)" when the coordinator is doing the cleanup, which is correct.*

 # When the GroupCoordinator loads the committed offsets stored in the __consumer_offsets topic from disk, the expireTimestamp is set using the following logic if IBP < 2.1:
{code:scala}
val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
{code}
and the logic to persist the expireTimestamp is:
{code:scala}
// OffsetCommitRequest.DEFAULT_TIMESTAMP = -1
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
{code}
Since the in-memory expireTimestamp is always None in our case, as mentioned in 1), we always store -1 on disk. Therefore, when the offset is loaded back from the __consumer_offsets topic, the in-memory expireTimestamp is always set to -1 (that is, Some(-1) rather than None). *This means any committed offset loaded this way will always hit "case Some(expireTimestamp)" in "getExpiredOffsets(...)" when the coordinator is doing the cleanup. Since currentTimestamp >= -1 always holds, the committed offset is always expired on the first expiration check, which runs shortly after the offsets are loaded from the __consumer_offsets topic.*
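The round trip described in 1) and 2) can be traced with a small sketch. Everything here is hypothetical and simplified: RoundTripSketch and its methods are illustrative names rather than Kafka code, DefaultTimestamp follows the -1 in the DEFAULT_TIMESTAMP comment above, and DefaultRetentionTime = -1 is an assumption made for this sketch.

{code:scala}
object RoundTripSketch {
  // Sentinels. DefaultRetentionTime = -1 is an assumption for this sketch.
  val DefaultRetentionTime: Long = -1L
  val DefaultTimestamp: Long = -1L

  // Case 1): in-memory expireTimestamp derived from the commit request.
  // Uppercase DefaultRetentionTime is matched as a constant, not bound.
  def expireTimestampOnCommit(retentionTime: Long, currentTimestamp: Long): Option[Long] =
    retentionTime match {
      case DefaultRetentionTime => None
      case retention            => Some(currentTimestamp + retention)
    }

  // Persisting to a V1 record: None is written as the -1 sentinel.
  def persistV1(expireTimestamp: Option[Long]): Long =
    expireTimestamp.getOrElse(DefaultTimestamp)

  // Case 2), buggy load: the raw field is always wrapped in Some, so a
  // persisted -1 comes back as Some(-1) instead of None.
  def loadV1Buggy(rawExpireTimestamp: Long): Option[Long] =
    Some(rawExpireTimestamp)

  // Old-semantics branch of the expiration check ("case Some(expireTimestamp)").
  def expiredUnderOldSemantics(expireTimestamp: Long, currentTimestamp: Long): Boolean =
    currentTimestamp >= expireTimestamp
}
{code}

Tracing a commit made with the default retention time: expireTimestampOnCommit returns None, persistV1 writes -1, loadV1Buggy returns Some(-1), and expiredUnderOldSemantics(-1, now) is true for any non-negative now.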

I was able to reproduce this bug on my local machine with a single broker, using 2.x, 1.x and 0.11.x consumers: the consumer sees a null committed offset after the broker is bounced.

This bug was introduced by [PR-5690|https://github.com/apache/kafka/pull/5690] in the Kafka 2.1 release. The fix is straightforward: set the in-memory expireTimestamp to None when the on-disk value is -1.
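A minimal sketch of the fix described above, under stated simplifications: FixedLoadSketch, persistV1 and loadV1 are hypothetical names, and the actual patch may be structured differently. The key point is that the -1 sentinel decodes to None instead of Some(-1).

{code:scala}
object FixedLoadSketch {
  // Sentinel for "no explicit expiration" (-1, like OffsetCommitRequest.DEFAULT_TIMESTAMP).
  val DefaultTimestamp: Long = -1L

  // Persisting is unchanged: None is written as the sentinel.
  def persistV1(expireTimestamp: Option[Long]): Long =
    expireTimestamp.getOrElse(DefaultTimestamp)

  // Fixed load: map the sentinel back to None so the offset takes the
  // "case None" (KIP-211) branch instead of expiring on the first check.
  def loadV1(rawExpireTimestamp: Long): Option[Long] =
    if (rawExpireTimestamp == DefaultTimestamp) None else Some(rawExpireTimestamp)
}
{code}

With this mapping, persistV1 followed by loadV1 returns None for an offset committed with the default retention time, so that offset only expires through the retention-based "case None" branch, while an explicit expiration timestamp still round-trips unchanged.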



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)