Posted to jira@kafka.apache.org by "Chetan (Jira)" <ji...@apache.org> on 2022/11/30 13:00:00 UTC

[jira] [Comment Edited] (KAFKA-14366) Kafka consumer rebalance issue, offsets points back to very old committed offset

    [ https://issues.apache.org/jira/browse/KAFKA-14366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641284#comment-17641284 ] 

Chetan edited comment on KAFKA-14366 at 11/30/22 12:59 PM:
-----------------------------------------------------------

Hi [~pnee] ,

We were finally able to reproduce the scenario in a lower environment.

1. Consumer node 1 stopped at 4:58 PM IST/11:28 UTC and started at 5:00 PM IST/11:30 UTC
2. Consumer node 2 stopped at 5:04 PM IST/11:34 UTC and started at 5:07 PM IST/11:37 UTC
3. Consumer node 3 stopped at 5:11 PM IST/11:41 UTC and started at 5:16 PM IST/11:46 UTC
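
For reference, the downtime of each node in the timeline above can be compared with the configured session timeout. This is only a rough sketch: the class and method names are made up for illustration, the timestamps have minute resolution, and it assumes a stopped consumer sends no heartbeats.

```java
import java.time.Duration;
import java.time.LocalTime;

// Illustrative only: compares each node's downtime with session.timeout.ms (50000).
class RestartTimeline {
    static final Duration SESSION_TIMEOUT = Duration.ofMillis(50_000);

    // Downtime between a stop and a start on the same day (UTC wall-clock, "HH:mm").
    static Duration downtime(String stoppedUtc, String startedUtc) {
        return Duration.between(LocalTime.parse(stoppedUtc), LocalTime.parse(startedUtc));
    }

    // True when the node was down for longer than the session timeout.
    static boolean exceedsSessionTimeout(Duration downtime) {
        return downtime.compareTo(SESSION_TIMEOUT) > 0;
    }

    public static void main(String[] args) {
        String[][] restarts = { {"11:28", "11:30"}, {"11:34", "11:37"}, {"11:41", "11:46"} };
        for (int i = 0; i < restarts.length; i++) {
            Duration d = downtime(restarts[i][0], restarts[i][1]);
            System.out.printf("node %d: down %d min, exceeds session timeout: %b%n",
                    i + 1, d.toMinutes(), exceedsSessionTimeout(d));
        }
    }
}
```

Each node was down for 2 to 5 minutes, which is longer than the 50 second session timeout, so every stop and every start should have caused its own rebalance.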

We find that every time the consumer restarts begin, one of the partitions misbehaves: its offset gets reset to an older offset, so we end up with duplicates.

!image-2022-11-30-17-58-15-046.png|width=536,height=210!

 

The problem does not end there. With the producer stopped, the lag should be 0 or slowly decreasing. Instead, after the rebalance and the duplicate reads, the lag suddenly jumps. It then decreases again, and when another consumer node restarts and a rebalance happens, the lag jumps once more (a second re-read of duplicates). This keeps going until the lag finally comes down to 0.

We have sometimes read 50K messages two or three times, and at most 1.8L (180,000) messages multiple times.
The lag graph looks something like this (taken during one of the iterations).
!image-2022-11-30-18-10-37-180.png|width=617,height=216!

!image-2022-11-30-18-29-36-445.png|width=570,height=104!

This is the consumer configuration :

{{kafka.consumer.group-id=trx-event-group-uk1-\{env}}}
{{kafka.consumer.auto-offset-reset=earliest}}
*{{kafka.consumer.auto-offset-commit=false}}*
{{kafka.consumer.session.timeout.ms=50000}}
{{kafka.consumer.heartbeat.interval.ms=16500}}
{{kafka.consumer.max.poll.records=100}}
{{kafka.consumer.max.poll.interval.ms=50000}}
{{kafka.consumer.fetch-max-wait=500}}
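
As a quick sanity check of the timing values above, here is a minimal sketch. The class and method names are hypothetical. It assumes the guideline from the Kafka documentation that heartbeat.interval.ms should typically be no higher than one third of session.timeout.ms, and it works out the average time available per record before a full batch would exceed max.poll.interval.ms.

```java
// Illustrative only: checks the relationships between the timing settings listed above.
class ConsumerTimingCheck {
    // Guideline: heartbeat.interval.ms should be at most 1/3 of session.timeout.ms.
    static boolean heartbeatWithinThird(long sessionTimeoutMs, long heartbeatIntervalMs) {
        return heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    // Average processing budget per record for a full batch of max.poll.records
    // before max.poll.interval.ms elapses between two poll() calls.
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        System.out.println("heartbeat within 1/3 of session timeout: "
                + heartbeatWithinThird(50_000, 16_500));
        System.out.println("per-record budget (ms): " + perRecordBudgetMs(50_000, 100));
    }
}
```

With these values the heartbeat interval is just inside the one-third guideline (16500 * 3 = 49500), and a full batch of 100 records leaves about 500 ms per record on average. If processing is slower than that, the consumer would miss the poll deadline and leave the group, which is another possible source of rebalances.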

 

And the consumer code snippet:

    // Builds the Kafka consumer properties from the application's configured values.
    @Bean
    public Map<String, Object> consumerConfigs() {
        LOGGER.info("Getting Consumer Configuration");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, getConsumerGroup());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, getAutoOffsetCommit());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, getAutoOffsetReset());
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, getMaxPollIntervalMs());
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, getSessionTimeoutMs());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecord());
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, getHeartbeatIntervalMs());
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, getFetchMaxWait());
        LOGGER.info("Transaction Consumer config: {}", props);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> trxKafkaListenerContainerFactory() {
        RebalanceListener rebalanceListener = new RebalanceListener();
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        // Highlighted setting: offsets are committed asynchronously.
        factory.getContainerProperties().setSyncCommits(false);
        factory.getContainerProperties().setPollTimeout(0);
        // Highlighted setting: the listener acknowledges manually and the commit is issued immediately.
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setConsumerRebalanceListener(rebalanceListener);
        return factory;
    }

===================================

The open questions here are:
1. Why does only one partition behave this way?
2. The metrics show that the offset for that partition was committed and had moved ahead, yet when a rebalance happens, offsets that were already committed are read again. This should not happen in the first place: since Kafka manages the offsets, the consumer node that is assigned the partition after a rebalance should be handed the latest committed offset.
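
To make the expectation in question 2 concrete, here is a small in-memory model. It is not Kafka code: the class and its methods are hypothetical, the earliest offset is modelled as 0, and the figures are the example numbers from the issue description (committed offset 50000, observed restart at 20000).

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model of the behaviour we expect after a rebalance.
class OffsetResumeModel {
    // Stand-in for the group's committed offsets, keyed by partition number.
    private final Map<Integer, Long> committed = new HashMap<>();

    // Records the next offset to read for a partition.
    void commit(int partition, long nextOffsetToRead) {
        committed.put(partition, nextOffsetToRead);
    }

    // Expected resume position for a newly assigned consumer: the committed offset,
    // or the earliest offset (modelled as 0) when nothing was committed.
    long resumePosition(int partition) {
        return committed.getOrDefault(partition, 0L);
    }

    // Records that are read again when consumption restarts before the committed offset.
    static long duplicates(long committedOffset, long actualStart) {
        return Math.max(0, committedOffset - actualStart);
    }

    public static void main(String[] args) {
        OffsetResumeModel group = new OffsetResumeModel();
        group.commit(10, 50_000L);
        System.out.println("expected resume position: " + group.resumePosition(10));
        System.out.println("duplicates if restarted at 20000: " + duplicates(50_000L, 20_000L));
    }
}
```

In this model a consumer taking over partition 10 resumes at 50000 and re-reads nothing, whereas a restart at 20000 re-reads 30000 records, which is the behaviour we are observing.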

Could you kindly help us out here?

 



> Kafka consumer rebalance issue, offsets points back to very old committed offset
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-14366
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14366
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, offset manager
>    Affects Versions: 2.8.1
>         Environment: Production
>            Reporter: Chetan
>            Priority: Major
>         Attachments: image-2022-11-30-17-58-15-046.png, image-2022-11-30-18-10-37-180.png, image-2022-11-30-18-29-36-445.png, rebalance issue.docx
>
>
> Hi All,
> We are facing an issue when client consumers restart (not every restart ends up with this issue): during rebalancing, one of the partition offsets sometimes goes back a long way from the committed offset.
> Scenario:
> Assume we have 4 consumer instances that are restarted one after the other.
>  # When the restarts begin, assume the offset on partition 10 of the topic being consumed points to 50000 (the last offset of the topic, with 0 lag).
>  # When the restarts start (rebalancing), the offset suddenly points to 20000.
>  # While the restarts are going on, the consumer attached to the partition starts reading from 20000 and carries on.
>  # Once all rebalances have completed, all messages from offset 20000 to 50000 (where it had stopped initially) have been read again.
> We end up having around 30K duplicates.
> (The numbers here are just an example. In production we are seeing huge numbers of duplicates: roughly two out of every 10 restart exercises end up with such duplicates, and only one or two partitions, seemingly at random, behave this way.)
> This seems to be a bug. I am attaching all screenshots for reference as well.
> Can someone kindly help out here?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)