You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/31 09:59:48 UTC
(camel) branch camel-3.22.x updated: CAMEL-20218: On startup populate local cache synchronously
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-3.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.22.x by this push:
new 30aedca6b0e CAMEL-20218: On startup populate local cache synchronously
30aedca6b0e is described below
commit 30aedca6b0ef932d9f8f7b7032e09bb35515115c
Author: Arseniy Tashoyan <ar...@swisscom.com>
AuthorDate: Tue Jan 30 10:08:05 2024 +0100
CAMEL-20218: On startup populate local cache synchronously
---
.../idempotent/kafka/KafkaConsumerUtil.java | 54 +++++
.../kafka/KafkaIdempotentRepository.java | 233 +++++++--------------
.../idempotent/kafka/KafkaConsumerUtilsTest.java | 167 +++++++++++++++
.../KafkaIdempotentRepositoryPersistenceIT.java | 1 -
4 files changed, 291 insertions(+), 164 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtil.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtil.java
new file mode 100644
index 00000000000..fdcef5e4d13
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.util.ObjectHelper;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Static helpers for inspecting the state of a Kafka {@link Consumer}.
+ */
+public final class KafkaConsumerUtil {
+
+    private KafkaConsumerUtil() {
+        // utility class with static methods only; not meant to be instantiated
+    }
+
+    /**
+     * Tests whether the Kafka consumer reached the target offsets for all specified topic partitions.
+     * <p>
+     * Only the partitions currently assigned to the consumer are inspected. An assigned partition without a target
+     * offset in {@code targetOffsets} is not checked and therefore counts as reached.
+     *
+     * @param  consumer                 Kafka consumer. It is expected to have some assignment to topic partitions.
+     * @param  targetOffsets            Target offsets for topic partitions.
+     * @param  <K>                      Key type.
+     * @param  <V>                      Value type.
+     * @return                          {@code true} if the consumer has reached the target offsets for all specified
+     *                                  topic partitions.
+     * @throws IllegalArgumentException if {@code targetOffsets} is empty.
+     */
+    public static <K, V> boolean isReachedOffsets(Consumer<K, V> consumer, Map<TopicPartition, Long> targetOffsets) {
+        if (ObjectHelper.isEmpty(targetOffsets)) {
+            throw new IllegalArgumentException("Target offsets must be non-empty");
+        }
+
+        Set<TopicPartition> assignedPartitions = consumer.assignment();
+
+        // Look the target up per partition instead of copying the whole map on every call: this method is
+        // evaluated once per poll iteration while the cache is being populated.
+        for (TopicPartition partition : assignedPartitions) {
+            Long targetOffset = targetOffsets.get(partition);
+            // If some partition is missing in the targetOffsets map, then we do not check the offset for it.
+            if (targetOffset != null && consumer.position(partition) < targetOffset) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index b0340feeb55..fafcfed61bc 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -21,11 +21,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.camel.CamelContext;
@@ -64,12 +60,9 @@ import org.slf4j.LoggerFactory;
* partitions (it is designed to consume from all at the same time), or replication factor of the topic. Each repository
* instance that uses the topic (e.g. typically on different machines running in parallel) controls its own consumer
* group, so in a cluster of 10 Camel processes using the same topic each will control its own offset. On startup, the
- * instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the latest state.
- * The cache will not be considered warmed up until one poll of {@link #pollDurationMs} in length returns 0 records.
- * Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens the
- * idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic. To use,
- * this repository must be placed in the Camel registry, either manually or by registration as a bean in
- * Spring/Blueprint, as it is CamelContext aware.
+ * instance consumes the full content of the topic, rebuilding the cache to the latest state. To use, this repository
+ * must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is
+ * CamelContext aware.
*/
@ManagedResource(description = "Kafka IdempotentRepository")
public class KafkaIdempotentRepository extends ServiceSupport implements IdempotentRepository, CamelContextAware {
@@ -93,11 +86,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
private Map<String, Object> cache;
private Consumer<String, String> consumer;
private Producer<String, String> producer;
- private TopicPoller topicPoller;
private CamelContext camelContext;
- private ExecutorService executorService;
- private CountDownLatch cacheReadyLatch;
enum CacheAction {
add,
@@ -185,7 +175,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
/**
* Sets the name of the Kafka topic used by this idempotent repository. Each functionally-separate repository should
* use a different topic.
- *
+ *
* @param topic The topic name.
*/
public void setTopic(String topic) {
@@ -198,21 +188,21 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
/**
* Sets the
- *
+ *
* <pre>
* bootstrap.servers
* </pre>
- *
+ *
* property on the internal Kafka producer and consumer. Use this as shorthand if not setting
* {@link #consumerConfig} and {@link #producerConfig}. If used, this component will apply sensible default
* configurations for the producer and consumer.
- *
+ *
* @param bootstrapServers The
- *
+ *
* <pre>
* bootstrap.servers
* </pre>
- *
+ *
* value to use.
*/
public void setBootstrapServers(String bootstrapServers) {
@@ -226,14 +216,14 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
/**
* Sets the properties that will be used by the Kafka producer. Overrides {@link #bootstrapServers}, so must define
* the
- *
+ *
* <pre>
* bootstrap.servers
* </pre>
- *
+ *
* property itself. Prefer using {@link #bootstrapServers} for default configuration unless you specifically need
* non-standard configuration options such as SSL/SASL.
- *
+ *
* @param producerConfig The producer configuration properties.
*/
public void setProducerConfig(Properties producerConfig) {
@@ -247,14 +237,14 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
/**
* Sets the properties that will be used by the Kafka consumer. Overrides {@link #bootstrapServers}, so must define
* the
- *
+ *
* <pre>
* bootstrap.servers
* </pre>
- *
+ *
* property itself. Prefer using {@link #bootstrapServers} for default configuration unless you specifically need
* non-standard configuration options such as SSL/SASL.
- *
+ *
* @param consumerConfig The consumer configuration properties.
*/
public void setConsumerConfig(Properties consumerConfig) {
@@ -267,7 +257,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
/**
* Sets the maximum size of the local key cache.
- *
+ *
* @param maxCacheSize The maximum key cache size.
*/
public void setMaxCacheSize(int maxCacheSize) {
@@ -288,7 +278,7 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
* the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at
* which messages are sent on the topic, there exists a possibility that the cache cannot be warmed up and will
* operate in an inconsistent state relative to its peers until it catches up.
- *
+ *
* @param pollDurationMs The poll duration in milliseconds.
*/
public void setPollDurationMs(int pollDurationMs) {
@@ -325,7 +315,6 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
}
@Override
- @SuppressWarnings("unchecked")
protected void doStart() throws Exception {
ObjectHelper.notNull(camelContext, "camelContext");
StringHelper.notEmpty(topic, "topic");
@@ -361,41 +350,60 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
producer = new KafkaProducer<>(producerConfig);
- cacheReadyLatch = new CountDownLatch(1);
- topicPoller = new TopicPoller(consumer, cacheReadyLatch, pollDurationMs);
+ populateCache();
+ }
- // warm up the cache
- executorService = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "KafkaIdempotentRepository");
- executorService.submit(topicPoller);
- log.info("Warming up cache from topic {}", topic);
- try {
- if (cacheReadyLatch.await(30, TimeUnit.SECONDS)) {
- log.info("Cache OK");
- } else {
- log.warn("Timeout waiting for cache warm-up from topic {}. Proceeding anyway. "
- + "Duplicate records may not be detected.",
- topic);
+    /**
+     * Populates the local cache synchronously by replaying the idempotent topic: the consumer is assigned to every
+     * partition of the topic, rewound to the beginning and polled until it has reached the end offsets that were
+     * read before the replay started. The calling thread is blocked for the whole replay.
+     *
+     * @throws IllegalStateException if no partitions are reported for the topic
+     */
+    private void populateCache() {
+        log.debug("Getting partitions of topic {}", topic);
+        List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        if (partitionInfos == null || partitionInfos.isEmpty()) {
+            // Fail with an explicit message instead of a NullPointerException on the stream below, or the
+            // unrelated "Target offsets must be non-empty" error raised by KafkaConsumerUtil later on.
+            throw new IllegalStateException("No partitions found for topic " + topic + ", cannot populate the cache");
+        }
+        Collection<TopicPartition> partitions = partitionInfos.stream()
+                .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
+                .collect(Collectors.toUnmodifiableList());
+
+        log.debug("Assigning consumer to partitions {}", partitions);
+        consumer.assign(partitions);
+
+        log.debug("Seeking consumer to beginning of partitions {}", partitions);
+        consumer.seekToBeginning(partitions);
+
+        // The end offsets are read once, up front: they are the fixed target of the replay loop below.
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
+        log.debug("Consuming records from partitions {} till end offsets {}", partitions, endOffsets);
+        while (!KafkaConsumerUtil.isReachedOffsets(consumer, endOffsets)) {
+            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(pollDurationMs));
+            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
+                addToCache(consumerRecord);
+            }
+        }
+    }
- @Override
- protected void doStop() {
- // stop the thread
- topicPoller.setRunning(false);
+    /**
+     * Applies a single record read from the idempotent topic to the local cache. The record key is used as the
+     * message id and the record value must be the name of a {@link CacheAction}.
+     *
+     * @param  consumerRecord        the record to apply
+     * @throws RuntimeCamelException if the record value is {@code null} or is not the name of a {@link CacheAction}
+     */
+    private void addToCache(ConsumerRecord<String, String> consumerRecord) {
+        String messageId = consumerRecord.key();
+        String actionName = consumerRecord.value();
+
+        if (actionName == null) {
+            // Enum.valueOf would throw a NullPointerException for a null name; report it like any other bad value
+            throw illegalAction(consumerRecord, null);
+        }
+
+        CacheAction action;
+        try {
+            action = CacheAction.valueOf(actionName);
+        } catch (IllegalArgumentException iax) {
+            throw illegalAction(consumerRecord, iax);
+        }
+
+        switch (action) {
+            case add:
+                log.debug("Adding to cache messageId:{}", messageId);
+                cache.put(messageId, messageId);
+                break;
+            case remove:
+                log.debug("Removing from cache messageId:{}", messageId);
+                cache.remove(messageId);
+                break;
+            case clear:
+                cache.clear();
+                break;
+            default:
+                // this should never happen
+                throw new RuntimeCamelException("Illegal action " + action + " for key " + messageId);
+        }
+    }
+
+    /**
+     * Logs a record whose value is not a usable cache action and builds the exception reporting it. The offending
+     * value (not the key) is logged, and the parse failure, if any, is kept as the cause.
+     */
+    private RuntimeCamelException illegalAction(ConsumerRecord<String, String> consumerRecord, Throwable cause) {
+        log.error(
+                "Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.",
+                consumerRecord.value(), consumerRecord.topic(),
+                consumerRecord.partition(), consumerRecord.offset());
+        return new RuntimeCamelException(
+                "Illegal action " + consumerRecord.value() + " for key " + consumerRecord.key(), cause);
+    }
+ @Override
+ protected void doStop() { // passes the Kafka consumer, then the producer, to IOHelper.close
IOHelper.close(consumer, "consumer", log);
IOHelper.close(producer, "producer", log);
}
@@ -417,9 +425,13 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
private void broadcastAction(String key, CacheAction action) {
try {
log.debug("Broadcasting action:{} for key:{}", action, key);
- producer.send(new ProducerRecord<>(topic, key, action.toString())).get(); // sync
- // send
- } catch (ExecutionException | InterruptedException e) {
+ ObjectHelper.notNull(producer, "producer"); // NOTE(review): presumably fails fast when no producer exists (repository not started) - confirm against ObjectHelper
+
+ producer.send(new ProducerRecord<>(topic, key, action.toString())).get(); // sync send: get() waits for the send result
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // re-set the interrupt flag before wrapping the exception
+ throw new RuntimeCamelException(e);
+ } catch (ExecutionException e) {
throw new RuntimeCamelException(e);
}
}
@@ -451,109 +463,4 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
public void clear() {
broadcastAction(null, CacheAction.clear);
}
-
- @ManagedOperation(description = "Number of times duplicate messages have been detected")
- public boolean isPollerRunning() {
- return topicPoller.isRunning();
- }
-
- public boolean isCacheReady() {
- return cacheReadyLatch.getCount() == 0;
- }
-
- private class TopicPoller implements Runnable {
-
- private final Logger log = LoggerFactory.getLogger(this.getClass());
- private final Consumer<String, String> consumer;
- private final CountDownLatch cacheReadyLatch;
- private final int pollDurationMs;
-
- private final CountDownLatch shutdownLatch = new CountDownLatch(1);
- private final AtomicBoolean running = new AtomicBoolean(true);
-
- TopicPoller(Consumer<String, String> consumer, CountDownLatch cacheReadyLatch, int pollDurationMs) {
- this.consumer = consumer;
- this.cacheReadyLatch = cacheReadyLatch;
- this.pollDurationMs = pollDurationMs;
- }
-
- @Override
- public void run() {
- log.debug("Getting partitions of topic {}", topic);
- List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
- Collection<TopicPartition> partitions = partitionInfos.stream()
- .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
- .collect(Collectors.toUnmodifiableList());
-
- log.debug("Assigning consumer to partitions {}", partitions);
- consumer.assign(partitions);
-
- log.debug("Seeking consumer to beginning of partitions {}", partitions);
- consumer.seekToBeginning(partitions);
-
- POLL_LOOP: while (running.get()) {
- log.trace("Polling");
- ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(pollDurationMs));
- if (consumerRecords.isEmpty()) {
- // the first time this happens, we can assume that we have
- // consumed all
- // messages up to this point
- log.trace("0 messages fetched on poll");
- if (cacheReadyLatch.getCount() > 0) {
- log.debug("Cache warmed up");
- cacheReadyLatch.countDown();
- }
- }
- for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
- CacheAction action;
- try {
- action = CacheAction.valueOf(consumerRecord.value());
- } catch (IllegalArgumentException iax) {
- log.error(
- "Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.",
- consumerRecord.key(), consumerRecord.topic(),
- consumerRecord.partition(), consumerRecord.offset());
- setRunning(false);
- continue POLL_LOOP;
- }
- String messageId = consumerRecord.key();
- if (action == CacheAction.add) {
- log.debug("Adding to cache messageId:{}", messageId);
- cache.put(messageId, messageId);
- } else if (action == CacheAction.remove) {
- log.debug("Removing from cache messageId:{}", messageId);
- cache.remove(messageId);
- } else if (action == CacheAction.clear) {
- cache.clear();
- } else {
- // this should never happen
- log.warn("No idea how to {} a record. Shutting down.", action);
- setRunning(false);
- continue POLL_LOOP;
- }
- }
- consumer.commitSync();
- }
- log.debug("TopicPoller finished - triggering shutdown latch");
- shutdownLatch.countDown();
- }
-
- CountDownLatch getShutdownLatch() {
- return shutdownLatch;
- }
-
- void setRunning(boolean running) {
- this.running.set(running);
- }
-
- boolean isRunning() {
- return running.get();
- }
-
- @Override
- public String toString() {
- return "TopicPoller[" + topic + "]";
- }
- }
-
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtilsTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtilsTest.java
new file mode 100644
index 00000000000..a2d95ac46e1
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaConsumerUtilsTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.idempotent.kafka;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.processor.idempotent.kafka.KafkaConsumerUtil.isReachedOffsets;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link KafkaConsumerUtil#isReachedOffsets}, run against a mocked consumer that is assigned to two
+ * partitions of the same topic.
+ */
+public class KafkaConsumerUtilsTest {
+
+    private static final TopicPartition PARTITION_0 = new TopicPartition("topic1", 0);
+    private static final TopicPartition PARTITION_1 = new TopicPartition("topic1", 1);
+
+    /** Target offsets specified for both assigned partitions. */
+    private static final Map<TopicPartition, Long> TARGETS_FOR_BOTH = Map.of(
+            PARTITION_0, 10L,
+            PARTITION_1, 100L);
+
+    /** Target offset specified for the first assigned partition only. */
+    private static final Map<TopicPartition, Long> TARGET_FOR_FIRST_ONLY = Map.of(
+            PARTITION_0, 10L);
+
+    @Test
+    public void testNotReachedOffsets() {
+        Consumer<String, String> consumer = consumerAtPositions(9L, 99L);
+
+        assertFalse(isReachedOffsets(consumer, TARGETS_FOR_BOTH));
+    }
+
+    @Test
+    public void testReachedOffsets() {
+        Consumer<String, String> consumer = consumerAtPositions(10L, 100L);
+
+        assertTrue(isReachedOffsets(consumer, TARGETS_FOR_BOTH));
+    }
+
+    @Test
+    public void testOverrunOffsets() {
+        Consumer<String, String> consumer = consumerAtPositions(11L, 101L);
+
+        assertTrue(isReachedOffsets(consumer, TARGETS_FOR_BOTH));
+    }
+
+    @Test
+    public void testReachedOffsetsForSomePartitions() {
+        Consumer<String, String> consumer = consumerAtPositions(10L, 99L);
+
+        assertFalse(isReachedOffsets(consumer, TARGETS_FOR_BOTH));
+    }
+
+    @Test
+    public void testNotReachedOffsetsSomeTargetOffsetsUnspecified() {
+        Consumer<String, String> consumer = consumerAtPositions(9L, 99L);
+
+        assertFalse(isReachedOffsets(consumer, TARGET_FOR_FIRST_ONLY));
+    }
+
+    @Test
+    public void testReachedOffsetsSomeTargetOffsetsUnspecified() {
+        Consumer<String, String> consumer = consumerAtPositions(10L, 99L);
+
+        assertTrue(isReachedOffsets(consumer, TARGET_FOR_FIRST_ONLY));
+    }
+
+    @Test
+    public void testTargetOffsetsEmpty() {
+        Consumer<String, String> consumer = mock(Consumer.class);
+        Map<TopicPartition, Long> noTargets = Collections.emptyMap();
+
+        assertThrows(IllegalArgumentException.class, () -> isReachedOffsets(consumer, noTargets));
+    }
+
+    /**
+     * Builds a mocked consumer assigned to both test partitions and reporting the given positions for them.
+     */
+    private static Consumer<String, String> consumerAtPositions(long positionOfPartition0, long positionOfPartition1) {
+        Consumer<String, String> consumer = mock(Consumer.class);
+        when(consumer.assignment()).thenReturn(Set.of(PARTITION_0, PARTITION_1));
+        when(consumer.position(PARTITION_0)).thenReturn(positionOfPartition0);
+        when(consumer.position(PARTITION_1)).thenReturn(positionOfPartition1);
+        return consumer;
+    }
+
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 4ad1a0f57b7..cfb5ddd6006 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -89,7 +89,6 @@ public class KafkaIdempotentRepositoryPersistenceIT extends BaseEmbeddedKafkaTes
@Test
@DisplayName("Checks that half of the messages pass and duplicates are blocked")
public void testFirstPassFiltersAsExpected() {
- await().until(() -> kafkaIdempotentRepository.isCacheReady());
int count = 10;
sendMessages(count);