Posted to issues@camel.apache.org by "Javier Holguera (Jira)" <ji...@apache.org> on 2021/02/10 19:40:00 UTC
[jira] [Updated] (CAMEL-16181) KafkaIdempotentRepository cache
incorrectly flagged as ready
[ https://issues.apache.org/jira/browse/CAMEL-16181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Javier Holguera updated CAMEL-16181:
------------------------------------
Description:
The `KafkaIdempotentRepository` initialises its cache off the back of the pre-existing Kafka topic with previous entries, with the following code:
{code:java}
log.debug("Subscribing consumer to {}", topic);
consumer.subscribe(Collections.singleton(topic));
log.debug("Seeking to beginning");
consumer.seekToBeginning(consumer.assignment());

POLL_LOOP: while (running.get()) {
    log.trace("Polling");
    ConsumerRecords<String, String> consumerRecords = consumer.poll(pollDurationMs);
    if (consumerRecords.isEmpty()) {
        // the first time this happens, we can assume that we have consumed all messages up to this point
        log.trace("0 messages fetched on poll");
        if (cacheReadyLatch.getCount() > 0) {
            log.debug("Cache warmed up");
            cacheReadyLatch.countDown();
        }
    }
{code}
The problem with this code is:
# `consumer.subscribe` doesn't instantaneously assign partitions to the consumer.
# When `consumer.seekToBeginning` is called, the operation doesn't do anything because the consumer has no partitions yet (see [seekToBeginning doesn't work without auto.offset.reset (apache.org)|https://mail-archives.apache.org/mod_mbox/kafka-users/201603.mbox/%3cCAKWX9VUMPLiqTu9o0MpepauPszaPW9Lm91mWexVaFWKtgd3rgQ@mail.gmail.com%3e]).
# When the first `consumer.poll` is later issued, it returns nothing, triggering the sequence to *confirm the cache as ready when it isn't yet*. That can cause upstream messages not to be correctly de-duplicated.

The solution is:
# Use a different overload of `consumer.subscribe` that accepts an implementation of `ConsumerRebalanceListener`. When partitions are assigned to the `consumer` instance, call `seekToBeginning` there.
# Do an initial `poll(0)` that will never return records but will force the partition assignment process.
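The ordering problem and the proposed fix can be illustrated with a toy, in-memory sketch. `WarmUpSketch` and `ToyConsumer` are hypothetical names and are not the Kafka client API. The model rests on three assumptions: partitions are assigned inside the first `poll` rather than in `subscribe`, the poll that performs the assignment returns no records, and reading starts at the end of the log unless a seek succeeds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class WarmUpSketch {

    /** Hypothetical toy consumer; it is NOT the Kafka client API. */
    static class ToyConsumer {
        private final List<String> topicLog;      // keys already stored in the toy topic
        private boolean assigned = false;         // subscribe() alone never sets this
        private int position;                     // index of the next record to read
        private Runnable onAssigned = () -> { };  // stand-in for a rebalance listener

        ToyConsumer(List<String> topicLog) {
            this.topicLog = topicLog;
            this.position = topicLog.size();      // assumption: start at the end unless a seek succeeds
        }

        void subscribe(Runnable onAssigned) {
            this.onAssigned = onAssigned;         // registers interest only; no assignment yet
        }

        void seekToBeginning() {
            if (assigned) {
                position = 0;                     // silently does nothing while unassigned
            }
        }

        List<String> poll() {
            if (!assigned) {                      // assumption: assignment happens inside poll
                assigned = true;
                onAssigned.run();
                return new ArrayList<>();         // the assigning poll returns no records
            }
            List<String> batch = new ArrayList<>(topicLog.subList(position, topicLog.size()));
            position = topicLog.size();
            return batch;
        }
    }

    /** Reads until the first empty poll, like the loop quoted in the report. */
    static Set<String> warmUp(ToyConsumer consumer) {
        Set<String> cache = new HashSet<>();
        while (true) {
            List<String> batch = consumer.poll();
            if (batch.isEmpty()) {
                return cache;                     // "Cache warmed up"
            }
            cache.addAll(batch);
        }
    }

    /** Seeks right after subscribing: the seek is a no-op and the first poll is empty. */
    static Set<String> naive(List<String> topicLog) {
        ToyConsumer consumer = new ToyConsumer(topicLog);
        consumer.subscribe(() -> { });
        consumer.seekToBeginning();
        return warmUp(consumer);
    }

    /** Seeks from the assignment callback and forces assignment with an initial poll. */
    static Set<String> fixed(List<String> topicLog) {
        ToyConsumer consumer = new ToyConsumer(topicLog);
        consumer.subscribe(consumer::seekToBeginning);
        consumer.poll();                          // initial poll: triggers assignment, returns nothing
        return warmUp(consumer);
    }

    public static void main(String[] args) {
        List<String> topicLog = Arrays.asList("k1", "k2", "k3");
        System.out.println("naive: " + naive(topicLog).size()); // prints "naive: 0"
        System.out.println("fixed: " + fixed(topicLog).size()); // prints "fixed: 3"
    }
}
```

In the real client, the hook that corresponds to the callback here is `ConsumerRebalanceListener.onPartitionsAssigned`, passed through the two-argument `subscribe` overload. The sketch only demonstrates why the order of calls matters, not how the repository should be implemented.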
> KafkaIdempotentRepository cache incorrectly flagged as ready
> ------------------------------------------------------------
>
> Key: CAMEL-16181
> URL: https://issues.apache.org/jira/browse/CAMEL-16181
> Project: Camel
> Issue Type: Improvement
> Components: camel-kafka
> Affects Versions: 3.7.2
> Reporter: Javier Holguera
> Priority: Major
> Fix For: 3.8.0
--
This message was sent by Atlassian Jira
(v8.3.4#803005)