Posted to commits@camel.apache.org by or...@apache.org on 2024/01/31 09:59:48 UTC

(camel) branch camel-3.22.x updated: CAMEL-20218: On startup populate local cache synchronously

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.22.x by this push:
     new 30aedca6b0e CAMEL-20218: On startup populate local cache synchronously
30aedca6b0e is described below

commit 30aedca6b0ef932d9f8f7b7032e09bb35515115c
Author: Arseniy Tashoyan <ar...@swisscom.com>
AuthorDate: Tue Jan 30 10:08:05 2024 +0100

    CAMEL-20218: On startup populate local cache synchronously
---
 .../idempotent/kafka/KafkaConsumerUtil.java        |  54 +++++
 .../kafka/KafkaIdempotentRepository.java           | 233 +++++++--------------
 .../idempotent/kafka/KafkaConsumerUtilsTest.java   | 167 +++++++++++++++
 .../KafkaIdempotentRepositoryPersistenceIT.java    |   1 -
 4 files changed, 291 insertions(+), 164 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtil.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtil.java
new file mode 100644
index 00000000000..fdcef5e4d13
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.util.ObjectHelper;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+public class KafkaConsumerUtil {
+
+    /**
+     * Tests whether the Kafka consumer has reached the target offsets for all specified topic partitions.
+     *
+     * @param  consumer      Kafka consumer. It is expected to have some assignment to topic partitions.
+     * @param  targetOffsets Target offsets for topic partitions.
+     * @param  <K>           Key type.
+     * @param  <V>           Value type.
+     * @return               {@code true} if the consumer has reached the target offsets for all specified topic
+     *                       partitions.
+     */
+    public static <K, V> boolean isReachedOffsets(Consumer<K, V> consumer, Map<TopicPartition, Long> targetOffsets) {
+        if (ObjectHelper.isEmpty(targetOffsets)) {
+            throw new IllegalArgumentException("Target offsets must be non-empty");
+        }
+
+        Set<TopicPartition> partitions = consumer.assignment();
+
+        /* If some partition is missing in the targetOffsets map, then we do not check the offset for this partition. */
+        Map<TopicPartition, Long> extendedTargetOffsets = new HashMap<>(targetOffsets);
+        partitions.forEach(partition -> extendedTargetOffsets.putIfAbsent(partition, Long.MIN_VALUE));
+
+        return partitions.stream()
+                .allMatch(partition -> consumer.position(partition) >= extendedTargetOffsets.get(partition));
+    }
+
+}
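
For orientation, a minimal sketch of how this helper is driven (it mirrors the
populateCache() loop in KafkaIdempotentRepository below; the 100 ms poll
duration is an illustrative value, and the snippet assumes the same imports
and a Consumer<String, String> as in the file above):

    // Snapshot the end offsets once, then poll until every assigned
    // partition has been read up to that snapshot.
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
    while (!KafkaConsumerUtil.isReachedOffsets(consumer, endOffsets)) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
            // apply each record to the local state
        }
    }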
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index b0340feeb55..fafcfed61bc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -21,11 +21,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import org.apache.camel.CamelContext;
@@ -64,12 +60,9 @@ import org.slf4j.LoggerFactory;
  * partitions (it is designed to consume from all at the same time), or replication factor of the topic. Each repository
  * instance that uses the topic (e.g. typically on different machines running in parallel) controls its own consumer
  * group, so in a cluster of 10 Camel processes using the same topic each will control its own offset. On startup, the
- * instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the latest state.
- * The cache will not be considered warmed up until one poll of {@link #pollDurationMs} in length returns 0 records.
- * Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens the
- * idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic. To use,
- * this repository must be placed in the Camel registry, either manually or by registration as a bean in
- * Spring/Blueprint, as it is CamelContext aware.
+ * instance consumes the full content of the topic, rebuilding the cache to the latest state. To use, this repository
+ * must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is
+ * CamelContext aware.
  */
 @ManagedResource(description = "Kafka IdempotentRepository")
 public class KafkaIdempotentRepository extends ServiceSupport implements IdempotentRepository, CamelContextAware {
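
A usage illustration, not part of this commit: because the repository is
CamelContext aware, it is bound into the registry and then referenced from an
idempotent consumer. The bean name, topic, bootstrap servers and endpoints
below are placeholder values:

    KafkaIdempotentRepository repository
            = new KafkaIdempotentRepository("idempotent-topic", "localhost:9092");
    camelContext.getRegistry().bind("kafkaIdempotentRepository", repository);

    from("direct:in")
            .idempotentConsumer(header("messageId"), repository)
            .to("mock:out");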
@@ -93,11 +86,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     private Map<String, Object> cache;
     private Consumer<String, String> consumer;
     private Producer<String, String> producer;
-    private TopicPoller topicPoller;
 
     private CamelContext camelContext;
-    private ExecutorService executorService;
-    private CountDownLatch cacheReadyLatch;
 
     enum CacheAction {
         add,
@@ -185,7 +175,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     /**
      * Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should
      * use a different topic.
-     * 
+     *
      * @param topic The topic name.
      */
     public void setTopic(String topic) {
@@ -198,21 +188,21 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
 
     /**
      * Sets the
-     * 
+     *
      * <pre>
      * bootstrap.servers
      * </pre>
-     * 
+     *
      * property on the internal Kafka producer and consumer. Use this as shorthand if not setting
      * {@link #consumerConfig} and {@link #producerConfig}. If used, this component will apply sensible default
      * configurations for the producer and consumer.
-     * 
+     *
      * @param bootstrapServers The
-     * 
+     *
      *                         <pre>
      *                         bootstrap.servers
      *                         </pre>
-     * 
+     *
      *                         value to use.
      */
     public void setBootstrapServers(String bootstrapServers) {
@@ -226,14 +216,14 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     /**
      * Sets the properties that will be used by the Kafka producer. Overrides {@link #bootstrapServers}, so must define
      * the
-     * 
+     *
      * <pre>
      * bootstrap.servers
      * </pre>
-     * 
+     *
      * property itself. Prefer using {@link #bootstrapServers} for default configuration unless you specifically need
      * non-standard configuration options such as SSL/SASL.
-     * 
+     *
      * @param producerConfig The producer configuration properties.
      */
     public void setProducerConfig(Properties producerConfig) {
@@ -247,14 +237,14 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     /**
      * Sets the properties that will be used by the Kafka consumer. Overrides {@link #bootstrapServers}, so must define
      * the
-     * 
+     *
      * <pre>
      * bootstrap.servers
      * </pre>
-     * 
+     *
      * property itself. Prefer using {@link #bootstrapServers} for default configuration unless you specifically need
      * non-standard configuration options such as SSL/SASL.
-     * 
+     *
      * @param consumerConfig The consumer configuration properties.
      */
     public void setConsumerConfig(Properties consumerConfig) {
@@ -267,7 +257,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
 
     /**
      * Sets the maximum size of the local key cache.
-     * 
+     *
      * @param maxCacheSize The maximum key cache size.
      */
     public void setMaxCacheSize(int maxCacheSize) {
@@ -288,7 +278,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
      * the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at
      * which messages are sent on the topic, there exists a possibility that the cache cannot be warmed up and will
      * operate in an inconsistent state relative to its peers until it catches up.
-     * 
+     *
      * @param pollDurationMs The poll duration in milliseconds.
      */
     public void setPollDurationMs(int pollDurationMs) {
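
A configuration sketch using the setters documented above, with illustrative
values. Note that with the synchronous startup introduced by this commit,
pollDurationMs bounds each blocking poll made while the cache is populated,
rather than serving as the warm-up heuristic described for the previous
implementation:

    KafkaIdempotentRepository repository = new KafkaIdempotentRepository();
    repository.setTopic("idempotent-topic");
    repository.setBootstrapServers("localhost:9092");
    repository.setMaxCacheSize(1000);
    repository.setPollDurationMs(100);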
@@ -325,7 +315,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     protected void doStart() throws Exception {
         ObjectHelper.notNull(camelContext, "camelContext");
         StringHelper.notEmpty(topic, "topic");
@@ -361,41 +350,60 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
         producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
         producer = new KafkaProducer<>(producerConfig);
 
-        cacheReadyLatch = new CountDownLatch(1);
-        topicPoller = new TopicPoller(consumer, cacheReadyLatch, pollDurationMs);
+        populateCache();
+    }
 
-        // warm up the cache
-        executorService = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "KafkaIdempotentRepository");
-        executorService.submit(topicPoller);
-        log.info("Warming up cache from topic {}", topic);
-        try {
-            if (cacheReadyLatch.await(30, TimeUnit.SECONDS)) {
-                log.info("Cache OK");
-            } else {
-                log.warn("Timeout waiting for cache warm-up from topic {}. Proceeding anyway. "
-                         + "Duplicate records may not be detected.",
-                        topic);
+    private void populateCache() {
+        log.debug("Getting partitions of topic {}", topic);
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        Collection<TopicPartition> partitions = partitionInfos.stream()
+                .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
+                .collect(Collectors.toUnmodifiableList());
+
+        log.debug("Assigning consumer to partitions {}", partitions);
+        consumer.assign(partitions);
+
+        log.debug("Seeking consumer to beginning of partitions {}", partitions);
+        consumer.seekToBeginning(partitions);
+
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
+        log.debug("Consuming records from partitions {} till end offsets {}", partitions, endOffsets);
+        while (!KafkaConsumerUtil.isReachedOffsets(consumer, endOffsets)) {
+            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(pollDurationMs));
+            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
+                addToCache(consumerRecord);
             }
-        } catch (InterruptedException e) {
-            log.warn("Interrupted while warming up cache. This exception is ignored.", e);
         }
+
     }
 
-    @Override
-    protected void doStop() {
-        // stop the thread
-        topicPoller.setRunning(false);
+    private void addToCache(ConsumerRecord<String, String> consumerRecord) {
+        CacheAction action = null;
         try {
-            if (topicPoller.getShutdownLatch().await(30, TimeUnit.SECONDS)) {
-                log.info("Cache from topic {} shutdown successfully", topic);
-            } else {
-                log.warn("Timeout waiting for cache to shutdown from topic {}. Proceeding anyway.", topic);
-            }
-        } catch (InterruptedException e) {
-            log.warn("Interrupted waiting on shutting down cache due {}. This exception is ignored.", e.getMessage());
+            action = CacheAction.valueOf(consumerRecord.value());
+        } catch (IllegalArgumentException iax) {
+            log.error(
+                    "Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.",
+                    consumerRecord.value(), consumerRecord.topic(),
+                    consumerRecord.partition(), consumerRecord.offset());
         }
-        camelContext.getExecutorServiceManager().shutdown(executorService);
+        String messageId = consumerRecord.key();
+        if (action == CacheAction.add) {
+            log.debug("Adding to cache messageId:{}", messageId);
+            cache.put(messageId, messageId);
+        } else if (action == CacheAction.remove) {
+            log.debug("Removing from cache messageId:{}", messageId);
+            cache.remove(messageId);
+        } else if (action == CacheAction.clear) {
+            cache.clear();
+        } else {
+            // this should never happen
+            throw new RuntimeException("Illegal action " + action + " for key " + consumerRecord.key());
+        }
+    }
 
+    @Override
+    protected void doStop() {
         IOHelper.close(consumer, "consumer", log);
         IOHelper.close(producer, "producer", log);
     }
@@ -417,9 +425,13 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     private void broadcastAction(String key, CacheAction action) {
         try {
             log.debug("Broadcasting action:{} for key:{}", action, key);
-            producer.send(new ProducerRecord<>(topic, key, action.toString())).get(); // sync
-                                                                                     // send
-        } catch (ExecutionException | InterruptedException e) {
+            ObjectHelper.notNull(producer, "producer");
+
+            producer.send(new ProducerRecord<>(topic, key, action.toString())).get(); // sync send
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeCamelException(e);
+        } catch (ExecutionException e) {
             throw new RuntimeCamelException(e);
         }
     }
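
The send idiom above in isolation, as a sketch with placeholder topic, key and
value. Blocking on get() makes the broadcast synchronous, and re-asserting the
interrupt flag before translating the exception lets callers further up the
stack observe the interruption:

    try {
        producer.send(new ProducerRecord<>("idempotent-topic", "someKey", "add")).get(); // block until acked
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        throw new RuntimeCamelException(e);
    } catch (ExecutionException e) {
        throw new RuntimeCamelException(e);
    }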
@@ -451,109 +463,4 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
     public void clear() {
         broadcastAction(null, CacheAction.clear);
     }
-
-    @ManagedOperation(description = "Number of times duplicate messages have been detected")
-    public boolean isPollerRunning() {
-        return topicPoller.isRunning();
-    }
-
-    public boolean isCacheReady() {
-        return cacheReadyLatch.getCount() == 0;
-    }
-
-    private class TopicPoller implements Runnable {
-
-        private final Logger log = LoggerFactory.getLogger(this.getClass());
-        private final Consumer<String, String> consumer;
-        private final CountDownLatch cacheReadyLatch;
-        private final int pollDurationMs;
-
-        private final CountDownLatch shutdownLatch = new CountDownLatch(1);
-        private final AtomicBoolean running = new AtomicBoolean(true);
-
-        TopicPoller(Consumer<String, String> consumer, CountDownLatch cacheReadyLatch, int pollDurationMs) {
-            this.consumer = consumer;
-            this.cacheReadyLatch = cacheReadyLatch;
-            this.pollDurationMs = pollDurationMs;
-        }
-
-        @Override
-        public void run() {
-            log.debug("Getting partitions of topic {}", topic);
-            List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-            Collection<TopicPartition> partitions = partitionInfos.stream()
-                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
-                    .collect(Collectors.toUnmodifiableList());
-
-            log.debug("Assigning consumer to partitions {}", partitions);
-            consumer.assign(partitions);
-
-            log.debug("Seeking consumer to beginning of partitions {}", partitions);
-            consumer.seekToBeginning(partitions);
-
-            POLL_LOOP: while (running.get()) {
-                log.trace("Polling");
-                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(pollDurationMs));
-                if (consumerRecords.isEmpty()) {
-                    // the first time this happens, we can assume that we have
-                    // consumed all
-                    // messages up to this point
-                    log.trace("0 messages fetched on poll");
-                    if (cacheReadyLatch.getCount() > 0) {
-                        log.debug("Cache warmed up");
-                        cacheReadyLatch.countDown();
-                    }
-                }
-                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
-                    CacheAction action;
-                    try {
-                        action = CacheAction.valueOf(consumerRecord.value());
-                    } catch (IllegalArgumentException iax) {
-                        log.error(
-                                "Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.",
-                                consumerRecord.key(), consumerRecord.topic(),
-                                consumerRecord.partition(), consumerRecord.offset());
-                        setRunning(false);
-                        continue POLL_LOOP;
-                    }
-                    String messageId = consumerRecord.key();
-                    if (action == CacheAction.add) {
-                        log.debug("Adding to cache messageId:{}", messageId);
-                        cache.put(messageId, messageId);
-                    } else if (action == CacheAction.remove) {
-                        log.debug("Removing from cache messageId:{}", messageId);
-                        cache.remove(messageId);
-                    } else if (action == CacheAction.clear) {
-                        cache.clear();
-                    } else {
-                        // this should never happen
-                        log.warn("No idea how to {} a record. Shutting down.", action);
-                        setRunning(false);
-                        continue POLL_LOOP;
-                    }
-                }
-                consumer.commitSync();
-            }
-            log.debug("TopicPoller finished - triggering shutdown latch");
-            shutdownLatch.countDown();
-        }
-
-        CountDownLatch getShutdownLatch() {
-            return shutdownLatch;
-        }
-
-        void setRunning(boolean running) {
-            this.running.set(running);
-        }
-
-        boolean isRunning() {
-            return running.get();
-        }
-
-        @Override
-        public String toString() {
-            return "TopicPoller[" + topic + "]";
-        }
-    }
-
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtilsTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtilsTest.java
new file mode 100644
index 00000000000..a2d95ac46e1
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtilsTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.kafka;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.processor.idempotent.kafka.KafkaConsumerUtil.isReachedOffsets;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class KafkaConsumerUtilsTest {
+
+    @Test
+    public void testNotReachedOffsets() {
+        Map<TopicPartition, Long> targetOffsets = Map.of(
+                new TopicPartition("topic1", 0), 10L,
+                new TopicPartition("topic1", 1), 100L);
+
+        Consumer<String, String> consumer = mock(Consumer.class);
+        when(consumer.assignment())
+                .thenReturn(
+                        Set.of(
+                                new TopicPartition("topic1", 0),
+                                new TopicPartition("topic1", 1)));
+        when(consumer.position(new TopicPartition("topic1", 0)))
+                .thenReturn(9L);
+        when(consumer.position(new TopicPartition("topic1", 1)))
+                .thenReturn(99L);
+
+        boolean result = isReachedOffsets(consumer, targetOffsets);
+        assertFalse(result);
+    }
+
+    @Test
+    public void testReachedOffsets() {
+        Map<TopicPartition, Long> targetOffsets = Map.of(
+                new TopicPartition("topic1", 0), 10L,
+                new TopicPartition("topic1", 1), 100L);
+
+        Consumer<String, String> consumer = mock(Consumer.class);
+        when(consumer.assignment())
+                .thenReturn(
+                        Set.of(
+                                new TopicPartition("topic1", 0),
+                                new TopicPartition("topic1", 1)));
+        when(consumer.position(new TopicPartition("topic1", 0)))
+                .thenReturn(10L);
+        when(consumer.position(new TopicPartition("topic1", 1)))
+                .thenReturn(100L);
+
+        boolean result = isReachedOffsets(consumer, targetOffsets);
+        assertTrue(result);
+    }
+
+    @Test
+    public void testOverrunOffsets() {
+        Map<TopicPartition, Long> targetOffsets = Map.of(
+                new TopicPartition("topic1", 0), 10L,
+                new TopicPartition("topic1", 1), 100L);
+
+        Consumer<String, String> consumer = mock(Consumer.class);
+        when(consumer.assignment())
+                .thenReturn(
+                        Set.of(
+                                new TopicPartition("topic1", 0),
+                                new TopicPartition("topic1", 1)));
+        when(consumer.position(new TopicPartition("topic1", 0)))
+                .thenReturn(11L);
+        when(consumer.position(new TopicPartition("topic1", 1)))
+                .thenReturn(101L);
+
+        boolean result = isReachedOffsets(consumer, targetOffsets);
+        assertTrue(result);
+    }
+
+    @Test
+    public void testReachedOffsetsForSomePartitions() {
+        Map<TopicPartition, Long> targetOffsets = Map.of(
+                new TopicPartition("topic1", 0), 10L,
+                new TopicPartition("topic1", 1), 100L);
+
+        Consumer<String, String> consumer = mock(Consumer.class);
+        when(consumer.assignment())
+                .thenReturn(
+                        Set.of(
+                                new TopicPartition("topic1", 0),
+                                new TopicPartition("topic1", 1)));
+        when(consumer.position(new TopicPartition("topic1", 0)))
+                .thenReturn(10L);
+        when(consumer.position(new TopicPartition("topic1", 1)))
+                .thenReturn(99L);
+
+        boolean result = isReachedOffsets(consumer, targetOffsets);
+        assertFalse(result);
+    }
+
+    @Test
+    public void testNotReachedOffsetsSomeTargetOffsetsUnspecified() {
+        Map<TopicPartition, Long> targetOffsets = Map.of(
+                new TopicPartition("topic1", 0), 10L);
+
+        Consumer<String, String> consumer = mock(Consumer.class);
+        when(consumer.assignment())
+                .thenReturn(
+                        Set.of(
+                                new TopicPartition("topic1", 0),
+                                new TopicPartition("topic1", 1)));
+        when(consumer.position(new TopicPartition("topic1", 0)))
+                .thenReturn(9L);
+        when(consumer.position(new TopicPartition("topic1", 1)))
+                .thenReturn(99L);
+
+        boolean result = isReachedOffsets(consumer, targetOffsets);
+        assertFalse(result);
+    }
+
+    @Test
+    public void testReachedOffsetsSomeTargetOffsetsUnspecified() {
+        Map<TopicPartition, Long> targetOffsets = Map.of(
+                new TopicPartition("topic1", 0), 10L);
+
+        Consumer<String, String> consumer = mock(Consumer.class);
+        when(consumer.assignment())
+                .thenReturn(
+                        Set.of(
+                                new TopicPartition("topic1", 0),
+                                new TopicPartition("topic1", 1)));
+        when(consumer.position(new TopicPartition("topic1", 0)))
+                .thenReturn(10L);
+        when(consumer.position(new TopicPartition("topic1", 1)))
+                .thenReturn(99L);
+
+        boolean result = isReachedOffsets(consumer, targetOffsets);
+        assertTrue(result);
+    }
+
+    @Test
+    public void testTargetOffsetsEmpty() {
+        Map<TopicPartition, Long> targetOffsets = Collections.emptyMap();
+        Consumer<String, String> consumer = mock(Consumer.class);
+        assertThrows(IllegalArgumentException.class, () -> isReachedOffsets(consumer, targetOffsets));
+    }
+
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 4ad1a0f57b7..cfb5ddd6006 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -89,7 +89,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
     @Test
     @DisplayName("Checks that half of the messages pass and duplicates are blocked")
     public void testFirstPassFiltersAsExpected() {
-        await().until(() -> kafkaIdempotentRepository.isCacheReady());
         int count = 10;
         sendMessages(count);