You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/07 21:35:57 UTC

[camel] branch main updated: CAMEL-18688: add full resume support for Kafka

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 59046fe140f CAMEL-18688: add full resume support for Kafka
59046fe140f is described below

commit 59046fe140ff0278dd5a243a4b61e4220b3eee20
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Nov 4 16:07:40 2022 +0100

    CAMEL-18688: add full resume support for Kafka
---
 .../org/apache/camel/kafka-adapter-factory         |   2 +
 .../camel/component/kafka/KafkaConsumer.java       |   5 +
 .../camel/component/kafka/KafkaFetchRecords.java   |  19 +-
 .../consumer/support/ResumeStrategyFactory.java    |  92 --------
 .../support/classic/AssignmentAdapterHelper.java   |  49 +++++
 .../ClassicRebalanceListener.java}                 |  31 +--
 .../classic/NoOpPartitionAssignmentAdapter.java    |  38 ++++
 .../OffsetPartitionAssignmentAdapter.java}         |  16 +-
 .../PartitionAssignmentAdapter.java}               |  13 +-
 .../SeekPolicyPartitionAssignmentAdapter.java}     |  15 +-
 .../consumer/support/resume/KafkaResumable.java    |  62 ++++++
 .../support/resume/KafkaResumeAdapter.java         | 113 ++++++++++
 .../ResumeRebalanceListener.java}                  |  32 ++-
 .../kafka/SingleNodeKafkaResumeStrategy.java       |   5 +-
 ...KafkaConsumerAutoInstResumeRouteStrategyIT.java | 113 ++++++++++
 .../KafkaConsumerWithResumeRouteStrategyIT.java    | 239 ---------------------
 .../processor/resume/TransientResumeStrategy.java  | 113 +++++-----
 17 files changed, 493 insertions(+), 464 deletions(-)

diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-adapter-factory b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-adapter-factory
new file mode 100644
index 00000000000..10d19f83eab
--- /dev/null
+++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-adapter-factory
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.kafka.consumer.support.resume.KafkaResumeAdapter
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index e1e29660b5c..dad29c73ad5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -238,4 +238,9 @@ public class KafkaConsumer extends DefaultConsumer
     public List<TaskHealthState> healthStates() {
         return tasks.stream().map(t -> t.healthState()).collect(Collectors.toList());
     }
+
+    @Override
+    public String adapterFactoryService() {
+        return "kafka-adapter-factory";
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index f94e3e118a5..b31a3898b5a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -27,11 +27,10 @@ import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManagers;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter;
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
-import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory;
+import org.apache.camel.component.kafka.consumer.support.classic.ClassicRebalanceListener;
+import org.apache.camel.component.kafka.consumer.support.resume.ResumeRebalanceListener;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.support.task.ForegroundTask;
 import org.apache.camel.support.task.Tasks;
@@ -40,6 +39,7 @@ import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
@@ -272,11 +272,16 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     private void subscribe() {
-        KafkaConsumerResumeAdapter resumeStrategy = ResumeStrategyFactory.resolveResumeAdapter(kafkaConsumer);
-        resumeStrategy.setConsumer(consumer);
+        ConsumerRebalanceListener listener;
 
-        PartitionAssignmentListener listener = new PartitionAssignmentListener(
-                threadId, kafkaConsumer.getEndpoint().getConfiguration(), commitManager, resumeStrategy);
+        if (kafkaConsumer.getResumeStrategy() == null) {
+            listener = new ClassicRebalanceListener(
+                    threadId, kafkaConsumer.getEndpoint().getConfiguration(), commitManager, consumer);
+        } else {
+            listener = new ResumeRebalanceListener(
+                    threadId, kafkaConsumer.getEndpoint().getConfiguration(),
+                    commitManager, consumer, kafkaConsumer.getResumeStrategy());
+        }
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
deleted file mode 100644
index 77315f2e85b..00000000000
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka.consumer.support;
-
-import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.component.kafka.KafkaConsumer;
-import org.apache.camel.component.kafka.SeekPolicy;
-import org.apache.camel.resume.ResumeStrategy;
-import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class ResumeStrategyFactory {
-
-    /**
-     * A NO-OP resume strategy that does nothing (i.e.: no resume)
-     */
-    private static class NoOpKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
-
-        @SuppressWarnings("unused")
-        @Override
-        public void setConsumer(Consumer<?, ?> consumer) {
-            // NO-OP
-        }
-
-        @Override
-        public void setKafkaResumable(KafkaResumable kafkaResumable) {
-            // NO-OP
-        }
-
-        @SuppressWarnings("unused")
-        @Override
-        public void resume() {
-
-        }
-    }
-
-    private static final NoOpKafkaConsumerResumeAdapter NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeAdapter();
-    private static final Logger LOG = LoggerFactory.getLogger(ResumeStrategyFactory.class);
-
-    private ResumeStrategyFactory() {
-    }
-
-    public static KafkaConsumerResumeAdapter resolveResumeAdapter(KafkaConsumer kafkaConsumer) {
-        // When using resumable routes, which register the strategy via service, it takes priority over everything else
-        ResumeStrategy resumeStrategy = kafkaConsumer.getResumeStrategy();
-        if (resumeStrategy != null) {
-            KafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(KafkaConsumerResumeAdapter.class);
-
-            // The strategy should not be able to be created without an adapter, but let's be safe
-            assert adapter != null;
-
-            return adapter;
-        }
-
-        KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration();
-
-        return resolveBuiltinResumeAdapters(configuration);
-    }
-
-    private static KafkaConsumerResumeAdapter resolveBuiltinResumeAdapters(KafkaConfiguration configuration) {
-        LOG.debug("No resume strategy was provided ... checking for built-ins ...");
-        StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
-        SeekPolicy seekTo = configuration.getSeekTo();
-
-        if (offsetRepository != null) {
-            LOG.info("Using resume from offset strategy");
-            return new OffsetKafkaConsumerResumeAdapter(offsetRepository);
-        } else if (seekTo != null) {
-            LOG.info("Using resume from seek policy strategy with seeking from {}", seekTo);
-            return new SeekPolicyKafkaConsumerResumeAdapter(seekTo);
-        }
-
-        LOG.info("Using NO-OP resume strategy");
-        return NO_OP_RESUME_STRATEGY;
-    }
-}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/AssignmentAdapterHelper.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/AssignmentAdapterHelper.java
new file mode 100644
index 00000000000..082e53bdfef
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/AssignmentAdapterHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.consumer.support.classic;
+
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.SeekPolicy;
+import org.apache.camel.spi.StateRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class AssignmentAdapterHelper {
+
+    private static final NoOpPartitionAssignmentAdapter NO_OP_ASSIGNMENT_ADAPTER = new NoOpPartitionAssignmentAdapter();
+    private static final Logger LOG = LoggerFactory.getLogger(AssignmentAdapterHelper.class);
+
+    private AssignmentAdapterHelper() {
+    }
+
+    public static PartitionAssignmentAdapter resolveBuiltinResumeAdapters(KafkaConfiguration configuration) {
+        LOG.debug("No resume strategy was provided ... checking for built-ins ...");
+        StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
+        SeekPolicy seekTo = configuration.getSeekTo();
+
+        if (offsetRepository != null) {
+            LOG.info("Using resume from offset strategy");
+            return new OffsetPartitionAssignmentAdapter(offsetRepository);
+        } else if (seekTo != null) {
+            LOG.info("Using resume from seek policy strategy with seeking from {}", seekTo);
+            return new SeekPolicyPartitionAssignmentAdapter(seekTo);
+        }
+
+        LOG.info("Using NO-OP resume strategy");
+        return NO_OP_ASSIGNMENT_ADAPTER;
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
similarity index 67%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
index 03046f7a10b..ba11d1a7657 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
@@ -15,34 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.classic;
 
 import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.consumer.CommitManager;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PartitionAssignmentListener implements ConsumerRebalanceListener {
-    private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class);
+public class ClassicRebalanceListener implements ConsumerRebalanceListener {
+    private static final Logger LOG = LoggerFactory.getLogger(ClassicRebalanceListener.class);
 
     private final String threadId;
     private final KafkaConfiguration configuration;
-    private final KafkaConsumerResumeAdapter resumeStrategy;
+    private final PartitionAssignmentAdapter assignmentAdapter;
     private final CommitManager commitManager;
 
-    public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
-                                       CommitManager commitManager,
-                                       KafkaConsumerResumeAdapter resumeStrategy) {
+    public ClassicRebalanceListener(String threadId, KafkaConfiguration configuration,
+                                    CommitManager commitManager, Consumer<?, ?> consumer) {
         this.threadId = threadId;
         this.configuration = configuration;
         this.commitManager = commitManager;
-        this.resumeStrategy = resumeStrategy;
+
+        assignmentAdapter = AssignmentAdapterHelper.resolveBuiltinResumeAdapters(configuration);
+        assignmentAdapter.setConsumer(consumer);
     }
 
     @Override
@@ -59,19 +59,10 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-
         if (LOG.isDebugEnabled()) {
             partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from {}", threadId, p.topic()));
-
         }
-        List<KafkaResumable> resumables = partitions.stream()
-                .map(p -> new KafkaResumable(String.valueOf(p.partition()), p.topic())).collect(Collectors.toList());
-
-        resumables.forEach(this::doResume);
-    }
 
-    private void doResume(KafkaResumable r) {
-        resumeStrategy.setKafkaResumable(r);
-        resumeStrategy.resume();
+        assignmentAdapter.handlePartitionAssignment();
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/NoOpPartitionAssignmentAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/NoOpPartitionAssignmentAdapter.java
new file mode 100644
index 00000000000..b97f7689e82
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/NoOpPartitionAssignmentAdapter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support.classic;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+/**
+ * A NO-OP partition assignment adapter that does nothing (i.e.: no seeking on partition assignment)
+ */
+public class NoOpPartitionAssignmentAdapter implements PartitionAssignmentAdapter {
+
+    @SuppressWarnings("unused")
+    @Override
+    public void setConsumer(Consumer<?, ?> consumer) {
+        // NO-OP
+    }
+
+    @SuppressWarnings("unused")
+    @Override
+    public void handlePartitionAssignment() {
+
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
similarity index 86%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
index 3ef4f209c43..4227ad7f004 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.classic;
 
 import java.util.Set;
 
@@ -30,14 +30,14 @@ import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
 /**
  * A resume strategy that uses Kafka's offset for resuming
  */
-public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
+public class OffsetPartitionAssignmentAdapter implements PartitionAssignmentAdapter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeAdapter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(OffsetPartitionAssignmentAdapter.class);
 
     private final StateRepository<String, String> offsetRepository;
     private Consumer<?, ?> consumer;
 
-    public OffsetKafkaConsumerResumeAdapter(StateRepository<String, String> offsetRepository) {
+    public OffsetPartitionAssignmentAdapter(StateRepository<String, String> offsetRepository) {
         this.offsetRepository = offsetRepository;
     }
 
@@ -46,11 +46,6 @@ public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdap
         this.consumer = consumer;
     }
 
-    @Override
-    public void setKafkaResumable(KafkaResumable kafkaResumable) {
-        // NO-OP
-    }
-
     private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
         // The state contains the last read offset, so you need to seek from the next one
         long offset = deserializeOffsetValue(offsetState) + 1;
@@ -58,8 +53,7 @@ public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdap
         consumer.seek(topicPartition, offset);
     }
 
-    @Override
-    public void resume() {
+    public void handlePartitionAssignment() {
         Set<TopicPartition> assignments = consumer.assignment();
         for (TopicPartition topicPartition : assignments) {
             String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
similarity index 75%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
index d25c05662cb..3c6a92d2542 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
@@ -14,9 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.classic;
 
-import org.apache.camel.resume.ResumeAdapter;
 import org.apache.kafka.clients.consumer.Consumer;
 
 /**
@@ -27,7 +26,7 @@ import org.apache.kafka.clients.consumer.Consumer;
  * the component is set up to use more than one of them. As such, implementations are responsible for ensuring the
  * thread-safety of the operations within the resume method.
  */
-public interface KafkaConsumerResumeAdapter extends ResumeAdapter {
+public interface PartitionAssignmentAdapter {
 
     /**
      * Sets the Kafka consumer instance for the adapter. Please note that the Kafka consumer is not safe for concurrent
@@ -37,11 +36,5 @@ public interface KafkaConsumerResumeAdapter extends ResumeAdapter {
      */
     void setConsumer(Consumer<?, ?> consumer);
 
-    /**
-     * Sets an optional resumable instance for the adapter. This is usually set during partition assignment. Garanteed
-     * not to be null and safe to ignore if partition and topic information are not used.
-     * 
-     * @param kafkaResumable the resumable instance
-     */
-    void setKafkaResumable(KafkaResumable kafkaResumable);
+    void handlePartitionAssignment();
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/SeekPolicyPartitionAssignmentAdapter.java
similarity index 81%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/SeekPolicyPartitionAssignmentAdapter.java
index 0dee4e1b849..b97839199d6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/SeekPolicyPartitionAssignmentAdapter.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.classic;
 
 import org.apache.camel.component.kafka.SeekPolicy;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -24,14 +24,14 @@ import org.slf4j.LoggerFactory;
 /**
  * A resume strategy that uses Camel's seekTo configuration for resuming
  */
-public class SeekPolicyKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
+public class SeekPolicyPartitionAssignmentAdapter implements PartitionAssignmentAdapter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeAdapter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyPartitionAssignmentAdapter.class);
 
     private final SeekPolicy seekPolicy;
     private Consumer<?, ?> consumer;
 
-    public SeekPolicyKafkaConsumerResumeAdapter(SeekPolicy seekPolicy) {
+    public SeekPolicyPartitionAssignmentAdapter(SeekPolicy seekPolicy) {
         this.seekPolicy = seekPolicy;
     }
 
@@ -41,12 +41,7 @@ public class SeekPolicyKafkaConsumerResumeAdapter implements KafkaConsumerResume
     }
 
     @Override
-    public void setKafkaResumable(KafkaResumable kafkaResumable) {
-        // NO-OP
-    }
-
-    @Override
-    public void resume() {
+    public void handlePartitionAssignment() {
         if (seekPolicy == SeekPolicy.BEGINNING) {
             LOG.debug("Seeking from the beginning of topic");
             consumer.seekToBeginning(consumer.assignment());
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumable.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumable.java
new file mode 100644
index 00000000000..bcbcf578d20
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support.resume;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.support.resume.OffsetKeys;
+import org.apache.camel.support.resume.Offsets;
+
+public final class KafkaResumable implements Resumable {
+    private final String addressable;
+    private final Long offset;
+
+    private KafkaResumable(String addressable, Long offset) {
+        this.addressable = addressable;
+        this.offset = offset;
+    }
+
+    @Override
+    public OffsetKey<?> getOffsetKey() {
+        return OffsetKeys.unmodifiableOf(addressable);
+    }
+
+    @Override
+    public Offset<?> getLastOffset() {
+        return Offsets.of(offset);
+    }
+
+    /**
+     * Creates a new resumable for Kafka
+     * 
+     * @param  exchange the exchange to create the resumable from
+     * @return          a new KafkaResumable instance with the data from the exchange
+     */
+    public static KafkaResumable of(Exchange exchange) {
+        String topic = exchange.getMessage().getHeader(KafkaConstants.TOPIC, String.class);
+        Integer partition = exchange.getMessage().getHeader(KafkaConstants.PARTITION, Integer.class);
+        Long offset = exchange.getMessage().getHeader(KafkaConstants.OFFSET, Long.class);
+
+        String topicPartition = topic + "/" + partition;
+
+        return new KafkaResumable(topicPartition, offset);
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java
new file mode 100644
index 00000000000..641e5b5af06
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support.resume;
+
+import java.nio.ByteBuffer;
+
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.spi.annotations.JdkService;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka resume adapter: caches an offset per {@link TopicPartition} and seeks the consumer to it on {@link #resume()}.
+ */
+@JdkService("kafka-adapter-factory")
+public class KafkaResumeAdapter implements ResumeAdapter, Deserializable, Cacheable {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeAdapter.class);
+
+    private Consumer<?, ?> consumer;
+    private ResumeCache<TopicPartition> resumeCache;
+
+    private boolean resume(TopicPartition topicPartition, Object value) {
+        consumer.seek(topicPartition, (Long) value);
+        return true;
+    }
+
+    @Override
+    public void resume() {
+        resumeCache.forEach(this::resume);
+    }
+
+    /**
+     * Rebuilds one cache entry from a record whose key is a "topic/partition" string and whose value is a Long offset;
+     * anything else is logged and discarded. Always returns {@code false}, even after a successful add (TODO confirm).
+     */
+    @Override
+    public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
+        Object keyObj = deserializeKey(keyBuffer);
+        Object valueObj = deserializeValue(valueBuffer);
+
+        if (keyObj instanceof String) {
+            String key = (String) keyObj;
+            final String[] keyParts = key.split("/");
+            // A well-formed key has exactly two parts, topic and partition; String.split never returns null
+            if (keyParts.length == 2) {
+                String topic = keyParts[0];
+                int partition = Integer.parseInt(keyParts[1]);
+                if (valueObj instanceof Long) {
+                    resumeCache.add(new TopicPartition(topic, partition), (Long) valueObj);
+                } else {
+                    LOG.warn("The type for the key '{}' is invalid: {}", key, valueObj);
+                }
+            } else {
+                LOG.warn("Unable to deserialize key '{}' because it has in invalid format and it will be discarded",
+                        key);
+            }
+        } else {
+            LOG.warn("Unable to deserialize key '{}' because its type is invalid", keyObj);
+        }
+
+        return false;
+    }
+
+    // NOTE(review): non-TopicPartition keys are silently skipped while true is still returned - confirm intended
+    @Override
+    public boolean add(OffsetKey<?> key, Offset<?> offset) {
+        Object keyObj = key.getValue();
+        Long valueObject = offset.getValue(Long.class);
+
+        if (keyObj instanceof TopicPartition) {
+            resumeCache.add((TopicPartition) keyObj, valueObject);
+        }
+
+        return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setCache(ResumeCache<?> cache) {
+        this.resumeCache = (ResumeCache<TopicPartition>) cache;
+    }
+
+    @Override
+    public ResumeCache<?> getCache() {
+        return resumeCache;
+    }
+
+    public void setConsumer(Consumer<?, ?> consumer) {
+        this.consumer = consumer;
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
similarity index 67%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
index 03046f7a10b..a7a47b72fa1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
@@ -15,34 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.resume;
 
 import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
 
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.consumer.CommitManager;
+import org.apache.camel.component.kafka.consumer.support.classic.ClassicRebalanceListener;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PartitionAssignmentListener implements ConsumerRebalanceListener {
-    private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class);
+public class ResumeRebalanceListener implements ConsumerRebalanceListener {
+    private static final Logger LOG = LoggerFactory.getLogger(ResumeRebalanceListener.class);
 
     private final String threadId;
     private final KafkaConfiguration configuration;
-    private final KafkaConsumerResumeAdapter resumeStrategy;
     private final CommitManager commitManager;
+    private final KafkaResumeAdapter resumeAdapter;
 
-    public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
-                                       CommitManager commitManager,
-                                       KafkaConsumerResumeAdapter resumeStrategy) {
+    public ResumeRebalanceListener(String threadId, KafkaConfiguration configuration,
+                                   CommitManager commitManager, Consumer<?, ?> consumer, ResumeStrategy resumeStrategy) {
         this.threadId = threadId;
         this.configuration = configuration;
         this.commitManager = commitManager;
-        this.resumeStrategy = resumeStrategy;
+
+        resumeAdapter = resumeStrategy.getAdapter(KafkaResumeAdapter.class);
+        resumeAdapter.setConsumer(consumer);
     }
 
     @Override
@@ -59,19 +61,11 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-
         if (LOG.isDebugEnabled()) {
             partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from {}", threadId, p.topic()));
-
         }
-        List<KafkaResumable> resumables = partitions.stream()
-                .map(p -> new KafkaResumable(String.valueOf(p.partition()), p.topic())).collect(Collectors.toList());
 
-        resumables.forEach(this::doResume);
+        resumeAdapter.resume();
     }
 
-    private void doResume(KafkaResumable r) {
-        resumeStrategy.setKafkaResumable(r);
-        resumeStrategy.resume();
-    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index c3863b6508e..32ee0a4af97 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -346,7 +346,10 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
 
     @Override
     public ResumeAdapter getAdapter() {
-        waitForInitialization();
+        if (adapter == null) {
+            waitForInitialization();
+        }
+
         return adapter;
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
new file mode 100644
index 00000000000..17a69566ab2
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.consumer.support.resume.KafkaResumable;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that a resumable Kafka route configured through the builder publishes offset updates to the offsets topic.
+ */
+public class KafkaConsumerAutoInstResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAutoInstResumeRouteStrategyIT.class);
+    private static final String TOPIC = "resumable-route-auto";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    public static KafkaResumeStrategyConfigurationBuilder getDefaultKafkaResumeStrategyConfigurationBuilder() {
+        return KafkaResumeStrategyConfigurationBuilder.newBuilder()
+                .withBootstrapServers(service.getBootstrapServers())
+                .withTopic("resumable-route-auto-offsets")
+                .withResumeCache(TransientResumeStrategy.createSimpleCache())
+                .withProducerProperty("max.block.ms", "10000")
+                .withMaxInitializationDuration(Duration.ofSeconds(5))
+                .withProducerProperty("delivery.timeout.ms", "30000")
+                .withProducerProperty("session.timeout.ms", "15000")
+                .withProducerProperty("request.timeout.ms", "15000")
+                .withConsumerProperty("session.timeout.ms", "20000");
+    }
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+        // close() waits for the pending sends to complete and releases the producer's resources
+        try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
+            for (int i = 0; i < 10; i++) {
+                producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i)));
+            }
+        }
+    }
+
+    @Test
+    @Timeout(value = 30)
+    public void testOffsetIsBeingChecked() throws InterruptedException {
+        // mock:result is fed by the offsets topic route; presumably one offset update per consumed record
+        MockEndpoint mock = getMockEndpoint("mock:result");
+
+        mock.expectedMessageCount(10);
+        mock.assertIsSatisfied();
+    }
+
+    @AfterEach
+    public void after() {
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    // Stores a KafkaResumable built from the Kafka headers under the Exchange.OFFSET header
+    public void process(Exchange exchange) {
+        exchange.getMessage().setHeader(Exchange.OFFSET, KafkaResumable.of(exchange));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                fromF("kafka:%s?groupId=%s_GROUP&autoCommitIntervalMs=1000"
+                      + "&autoOffsetReset=earliest&consumersCount=1", TOPIC, TOPIC)
+                              .resumable().configuration(getDefaultKafkaResumeStrategyConfigurationBuilder())
+                              .process(e -> process(e))
+                              .routeId("resume-strategy-auto-route")
+                              // Note: this is for manually testing the ResumableCompletion onFailure exception logging. Uncomment it for testing it
+                              // .process(e -> e.setException(new RuntimeCamelException("Mock error in test")))
+                              .to("mock:sentMessages");
+
+                fromF("kafka:%s?groupId=%s_GROUP&autoCommitIntervalMs=1000", "resumable-route-auto-offsets",
+                        "resumable-route-auto-offsets")
+                                .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
deleted file mode 100644
index c6d903cd546..00000000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka.integration;
-
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.BindToRegistry;
-import org.apache.camel.EndpointInject;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter;
-import org.apache.camel.component.kafka.consumer.support.KafkaResumable;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.resume.TransientResumeStrategy;
-import org.apache.camel.resume.Offset;
-import org.apache.camel.resume.OffsetKey;
-import org.apache.camel.resume.Resumable;
-import org.apache.camel.resume.ResumeAdapter;
-import org.apache.camel.support.resume.Resumables;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerWithResumeRouteStrategyIT.class);
-    private static final String TOPIC = "resumable-route-tp";
-    private static final int RANDOM_VALUE = ThreadLocalRandom.current().nextInt(1, 1000);
-
-    @EndpointInject("mock:result")
-    private MockEndpoint result;
-
-    @BindToRegistry("resumeStrategy")
-    private TestUpdateStrategy resumeStrategy;
-    private CountDownLatch messagesLatch;
-
-    private static class TestUpdateStrategy extends TransientResumeStrategy {
-        private final CountDownLatch messagesLatch;
-        private boolean startCalled;
-        private boolean offsetNull = true;
-        private boolean offsetAddressableNull = true;
-        private boolean offsetAddressableEmpty = true;
-        private boolean offsetValueNull = true;
-        private boolean offsetValueEmpty = true;
-        private int lastOffset;
-
-        public TestUpdateStrategy(ResumeAdapter resumeAdapter, CountDownLatch messagesLatch) {
-            super(resumeAdapter);
-
-            this.messagesLatch = messagesLatch;
-        }
-
-        @Override
-        public void start() {
-            LOG.warn("Start was called");
-            startCalled = true;
-        }
-
-        @Override
-        public void init() {
-            LOG.warn("Init was called");
-        }
-
-        @Override
-        public void updateLastOffset(Resumable offset) {
-            try {
-                if (offset != null) {
-                    offsetNull = false;
-
-                    OffsetKey<?> addressable = offset.getOffsetKey();
-                    if (addressable != null) {
-                        offsetAddressableNull = false;
-                        offsetAddressableEmpty = addressable.getValue() == null;
-
-                    }
-
-                    Offset<?> offsetValue = offset.getLastOffset();
-                    if (offsetValue != null) {
-                        offsetValueNull = false;
-
-                        if (offsetValue.getValue() != null) {
-                            offsetValueEmpty = false;
-                            lastOffset = (int) offsetValue.getValue();
-                        }
-                    }
-                }
-            } finally {
-                messagesLatch.countDown();
-            }
-        }
-
-        public boolean isOffsetNull() {
-            return offsetNull;
-        }
-
-        public boolean isOffsetAddressableNull() {
-            return offsetAddressableNull;
-        }
-
-        public boolean isOffsetValueNull() {
-            return offsetValueNull;
-        }
-
-        public boolean isOffsetAddressableEmpty() {
-            return offsetAddressableEmpty;
-        }
-
-        public boolean isOffsetValueEmpty() {
-            return offsetValueEmpty;
-        }
-
-        public boolean isStartCalled() {
-            return startCalled;
-        }
-    }
-
-    private static class TestKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
-        private boolean resumeCalled;
-        private boolean consumerIsNull = true;
-
-        @Override
-        public void setConsumer(Consumer<?, ?> consumer) {
-            if (consumer != null) {
-                consumerIsNull = false;
-            }
-        }
-
-        @Override
-        public void setKafkaResumable(KafkaResumable kafkaResumable) {
-
-        }
-
-        @Override
-        public void resume() {
-            resumeCalled = true;
-        }
-
-        public boolean isResumeCalled() {
-            return resumeCalled;
-        }
-
-        public boolean isConsumerIsNull() {
-            return consumerIsNull;
-        }
-    }
-
-    @BeforeEach
-    public void before() {
-        Properties props = getDefaultProperties();
-        KafkaProducer<Object, Object> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
-
-        for (int i = 0; i < 10; i++) {
-            producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i)));
-        }
-    }
-
-    @Override
-    protected void doPreSetup() throws Exception {
-        super.doPreSetup();
-
-        messagesLatch = new CountDownLatch(1);
-        resumeStrategy = new TestUpdateStrategy(new TestKafkaConsumerResumeAdapter(), messagesLatch);
-    }
-
-    @Test
-    //    @Timeout(value = 30)
-    public void testOffsetIsBeingChecked() throws InterruptedException {
-        assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was not called");
-
-        final TestKafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(TestKafkaConsumerResumeAdapter.class);
-        assertNotNull(adapter, "The adapter should not be null");
-        assertTrue(adapter.isResumeCalled(),
-                "The resume strategy should have been called when the partition was assigned");
-        assertFalse(adapter.isConsumerIsNull(),
-                "The consumer passed to the strategy should not be null");
-        assertTrue(resumeStrategy.isStartCalled(),
-                "The resume strategy should have been started");
-        assertFalse(resumeStrategy.isOffsetNull(),
-                "The offset should not be null");
-        assertFalse(resumeStrategy.isOffsetAddressableNull(),
-                "The offset addressable should not be null");
-        assertFalse(resumeStrategy.isOffsetAddressableEmpty(),
-                "The offset addressable should not be empty");
-        assertFalse(resumeStrategy.isOffsetValueNull(),
-                "The offset value should not be null");
-        assertFalse(resumeStrategy.isOffsetValueEmpty(),
-                "The offset value should not be empty");
-        assertEquals(RANDOM_VALUE, resumeStrategy.lastOffset, "the offsets don't match");
-    }
-
-    @AfterEach
-    public void after() {
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() {
-        return new RouteBuilder() {
-            @Override
-            public void configure() {
-                from("kafka:" + TOPIC + "?groupId=" + TOPIC + "_GROUP&autoCommitIntervalMs=1000"
-                     + "&autoOffsetReset=earliest&consumersCount=1")
-                             .resumable().resumeStrategy("resumeStrategy", "DEBUG")
-                             .routeId("resume-strategy-route")
-                             .setHeader(Exchange.OFFSET, constant(Resumables.of("key", RANDOM_VALUE)))
-                             // Note: this is for manually testing the ResumableCompletion onFailure exception logging. Uncomment it for testing it
-                             // .process(e -> e.setException(new RuntimeCamelException("Mock error in test")))
-                             .to("mock:result");
-            }
-        };
-    }
-}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 0cfe5d86703..89ca17daf87 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -119,61 +119,7 @@ public class TransientResumeStrategy implements ResumeStrategy {
 
                     @Override
                     public ResumeCache<?> getResumeCache() {
-                        return new ResumeCache<>() {
-                            private Map<Object, Object> cache = new HashMap<>();
-
-                            @Override
-                            public Object computeIfAbsent(Object key, Function<? super Object, ? super Object> mapping) {
-                                return cache.computeIfAbsent(key, mapping);
-                            }
-
-                            @Override
-                            public Object computeIfPresent(
-                                    Object key, BiFunction<? super Object, ? super Object, ? super Object> remapping) {
-                                return cache.computeIfPresent(key, remapping);
-                            }
-
-                            @Override
-                            public boolean contains(Object key, Object entry) {
-                                return Objects.equals(cache.get(key), entry);
-                            }
-
-                            @Override
-                            public void add(Object key, Object offsetValue) {
-                                cache.put(key, offsetValue);
-                            }
-
-                            @Override
-                            public boolean isFull() {
-                                return false;
-                            }
-
-                            @Override
-                            public long capacity() {
-                                return Integer.MAX_VALUE;
-                            }
-
-                            @Override
-                            public <T> T get(Object key, Class<T> clazz) {
-                                final Object o = cache.get(key);
-
-                                return clazz.cast(o);
-                            }
-
-                            @Override
-                            public Object get(Object key) {
-                                return cache.get(key);
-                            }
-
-                            @Override
-                            public void forEach(BiFunction<? super Object, ? super Object, Boolean> action) {
-                                for (Map.Entry e : cache.entrySet()) {
-                                    if (!action.apply(e.getKey(), e.getValue())) {
-                                        cache.remove(e.getKey());
-                                    }
-                                }
-                            }
-                        };
+                        return createSimpleCache();
                     }
 
                     @Override
@@ -184,4 +130,61 @@ public class TransientResumeStrategy implements ResumeStrategy {
             }
         };
     }
+
+    /**
+     * Creates a simple resume cache backed by a plain {@link HashMap}; it never reports itself as full.
+     *
+     * @return a new cache instance; it performs no synchronization of its own
+     */
+    public static ResumeCache<Object> createSimpleCache() {
+        return new ResumeCache<>() {
+            private final Map<Object, Object> cache = new HashMap<>();
+
+            @Override
+            public Object computeIfAbsent(Object key, Function<? super Object, ? super Object> mapping) {
+                return cache.computeIfAbsent(key, mapping);
+            }
+
+            @Override
+            public Object computeIfPresent(Object key, BiFunction<? super Object, ? super Object, ? super Object> remapping) {
+                return cache.computeIfPresent(key, remapping);
+            }
+
+            @Override
+            public boolean contains(Object key, Object entry) {
+                return Objects.equals(cache.get(key), entry);
+            }
+
+            @Override
+            public void add(Object key, Object offsetValue) {
+                cache.put(key, offsetValue);
+            }
+
+            @Override
+            public boolean isFull() {
+                return false;
+            }
+
+            @Override
+            public long capacity() {
+                return Integer.MAX_VALUE;
+            }
+
+            @Override
+            public <T> T get(Object key, Class<T> clazz) {
+                return clazz.cast(cache.get(key));
+            }
+
+            @Override
+            public Object get(Object key) {
+                return cache.get(key);
+            }
+
+            @Override
+            public void forEach(BiFunction<? super Object, ? super Object, Boolean> action) {
+                // Drop rejected entries via removeIf: removing inside a for-each can throw ConcurrentModificationException
+                cache.entrySet().removeIf(e -> !action.apply(e.getKey(), e.getValue()));
+            }
+        };
+    }
 }