Posted to issues@camel.apache.org by "Andrea Tarocchi (Jira)" <ji...@apache.org> on 2021/03/30 22:01:00 UTC

[jira] [Comment Edited] (CAMEL-16181) KafkaIdempotentRepository cache incorrectly flagged as ready

    [ https://issues.apache.org/jira/browse/CAMEL-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311828#comment-17311828 ] 

Andrea Tarocchi edited comment on CAMEL-16181 at 3/30/21, 10:00 PM:
--------------------------------------------------------------------

[~javierholguera] I stumbled upon this issue after spending some time investigating another, possibly related, one. I was wondering: why not additionally set {{auto.offset.reset=earliest}} in the consumer of the Kafka idempotent repository?


was (Author: valdar):
[~javierholguera] I stumbled upon this one after some time investigating another possibly correlated issue and was wondering why not just set {{auto.offset.reset=earliest}} in the idempotent kafka repository consumer?

> KafkaIdempotentRepository cache incorrectly flagged as ready
> ------------------------------------------------------------
>
>                 Key: CAMEL-16181
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16181
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-kafka
>    Affects Versions: 3.7.2
>            Reporter: Javier Holguera
>            Priority: Major
>             Fix For: 3.7.3, 3.8.0
>
>
> The `KafkaIdempotentRepository` initialises its cache from the pre-existing Kafka topic that holds the previous entries, using the following code:
>  
> {code:java}
> log.debug("Subscribing consumer to {}", topic);
> consumer.subscribe(Collections.singleton(topic));
> log.debug("Seeking to beginning");
> consumer.seekToBeginning(consumer.assignment());
>
> POLL_LOOP: while (running.get()) {
>   log.trace("Polling");
>   ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
>   if (consumerRecords.isEmpty()) {
>     // the first time this happens, we can assume that we have consumed all messages up to this point
>     log.trace("0 messages fetched on poll");
>     if (cacheReadyLatch.getCount() > 0) {
>       log.debug("Cache warmed up");
>       cacheReadyLatch.countDown();
>     }
>   }
> {code}
>  
> The problems with this code are:
>  # `consumer.subscribe` doesn't instantaneously assign partitions to the consumer.
>  # When `consumer.seekToBeginning` is called, the operation doesn't do anything because the consumer has no partitions yet (see [seekToBeginning doesn't work without auto.offset.reset (apache.org)|https://mail-archives.apache.org/mod_mbox/kafka-users/201603.mbox/%3cCAKWX9VUMPLiqTu9o0MpepauPszaPW9Lm91mWexVaFWKtgd3rgQ@mail.gmail.com%3e]).
>  # When the first `consumer.poll` is issued later, it returns nothing, triggering the sequence to *confirm the cache as ready when it isn't yet*. That can cause upstream messages not to be correctly de-duplicated.
> The solution is:
>  # Use a different overload of `consumer.subscribe` that accepts an implementation of the `ConsumerRebalanceListener`.
>  # When partitions are assigned to the `consumer` instance, call `seekToBeginning` there.
>  # Do an initial `poll(0)`, which will never return records but will force the partition assignment process.
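
The three solution steps above could look roughly like the following. This is only a minimal sketch, not the actual Camel change: the class name `CacheWarmUpSketch`, the `assigned` flag and the `pollOnce` method are hypothetical names introduced for illustration. It also assumes the `poll(Duration)` overload, which may return before the group assignment has completed, so the sketch only treats an empty poll as "caught up" once the rebalance listener has actually fired.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper for illustration; not the real KafkaIdempotentRepository.
public class CacheWarmUpSketch {

    private final Consumer<String, String> consumer;
    // Becomes true once the rebalance listener has seen an assignment.
    private final AtomicBoolean assigned = new AtomicBoolean(false);

    public CacheWarmUpSketch(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Steps 1 and 2: subscribe with a listener and seek inside the callback.
    public void subscribe(String topic) {
        consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Nothing to do in this sketch.
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // The partitions are known at this point, so the seek takes effect.
                if (!partitions.isEmpty()) {
                    consumer.seekToBeginning(partitions);
                }
                assigned.set(true);
            }
        });
    }

    // Step 3 and the poll loop body: poll once and report whether an empty
    // result may be read as "all existing entries consumed".
    public boolean pollOnce(Duration timeout) {
        ConsumerRecords<String, String> records = consumer.poll(timeout);
        // A real implementation would add the fetched records to the cache here.
        return assigned.get() && records.isEmpty();
    }
}
```

A caller would invoke `subscribe(topic)` once and then call `pollOnce(...)` in its loop, counting down the readiness latch the first time it returns `true`. Gating on the `assigned` flag is a design choice of this sketch; it avoids relying on the first poll to block until partitions are assigned.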



--
This message was sent by Atlassian Jira
(v8.3.4#803005)