Posted to issues@camel.apache.org by "Arseniy Tashoyan (Jira)" <ji...@apache.org> on 2023/12/06 17:43:00 UTC

[jira] [Commented] (CAMEL-16181) KafkaIdempotentRepository cache incorrectly flagged as ready

    [ https://issues.apache.org/jira/browse/CAMEL-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793859#comment-17793859 ] 

Arseniy Tashoyan commented on CAMEL-16181:
------------------------------------------

This fix does not work. The onPartitionsAssigned() callback is executed only after several calls to consumer.poll(). As the attached log shows, several "0 messages fetched on poll" messages appear before the "Seeking to beginning" message (printed by the callback). As a result, KafkaIdempotentRepository starts with an empty local cache and re-consumes all input files, so in practice the repository still does not work.
 [^kafka-idempotent-repository.log] 
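To make the race concrete, here is a minimal, hypothetical simulation written with the JDK only (no Kafka client classes). `WarmupRaceSimulation`, `SimulatedConsumer`, `runFlawedLoop` and `Result` are illustrative names, and the assumption that the rebalance completes on the third poll is arbitrary. The loop mirrors the empty-poll rule from the repository code: the first empty poll is treated as "cache warmed up".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of the warm-up race; no Kafka classes are used.
final class WarmupRaceSimulation {

    // Cache size at the moment the cache was declared warm, and at the end of the run.
    record Result(int cacheSizeWhenDeclaredWarm, int finalCacheSize) {}

    // Hypothetical stand-in for a consumer whose partition assignment only completes
    // on poll number `assignOnPoll`; the seek to the beginning happens at that point.
    static final class SimulatedConsumer {
        private final List<String> topic;   // entries already stored in the repository topic
        private final int assignOnPoll;     // poll on which the rebalance callback fires
        private int pollCount = 0;
        private boolean assigned = false;
        private int position = 0;
        final List<String> log = new ArrayList<>();

        SimulatedConsumer(List<String> topic, int assignOnPoll) {
            this.topic = topic;
            this.assignOnPoll = assignOnPoll;
        }

        List<String> poll() {
            pollCount++;
            if (!assigned && pollCount == assignOnPoll) {
                assigned = true;              // onPartitionsAssigned fires inside this poll
                position = 0;
                log.add("Seeking to beginning");
                return List.of();             // assume the fetch issued after the seek is still in flight
            }
            if (!assigned) {
                return List.of();             // no partitions yet, so nothing can be fetched
            }
            List<String> batch = new ArrayList<>(topic.subList(position, topic.size()));
            position = topic.size();
            return batch;
        }
    }

    // Mirrors the quoted loop: the first empty poll marks the cache as warm.
    static Result runFlawedLoop(SimulatedConsumer consumer, int polls) {
        List<String> cache = new ArrayList<>();
        int sizeWhenWarm = -1;                // -1 means "never declared warm"
        for (int i = 0; i < polls; i++) {
            List<String> records = consumer.poll();
            if (records.isEmpty()) {
                consumer.log.add("0 messages fetched on poll");
                if (sizeWhenWarm < 0) {
                    sizeWhenWarm = cache.size();  // stands in for cacheReadyLatch.countDown()
                }
            } else {
                cache.addAll(records);
            }
        }
        return new Result(sizeWhenWarm, cache.size());
    }

    public static void main(String[] args) {
        SimulatedConsumer consumer = new SimulatedConsumer(List.of("file-a", "file-b", "file-c"), 3);
        Result result = runFlawedLoop(consumer, 5);
        System.out.println(result);
        System.out.println(consumer.log);
    }
}
```

With a three-entry topic and five polls, the simulation declares the cache warm while it still holds 0 entries (all 3 arrive later), and its log shows two "0 messages fetched on poll" lines before "Seeking to beginning", the same ordering reported above.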

> KafkaIdempotentRepository cache incorrectly flagged as ready
> ------------------------------------------------------------
>
>                 Key: CAMEL-16181
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16181
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-kafka
>    Affects Versions: 3.7.2
>            Reporter: Javier Holguera
>            Priority: Major
>             Fix For: 3.7.3, 3.8.0
>
>         Attachments: kafka-idempotent-repository.log
>
>
> The `KafkaIdempotentRepository` initialises its cache from the pre-existing Kafka topic containing previous entries, using the following code:
>  
> {code:java}
> log.debug("Subscribing consumer to {}", topic);
> consumer.subscribe(Collections.singleton(topic));
> log.debug("Seeking to beginning");
> consumer.seekToBeginning(consumer.assignment());
>
> POLL_LOOP: while (running.get()) {
>   log.trace("Polling");
>   ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
>   if (consumerRecords.isEmpty()) {
>     // the first time this happens, we can assume that we have consumed all messages up to this point
>     log.trace("0 messages fetched on poll");
>     if (cacheReadyLatch.getCount() > 0) {
>       log.debug("Cache warmed up");
>       cacheReadyLatch.countDown();
>     }
>   }
>   // ... (rest of the poll loop omitted)
> }
> {code}
>  
> The problems with this code are:
>  # `consumer.subscribe` doesn't assign partitions to the consumer immediately; the assignment only happens later, during a subsequent `consumer.poll`.
>  # When `consumer.seekToBeginning` is called, `consumer.assignment()` is still empty, so the call seeks no partitions and has no effect (see [seekToBeginning doesn't work without auto.offset.reset (apache.org)|https://mail-archives.apache.org/mod_mbox/kafka-users/201603.mbox/%3cCAKWX9VUMPLiqTu9o0MpepauPszaPW9Lm91mWexVaFWKtgd3rgQ@mail.gmail.com%3e]).
>  # When the first `consumer.poll` is later issued, it returns nothing, which triggers the sequence that *confirms the cache as ready when it isn't yet*. As a result, upstream messages may not be correctly de-duplicated.
> The solution is:
>  # Use the overload of `consumer.subscribe` that accepts an implementation of `ConsumerRebalanceListener`.
>  # When partitions are assigned to the `consumer` instance, call `seekToBeginning` there.
>  # Do an initial `poll(0)`, which will never return records but will force the partition assignment process.
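As described, the solution changes when the seek happens but not how readiness is decided: an empty poll still counts the latch down. One possible hardening, sketched here as an assumption and not as the fix adopted in Camel, is to treat the cache as warm only after an assignment has been observed and every assigned partition has been read up to the end offset captured at assignment time. `WarmupGate` and its methods are hypothetical; partitions are plain ints and offsets plain longs so the logic runs without Kafka. In a real consumer the inputs would presumably come from `consumer.endOffsets(...)` inside `onPartitionsAssigned` and from `consumer.position(...)` after each poll.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical readiness gate (not part of Camel or the Kafka client API).
// Partitions are ints and offsets are longs so the logic runs standalone.
final class WarmupGate {
    private final Map<Integer, Long> endOffsets = new HashMap<>(); // partition -> end offset captured at assignment
    private final Map<Integer, Long> positions = new HashMap<>();  // partition -> latest reported consumer position
    private boolean assigned = false;

    // Intended to be called from onPartitionsAssigned, after seeking to the beginning.
    // Assumes it receives the complete set of assigned partitions.
    void onAssigned(Map<Integer, Long> endOffsetsAtAssignment) {
        endOffsets.clear();
        positions.clear();
        endOffsets.putAll(endOffsetsAtAssignment);
        assigned = true;
    }

    // Intended to be called after every poll, once per assigned partition.
    void onPosition(int partition, long position) {
        if (endOffsets.containsKey(partition)) {
            positions.put(partition, position);
        }
    }

    // Ready only after an assignment has been seen and every assigned partition
    // has been read up to its captured end offset. An empty poll alone never makes it ready.
    boolean isReady() {
        if (!assigned) {
            return false;
        }
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            Long position = positions.get(entry.getKey());
            if (position == null || position < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        WarmupGate gate = new WarmupGate();
        System.out.println(gate.isReady());   // false: empty polls before any assignment
        gate.onAssigned(Map.of(0, 3L));
        gate.onPosition(0, 1L);
        System.out.println(gate.isReady());   // false: partition 0 only partially read
        gate.onPosition(0, 3L);
        System.out.println(gate.isReady());   // true: caught up to the captured end offset
    }
}
```

Under this sketch, `cacheReadyLatch.countDown()` would be guarded by `gate.isReady()` rather than by `consumerRecords.isEmpty()`, so empty polls that happen before `onPartitionsAssigned` fires can no longer mark the cache as warm. Entries written after the assignment are still picked up by the normal poll loop; they simply do not delay readiness.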



--
This message was sent by Atlassian Jira
(v8.20.10#820010)