You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/05/03 13:14:52 UTC

[camel] 02/02: CAMEL-17762: rework the resume strategy to separate concerns

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a68c2607e5fd2cb35b4d11828ba830240212a9d8
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Apr 29 16:14:39 2022 +0200

    CAMEL-17762: rework the resume strategy to separate concerns
    
    Rework the strategy so that the handling of the strategy service is
    separate from the logic that handles the component-specific resume.
    
    This brings builtin caches, improved reusable Kafka resume strategies and
    additional cleanups to the resume API interfaces, classes and
    documentation.
---
 .../org/apache/camel/catalog/models/pausable.json  |   2 +-
 .../org/apache/camel/catalog/models/resumable.json |   2 +-
 .../apache/camel/component/feed/EntryFilter.java   |  14 +-
 .../component/feed/FeedEntryPollingConsumer.java   |  24 ++-
 .../AtomEntryPollingConsumerWithResumeTest.java    |   3 +-
 .../component/aws2/kinesis/Kinesis2Consumer.java   |  26 ++-
 ...sumeStrategy.java => KinesisResumeAdapter.java} |  23 +--
 ... => KinesisUserConfigurationResumeAdapter.java} |   4 +-
 .../caffeine/resume/multi/CaffeineCache.java       |  98 +++++++++
 .../caffeine/resume/single/CaffeineCache.java      |  93 +++++++++
 .../component/cassandra/CassandraConsumer.java     |  20 +-
 ...meStrategy.java => CassandraResumeAdapter.java} |  16 +-
 .../CassandraComponentResumeStrategyIT.java        |   9 +-
 .../component/couchbase/CouchbaseConsumer.java     |  18 +-
 ...meStrategy.java => CouchbaseResumeAdapter.java} |  16 +-
 .../integration/ConsumeResumeStrategyIT.java       |  16 +-
 components/camel-couchdb/pom.xml                   |   4 +
 .../component/couchdb/CouchDbChangesetTracker.java |  15 +-
 .../camel/component/couchdb/CouchDbConsumer.java   |  12 +-
 .../couchdb/consumer/CouchDbResumable.java         |   4 +-
 ...sumeStrategy.java => CouchDbResumeAdapter.java} |  21 +-
 .../consumer/CouchDbResumeStrategyFactory.java     |   8 +-
 ...java => LatestUpdateSequenceResumeAdapter.java} |   4 +-
 .../apache/camel/component/file/FileConsumer.java  |  37 ++--
 .../apache/camel/component/file/GenericFile.java   |   2 +-
 ...rResumeStrategy.java => FileResumeAdapter.java} |   6 +-
 .../component/file/consumer/FileResumeSet.java     |   2 +-
 ...sumeStrategy.java => FileSetResumeAdapter.java} |   7 +-
 .../file/consumer/GenericFileResumable.java        |   2 +-
 ...Strategy.java => GenericFileResumeAdapter.java} |  15 +-
 .../adapters/DefaultFileSetResumeAdapter.java      |  65 ++++++
 .../adapters/DefaultGenericFileResumeAdapter.java  |  64 ++++++
 .../camel/component/kafka/KafkaConsumer.java       |  14 +-
 .../camel/component/kafka/KafkaFetchRecords.java   |   4 +-
 .../errorhandler/KafkaConsumerListener.java        |   2 +-
 ...rategy.java => KafkaConsumerResumeAdapter.java} |  33 +--
 .../kafka/consumer/support/KafkaOffset.java        |   2 +-
 .../kafka/consumer/support/KafkaResumable.java     |   4 +-
 ....java => OffsetKafkaConsumerResumeAdapter.java} |  21 +-
 .../support/PartitionAssignmentListener.java       |  11 +-
 .../consumer/support/ResumeStrategyFactory.java    |  30 ++-
 ...a => SeekPolicyKafkaConsumerResumeAdapter.java} |  20 +-
 .../resume/kafka/KafkaResumeStrategy.java          |  22 +-
 .../resume/kafka/MultiNodeKafkaResumeStrategy.java | 152 ++++++++++++++
 .../camel/processor/resume/kafka/RecordError.java} |  31 ++-
 ...egy.java => SingleNodeKafkaResumeStrategy.java} | 227 ++++++++++++++-------
 .../KafkaConsumerWithResumeRouteStrategyIT.java    |  99 ++++-----
 .../camel/component/master/MasterConsumer.java     |   4 +-
 .../src/main/java/org/apache/camel/Route.java      |   2 +
 .../camel/{ => resume}/ConsumerListener.java       |   2 +-
 .../camel/{ => resume}/ConsumerListenerAware.java  |   2 +-
 .../java/org/apache/camel/{ => resume}/Offset.java |   2 +-
 .../org/apache/camel/{ => resume}/Resumable.java   |   2 +-
 .../apache/camel/{ => resume}/ResumableSet.java    |   2 +-
 .../ResumeAdapter.java}                            |  16 +-
 .../org/apache/camel/{ => resume}/ResumeAware.java |   2 +-
 .../apache/camel/{ => resume}/ResumeStrategy.java  |  23 ++-
 .../UpdatableConsumerResumeStrategy.java           |   2 +-
 .../apache/camel/resume/cache/MultiEntryCache.java |  11 +-
 .../camel/{ => resume/cache}/ResumeCache.java      |  20 +-
 .../cache/SingleEntryCache.java}                   |  21 +-
 .../org/apache/camel/impl/engine/DefaultRoute.java |   8 +-
 .../docs/modules/eips/pages/resume-strategies.adoc | 139 ++++---------
 .../resources/org/apache/camel/model/pausable.json |   2 +-
 .../org/apache/camel/model/resumable.json          |   2 +-
 .../org/apache/camel/model/PausableDefinition.java |   4 +-
 .../apache/camel/model/ProcessorDefinition.java    |   4 +-
 .../apache/camel/model/ResumableDefinition.java    |   4 +-
 .../apache/camel/processor/PausableProcessor.java  |   2 +-
 .../processor/resume/DelegatingResumeAdapter.java  | 181 ++++++++++++++++
 .../processor/resume/DelegatingResumeStrategy.java | 102 ---------
 .../processor/resume/ResumableCompletion.java      |   6 +-
 .../camel/processor/resume/ResumableProcessor.java |   9 +-
 .../processor/resume/TransientResumeStrategy.java  |  33 +--
 .../org/apache/camel/reifier/PausableReifier.java  |   2 +-
 .../org/apache/camel/reifier/ResumableReifier.java |   2 +-
 .../FileConsumerResumeFromOffsetStrategyTest.java  |  27 ++-
 .../file/FileConsumerResumeStrategyTest.java       |  17 +-
 .../main/java/org/apache/camel/resume/Offsets.java |   2 -
 .../java/org/apache/camel/resume/Resumables.java   |   3 -
 80 files changed, 1304 insertions(+), 680 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
index 88b088805ab..f1526b5311b 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
@@ -12,7 +12,7 @@
     "output": false
   },
   "properties": {
-    "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
+    "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
     "untilCheck": { "kind": "attribute", "displayName": "Until Check", "required": true, "type": "object", "javaType": "java.util.function.Predicate", "deprecated": false, "autowired": false, "secret": false },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
index acbea228865..03a0a440e46 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
@@ -12,7 +12,7 @@
     "output": false
   },
   "properties": {
-    "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
+    "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
   }
diff --git a/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java b/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
index b477e1bb372..b0a4dd6e5a3 100644
--- a/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
+++ b/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
@@ -16,14 +16,14 @@
  */
 package org.apache.camel.component.feed;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 
 /**
  * Filter used by the {@link org.apache.camel.component.feed.FeedEntryPollingConsumer} to filter entries from the feed.
  *
  * @param <E> entry type
  */
-public interface EntryFilter<E> extends ResumeStrategy {
+public interface EntryFilter<E> extends ResumeAdapter {
 
     /**
      * Tests to be used as filtering the feed for only entries of interest, such as only new entries, etc.
@@ -37,14 +37,4 @@ public interface EntryFilter<E> extends ResumeStrategy {
     default void resume() {
         // NO-OP by default. Implementations can implement more complex behaviors if needed
     }
-
-    @Override
-    default void start() {
-        // NO-OP
-    }
-
-    @Override
-    default void stop() {
-        // NO-OP
-    }
 }
diff --git a/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java b/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
index d8bcfcf255e..035e7ea4f15 100644
--- a/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
+++ b/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
@@ -20,14 +20,16 @@ import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 
 /**
  * Consumer to poll feeds and return each entry from the feed step by step.
  */
-public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer implements ResumeAware<EntryFilter<E>> {
+public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer implements ResumeAware<ResumeStrategy> {
     protected int entryIndex;
-    protected EntryFilter<E> entryFilter;
+    protected ResumeStrategy resumeStrategy;
     @SuppressWarnings("rawtypes")
     protected List<E> list;
     protected boolean throttleEntries;
@@ -52,8 +54,12 @@ public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer im
             polledMessages++;
 
             boolean valid = true;
-            if (entryFilter != null) {
-                valid = entryFilter.isValidEntry(entry);
+            if (resumeStrategy != null) {
+                ResumeAdapter adapter = resumeStrategy.getAdapter();
+
+                if (adapter instanceof EntryFilter) {
+                    valid = ((EntryFilter) adapter).isValidEntry(entry);
+                }
             }
             if (valid) {
                 Exchange exchange = endpoint.createExchange(feed, entry);
@@ -73,13 +79,13 @@ public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer im
     }
 
     @Override
-    public void setResumeStrategy(EntryFilter<E> resumeStrategy) {
-        this.entryFilter = resumeStrategy;
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
     }
 
     @Override
-    public EntryFilter<E> getResumeStrategy() {
-        return entryFilter;
+    public ResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
     }
 
     protected abstract void resetList();
diff --git a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java
index b39e4d957c7..4c810f8a65e 100644
--- a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java
+++ b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java
@@ -22,6 +22,7 @@ import java.util.Date;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -57,7 +58,7 @@ public class AtomEntryPollingConsumerWithResumeTest extends CamelTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=500")
-                        .resumable().resumeStrategy(new UpdatedDateFilter(getTestFilterDate()))
+                        .resumable().resumeStrategy(new TransientResumeStrategy(new UpdatedDateFilter(getTestFilterDate())))
                         .routeId("WithResume")
                         .to("mock:result");
             }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 1925b6ae57e..de0317df604 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -23,9 +23,10 @@ import java.util.Queue;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisUserConfigurationResumeStrategy;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisUserConfigurationResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -41,13 +42,13 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
-public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<KinesisResumeStrategy> {
+public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> {
 
     private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
 
     private String currentShardIterator;
     private boolean isShardClosed;
-    private KinesisResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
 
     public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -173,12 +174,15 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     }
 
     private void resume(GetShardIteratorRequest.Builder req) {
-        if (resumeStrategy == null) {
-            resumeStrategy = new KinesisUserConfigurationResumeStrategy(getEndpoint().getConfiguration());
+        KinesisResumeAdapter adapter;
+        if (resumeStrategy != null) {
+            adapter = resumeStrategy.getAdapter(KinesisResumeAdapter.class);
+        } else {
+            adapter = new KinesisUserConfigurationResumeAdapter(getEndpoint().getConfiguration());
         }
 
-        resumeStrategy.setRequestBuilder(req);
-        resumeStrategy.resume();
+        adapter.setRequestBuilder(req);
+        adapter.resume();
     }
 
     private Queue<Exchange> createExchanges(List<Record> records) {
@@ -203,12 +207,12 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     }
 
     @Override
-    public void setResumeStrategy(KinesisResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 
     @Override
-    public KinesisResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
similarity index 75%
rename from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
rename to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
index 0592bfbaa09..b4f26b437eb 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
@@ -17,20 +17,17 @@
 
 package org.apache.camel.component.aws2.kinesis.consumer;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 
-public interface KinesisResumeStrategy extends ResumeStrategy {
-
+/**
+ * The resume adapter for Kinesis
+ */
+public interface KinesisResumeAdapter extends ResumeAdapter {
+    /**
+     * Sets the shard iterator request builder that can be used to customize the call and set the exact resume point
+     * 
+     * @param builder the builder instance
+     */
     void setRequestBuilder(GetShardIteratorRequest.Builder builder);
-
-    @Override
-    default void start() {
-
-    }
-
-    @Override
-    default void stop() {
-
-    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeAdapter.java
similarity index 91%
rename from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
rename to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeAdapter.java
index 56334437770..5e47dfb817b 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeAdapter.java
@@ -21,11 +21,11 @@ import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
-public class KinesisUserConfigurationResumeStrategy implements KinesisResumeStrategy {
+public class KinesisUserConfigurationResumeAdapter implements KinesisResumeAdapter {
     private final Kinesis2Configuration configuration;
     private GetShardIteratorRequest.Builder resumable;
 
-    public KinesisUserConfigurationResumeStrategy(Kinesis2Configuration configuration) {
+    public KinesisUserConfigurationResumeAdapter(Kinesis2Configuration configuration) {
         this.configuration = configuration;
     }
 
diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
new file mode 100644
index 00000000000..e775be5d063
--- /dev/null
+++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.caffeine.resume.multi;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.camel.resume.cache.MultiEntryCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache that can store multiple key/valued resumables where the value is a list containing multiple values
+ * 
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class CaffeineCache<K, V> implements MultiEntryCache<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(CaffeineCache.class);
+
+    private final Cache<K, List<V>> cache;
+    private final long cacheSize;
+
+    /**
+     * Builds a new instance of this object with the given cache size
+     * 
+     * @param cacheSize the size of the cache
+     */
+    public CaffeineCache(long cacheSize) {
+        this(Caffeine.newBuilder().maximumSize(cacheSize).build(), cacheSize);
+    }
+
+    /**
+     * Builds a new instance of this object
+     *
+     * @param cache     an instance of a pre-constructed cache object
+     * @param cacheSize the size of the pre-constructed cache object
+     */
+    public CaffeineCache(Cache<K, List<V>> cache, long cacheSize) {
+        this.cache = cache;
+        this.cacheSize = cacheSize;
+    }
+
+    @Override
+    public long capacity() {
+        return cacheSize;
+    }
+
+    @Override
+    public synchronized void add(K key, V offsetValue) {
+        LOG.trace("Adding entry to the cache (k/v): {}/{}", key, offsetValue);
+        LOG.trace("Adding entry to the cache (k/v) with types: {}/{}", key.getClass(), offsetValue.getClass());
+        List<V> entries = cache.get(key, k -> new ArrayList<>());
+
+        entries.add(offsetValue);
+    }
+
+    @Override
+    public synchronized boolean contains(K key, V entry) {
+        final List<V> entries = cache.getIfPresent(key);
+
+        if (entries == null) {
+            return false;
+        }
+
+        boolean ret = entries.contains(entry);
+        LOG.trace("Checking if cache contains key {} with value {} ({})", key, entry, ret);
+
+        return ret;
+    }
+
+    @Override
+    public boolean isFull() {
+        if (cache.estimatedSize() >= cacheSize) {
+            return true;
+        }
+
+        return false;
+    }
+
+}
diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
new file mode 100644
index 00000000000..d551418cf38
--- /dev/null
+++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.caffeine.resume.single;
+
+import java.util.Optional;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.camel.resume.cache.SingleEntryCache;
+
+/**
+ * This is a simple cache implementation that uses Caffeine to store the resume offsets
+ *
+ * @param <K> The type of the key to cache
+ * @param <V> The type of the value/entry to cache
+ */
+public class CaffeineCache<K, V> implements SingleEntryCache<K, V> {
+    private final Cache<K, V> cache;
+    private final long cacheSize;
+
+    /**
+     * Builds a new cache with the given cache size
+     * 
+     * @param cacheSize the cache size
+     */
+    public CaffeineCache(long cacheSize) {
+        this(Caffeine.newBuilder().maximumSize(cacheSize).build(), cacheSize);
+    }
+
+    /**
+     * Builds a new instance of this object
+     * 
+     * @param cache     an instance of a pre-constructed cache object
+     * @param cacheSize the size of the pre-constructed cache object
+     */
+    public CaffeineCache(Cache<K, V> cache, long cacheSize) {
+        this.cache = cache;
+        this.cacheSize = cacheSize;
+    }
+
+    @Override
+    public boolean contains(K key, V entry) {
+        assert key != null;
+        V cachedEntry = cache.getIfPresent(key);
+
+        return entry.equals(cachedEntry);
+    }
+
+    @Override
+    public void add(K key, V offsetValue) {
+        cache.put(key, offsetValue);
+    }
+
+    @Override
+    public Optional<V> get(K key) {
+        V entry = cache.getIfPresent(key);
+
+        if (entry == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(entry);
+    }
+
+    @Override
+    public boolean isFull() {
+        if (cache.estimatedSize() >= cacheSize) {
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public long capacity() {
+        return cacheSize;
+    }
+}
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index 659bc4cec9c..f3bb3bf971e 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -22,21 +22,22 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.cassandra.consumer.support.CassandraResumeStrategy;
+import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.ScheduledPollConsumer;
 
 /**
  * Cassandra 2 CQL3 consumer.
  */
-public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAware<CassandraResumeStrategy> {
+public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAware<ResumeStrategy> {
 
     /**
      * Prepared statement used for polling
      */
     private PreparedStatement preparedStatement;
 
-    private CassandraResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
 
     public CassandraConsumer(CassandraEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -86,8 +87,11 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw
         if (resumeStrategy != null) {
             CqlSession session = getEndpoint().getSessionHolder().getSession();
 
-            resumeStrategy.setSession(session);
-            resumeStrategy.resume();
+            CassandraResumeAdapter resumeAdapter = resumeStrategy.getAdapter(CassandraResumeAdapter.class);
+            if (resumeAdapter != null) {
+                resumeAdapter.setSession(session);
+                resumeAdapter.resume();
+            }
         }
     }
 
@@ -102,12 +106,12 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw
     }
 
     @Override
-    public CassandraResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
     @Override
-    public void setResumeStrategy(CassandraResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 }
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
similarity index 81%
rename from components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java
rename to components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
index 220242c7b2c..e8ccbed8efe 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
@@ -18,12 +18,12 @@
 package org.apache.camel.component.cassandra.consumer.support;
 
 import com.datastax.oss.driver.api.core.CqlSession;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 
 /**
- * Provides a resume strategy for Cassandra consumers
+ * Provides a resume adapter for Cassandra consumers
  */
-public interface CassandraResumeStrategy extends ResumeStrategy {
+public interface CassandraResumeAdapter extends ResumeAdapter {
 
     /**
      * Sets the session that allow implementations to run a one-time query on the DB
@@ -31,14 +31,4 @@ public interface CassandraResumeStrategy extends ResumeStrategy {
      * @param session
      */
     void setSession(CqlSession session);
-
-    @Override
-    default void start() {
-
-    }
-
-    @Override
-    default void stop() {
-
-    }
 }
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
index dcff5c23842..b41a2a2ef33 100644
--- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
@@ -24,15 +24,16 @@ import com.datastax.oss.driver.api.core.CqlSession;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.cassandra.consumer.support.CassandraResumeStrategy;
+import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CassandraComponentResumeStrategyIT extends BaseCassandra {
 
-    private static class TestCassandraResumeStrategy implements CassandraResumeStrategy {
+    private static class TestCassandraResumeAdapter implements CassandraResumeAdapter {
         private boolean sessionCalled;
         private boolean sessionNotNull;
         private boolean resumeCalled;
@@ -62,7 +63,7 @@ public class CassandraComponentResumeStrategyIT extends BaseCassandra {
     }
 
     private static final String CQL = "select login, first_name, last_name from camel_user";
-    private final TestCassandraResumeStrategy resumeStrategy = new TestCassandraResumeStrategy();
+    private final TestCassandraResumeAdapter resumeStrategy = new TestCassandraResumeAdapter();
 
     @Test
     public void testConsumeAll() throws Exception {
@@ -88,7 +89,7 @@ public class CassandraComponentResumeStrategyIT extends BaseCassandra {
         return new RouteBuilder() {
             public void configure() {
                 fromF("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL)
-                        .resumable(resumeStrategy)
+                        .resumable(new TransientResumeStrategy(resumeStrategy))
                         .to("mock:resultAll");
             }
         };
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index 1ac3bbefb05..a1dc66deefd 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -26,7 +26,8 @@ import com.couchbase.client.java.view.ViewResult;
 import com.couchbase.client.java.view.ViewRow;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultScheduledPollConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +37,7 @@ import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_ID;
 import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_KEY;
 import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_VIEWNAME;
 
-public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements ResumeAware<CouchbaseResumeStrategy> {
+public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements ResumeAware<ResumeStrategy> {
 
     private static final Logger LOG = LoggerFactory.getLogger(CouchbaseConsumer.class);
 
@@ -45,7 +46,7 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
     private final Collection collection;
     private ViewOptions viewOptions;
 
-    private CouchbaseResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
 
     public CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client, Processor processor) {
         super(endpoint, processor);
@@ -96,9 +97,12 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
 
         if (resumeStrategy != null) {
             LOG.info("Couchbase consumer running with resume strategy enabled");
-            resumeStrategy.setBucket(bucket);
 
-            resumeStrategy.resume();
+            CouchbaseResumeAdapter resumeAdapter = resumeStrategy.getAdapter(CouchbaseResumeAdapter.class);
+            if (resumeAdapter != null) {
+                resumeAdapter.setBucket(bucket);
+                resumeAdapter.resume();
+            }
         }
     }
 
@@ -182,12 +186,12 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
     }
 
     @Override
-    public CouchbaseResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
     @Override
-    public void setResumeStrategy(CouchbaseResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 }
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
similarity index 79%
copy from components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
copy to components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
index 6e90080b8b8..b7e87ec6a47 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
@@ -18,22 +18,12 @@
 package org.apache.camel.component.couchbase;
 
 import com.couchbase.client.java.Bucket;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 
 /**
- * Allow implementing resume strategies for couchbase consumers
+ * Allows implementing resume adapters for Couchbase consumers
  */
-public interface CouchbaseResumeStrategy extends ResumeStrategy {
-
-    @Override
-    default void start() {
-
-    }
-
-    @Override
-    default void stop() {
-
-    }
+public interface CouchbaseResumeAdapter extends ResumeAdapter {
 
     /**
      * Sets the bucket in use
diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
index ac7616a879e..20b03444564 100644
--- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
+++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
@@ -22,8 +22,9 @@ import java.util.concurrent.TimeUnit;
 import com.couchbase.client.java.Bucket;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.couchbase.CouchbaseResumeStrategy;
+import org.apache.camel.component.couchbase.CouchbaseResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.resume.Resumables;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -32,7 +33,7 @@ import org.junit.jupiter.api.Test;
 import static org.awaitility.Awaitility.await;
 
 public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
-    static class TestResumeStrategy implements CouchbaseResumeStrategy {
+    static class TestCouchbaseResumeAdapter implements CouchbaseResumeAdapter {
         volatile boolean setBucketCalled;
         volatile boolean bucketNotNull;
         volatile boolean resumeCalled;
@@ -49,7 +50,7 @@ public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
         }
     }
 
-    TestResumeStrategy resumeStrategy = new TestResumeStrategy();
+    TransientResumeStrategy resumeStrategy = new TransientResumeStrategy(new TestCouchbaseResumeAdapter());
 
     @Test
     public void testQueryForBeers() throws Exception {
@@ -61,15 +62,18 @@ public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
 
         assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
 
+        TestCouchbaseResumeAdapter adapter = resumeStrategy.getAdapter(TestCouchbaseResumeAdapter.class);
+        await().atMost(30, TimeUnit.SECONDS).until(() -> adapter != null);
+
         await().atMost(30, TimeUnit.SECONDS)
-                .untilAsserted(() -> Assertions.assertTrue(resumeStrategy.setBucketCalled,
+                .untilAsserted(() -> Assertions.assertTrue(adapter.setBucketCalled,
                         "The setBucket method should have been called"));
         await().atMost(3, TimeUnit.SECONDS)
-                .untilAsserted(() -> Assertions.assertTrue(resumeStrategy.bucketNotNull,
+                .untilAsserted(() -> Assertions.assertTrue(adapter.bucketNotNull,
                         "The input bucket should not have been null"));
         await().atMost(3, TimeUnit.SECONDS)
                 .untilAsserted(
-                        () -> Assertions.assertTrue(resumeStrategy.resumeCalled, "The resume method should have been called"));
+                        () -> Assertions.assertTrue(adapter.resumeCalled, "The resume method should have been called"));
     }
 
     @AfterEach
diff --git a/components/camel-couchdb/pom.xml b/components/camel-couchdb/pom.xml
index b83914157d7..0372ec29c29 100644
--- a/components/camel-couchdb/pom.xml
+++ b/components/camel-couchdb/pom.xml
@@ -39,6 +39,10 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-support</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core-processor</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.lightcouch</groupId>
             <artifactId>lightcouch</artifactId>
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
index 58a9a70d6ee..3b239cea610 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
@@ -21,8 +21,9 @@ import java.time.Duration;
 import com.google.gson.JsonObject;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.couchdb.consumer.CouchDbResumable;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy;
+import org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter;
 import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategyFactory;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.task.BlockingTask;
 import org.apache.camel.support.task.Tasks;
 import org.apache.camel.support.task.budget.Budgets;
@@ -53,10 +54,16 @@ public class CouchDbChangesetTracker implements Runnable {
         CouchDbResumable resumable = new CouchDbResumable(couchClient, sequence);
 
         if (sequence == null) {
-            CouchDbResumeStrategy resumeStrategy = CouchDbResumeStrategyFactory.newResumeStrategy(this.consumer);
+            ResumeStrategy resumeStrategy = CouchDbResumeStrategyFactory.newResumeStrategy(this.consumer);
 
-            resumeStrategy.setResumable(resumable);
-            resumeStrategy.resume();
+            assert resumeStrategy != null;
+
+            CouchDbResumeAdapter adapter = resumeStrategy.getAdapter(CouchDbResumeAdapter.class);
+
+            if (adapter != null) {
+                adapter.setResumable(resumable);
+                adapter.resume();
+            }
         }
 
         LOG.debug("Last sequence [{}]", resumable.getLastOffset());
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
index 615823c23ad..4512004fff1 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
@@ -21,17 +21,17 @@ import java.util.concurrent.ExecutorService;
 import com.google.gson.JsonObject;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultConsumer;
 
-public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<CouchDbResumeStrategy> {
+public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<ResumeStrategy> {
 
     private final CouchDbClientWrapper couchClient;
     private final CouchDbEndpoint endpoint;
     private ExecutorService executor;
     private CouchDbChangesetTracker task;
-    private CouchDbResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
 
     public CouchDbConsumer(CouchDbEndpoint endpoint, CouchDbClientWrapper couchClient, Processor processor) {
         super(endpoint, processor);
@@ -40,12 +40,12 @@ public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<Couc
     }
 
     @Override
-    public void setResumeStrategy(CouchDbResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 
     @Override
-    public CouchDbResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
index e6ebd10c7c8..d7a7e91311e 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
@@ -17,10 +17,10 @@
 
 package org.apache.camel.component.couchdb.consumer;
 
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
 import org.apache.camel.component.couchdb.CouchDbClientWrapper;
+import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.Offsets;
+import org.apache.camel.resume.Resumable;
 
 /**
  * Wraps the resume data for CouchDb
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
similarity index 76%
rename from components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
rename to components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
index 3a92a042e6d..f66a2ccd5a8 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
@@ -17,21 +17,16 @@
 
 package org.apache.camel.component.couchdb.consumer;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 
 /**
- * Defines a resumable strategy usable by the CouchDB component
+ * Defines a resume adapter usable by the CouchDB component
  */
-public interface CouchDbResumeStrategy extends ResumeStrategy {
+public interface CouchDbResumeAdapter extends ResumeAdapter {
+    /**
+     * Sets the resumable for the adapter
+     * 
+     * @param resumable the resumable instance
+     */
     void setResumable(CouchDbResumable resumable);
-
-    @Override
-    default void start() {
-
-    }
-
-    @Override
-    default void stop() {
-
-    }
 }
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
index ce0196f0b15..b8515b1ad6a 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
@@ -18,16 +18,18 @@
 package org.apache.camel.component.couchdb.consumer;
 
 import org.apache.camel.component.couchdb.CouchDbConsumer;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.ResumeStrategy;
 
 public final class CouchDbResumeStrategyFactory {
     private CouchDbResumeStrategyFactory() {
     }
 
-    public static CouchDbResumeStrategy newResumeStrategy(CouchDbConsumer consumer) {
-        CouchDbResumeStrategy resumeStrategy = consumer.getResumeStrategy();
+    public static ResumeStrategy newResumeStrategy(CouchDbConsumer consumer) {
+        ResumeStrategy resumeStrategy = consumer.getResumeStrategy();
 
         if (resumeStrategy == null) {
-            resumeStrategy = new LatestUpdateSequenceResumeStrategy();
+            resumeStrategy = new TransientResumeStrategy(new LatestUpdateSequenceResumeAdapter());
         }
 
         return resumeStrategy;
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
similarity index 88%
rename from components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java
rename to components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
index 277d84243e7..e9815b9171a 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
@@ -20,9 +20,9 @@ package org.apache.camel.component.couchdb.consumer;
 import org.apache.camel.component.couchdb.CouchDbClientWrapper;
 
 /**
- * A resume strategy that resumes from the last update sequence
+ * A resume adapter for couchdb that resumes from the last update sequence
  */
-public final class LatestUpdateSequenceResumeStrategy implements CouchDbResumeStrategy {
+public final class LatestUpdateSequenceResumeAdapter implements CouchDbResumeAdapter {
     private CouchDbResumable resumable;
 
     @Override
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 6dedb43cd18..9d730a528a4 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -31,11 +31,13 @@ import java.util.Set;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
 import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeStrategy;
-import org.apache.camel.component.file.consumer.GenericFileResumeStrategy;
+import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
+import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -44,10 +46,10 @@ import org.slf4j.LoggerFactory;
 /**
  * File consumer.
  */
-public class FileConsumer extends GenericFileConsumer<File> implements ResumeAware<FileConsumerResumeStrategy> {
+public class FileConsumer extends GenericFileConsumer<File> implements ResumeAware<ResumeStrategy> {
 
     private static final Logger LOG = LoggerFactory.getLogger(FileConsumer.class);
-    private FileConsumerResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
     private String endpointPath;
     private Set<String> extendedAttributes;
 
@@ -103,8 +105,11 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
             GenericFile<File> gf
                     = asGenericFile(endpointPath, file, getEndpoint().getCharset(), getEndpoint().isProbeContentType());
 
-            if (resumeStrategy instanceof GenericFileResumeStrategy) {
-                ((GenericFileResumeStrategy<File>) resumeStrategy).resume(gf);
+            if (resumeStrategy != null) {
+                ResumeAdapter adapter = resumeStrategy.getAdapter();
+                if (adapter instanceof GenericFileResumeAdapter) {
+                    ((FileResumeAdapter) adapter).resume(gf);
+                }
             }
 
             if (file.isDirectory()) {
@@ -171,11 +176,15 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
             }
         }
 
-        if (resumeStrategy instanceof FileSetResumeStrategy) {
-            FileResumeSet resumeSet = new FileResumeSet(dirFiles);
-            resumeStrategy.resume(resumeSet);
+        if (resumeStrategy != null) {
+            ResumeAdapter adapter = resumeStrategy.getAdapter();
+            if (adapter instanceof FileSetResumeAdapter) {
+                FileResumeSet resumeSet = new FileResumeSet(dirFiles);
+
+                ((FileResumeAdapter) adapter).resume(resumeSet);
 
-            return resumeSet.resumed();
+                return resumeSet.resumed();
+            }
         }
 
         return dirFiles;
@@ -307,12 +316,12 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
     }
 
     @Override
-    public FileConsumerResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
     @Override
-    public void setResumeStrategy(FileConsumerResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
index c04a0e9ecc3..c1fcc9de5f9 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
@@ -22,9 +22,9 @@ import java.nio.file.Path;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.Offset;
 import org.apache.camel.WrappedFile;
 import org.apache.camel.component.file.consumer.GenericFileResumable;
+import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.Offsets;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
similarity index 85%
copy from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
copy to components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
index fed8000c512..25b807cbf40 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
@@ -17,12 +17,12 @@
 
 package org.apache.camel.component.file.consumer;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 
 /**
- * Defines resume strategy for consumers of the file component.
+ * Defines the resume adapter for consumers of the file component.
  */
-public interface FileConsumerResumeStrategy<T> extends ResumeStrategy {
+public interface FileResumeAdapter<T> extends ResumeAdapter {
 
     /**
      * Returns the last offset read for the given file.
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
index f746c0f9fa3..41b1751223c 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
@@ -21,7 +21,7 @@ import java.io.File;
 import java.util.Objects;
 import java.util.function.Predicate;
 
-import org.apache.camel.ResumableSet;
+import org.apache.camel.resume.ResumableSet;
 
 /**
  * This contains the input/output file set for resume operations.
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
similarity index 79%
rename from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeStrategy.java
rename to components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
index dc75e80bd89..2e1f5e56477 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeStrategy.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
@@ -17,6 +17,9 @@
 
 package org.apache.camel.component.file.consumer;
 
-public interface FileSetResumeStrategy extends FileConsumerResumeStrategy<FileResumeSet> {
-
+/**
+ * Allows the implementation of file adapters for handling resume operations for file sets (i.e.: file entries in a
+ * directory)
+ */
+public interface FileSetResumeAdapter extends FileResumeAdapter<FileResumeSet> {
 }
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
index 037621d751f..79899514a54 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
@@ -17,7 +17,7 @@
 
 package org.apache.camel.component.file.consumer;
 
-import org.apache.camel.Resumable;
+import org.apache.camel.resume.Resumable;
 
 public interface GenericFileResumable<T> extends Resumable<T, Long> {
 
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
similarity index 66%
rename from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
rename to components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
index fed8000c512..8f7c1f2900c 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
@@ -17,17 +17,18 @@
 
 package org.apache.camel.component.file.consumer;
 
-import org.apache.camel.ResumeStrategy;
+import java.io.File;
+import java.util.Optional;
 
 /**
- * Defines resume strategy for consumers of the file component.
+ * Allows the implementation of file adapters for handling resume operations for generic files
  */
-public interface FileConsumerResumeStrategy<T> extends ResumeStrategy {
-
+public interface GenericFileResumeAdapter extends FileResumeAdapter<GenericFileResumable<File>> {
     /**
-     * Returns the last offset read for the given file.
+     * Gets the last offset for the given file
      *
-     * @param resumable the resumable file or resumable set to run the resume
+     * @param  addressable the file instance
+     * @return             An Optional with the offset value
      */
-    void resume(T resumable);
+    Optional<Long> getLastOffset(File addressable);
 }
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
new file mode 100644
index 00000000000..9a1f7f49f76
--- /dev/null
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+
+import org.apache.camel.component.file.consumer.FileResumeSet;
+import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
+import org.apache.camel.resume.cache.MultiEntryCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link FileSetResumeAdapter} that can be used for resume operations for multiple files. For
+ * instance, this can be used to manage the resume operations for files within a directory.
+ */
+public class DefaultFileSetResumeAdapter implements FileSetResumeAdapter {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultFileSetResumeAdapter.class);
+
+    private final MultiEntryCache<File, File> cache;
+
+    public DefaultFileSetResumeAdapter(MultiEntryCache<File, File> cache) {
+        this.cache = cache;
+    }
+
+    private boolean notProcessed(File file) {
+        File key = file.getParentFile();
+
+        // if the file is in the cache, then it's already processed
+        boolean ret = !cache.contains(key, file);
+        return ret;
+    }
+
+    @Override
+    public void resume(FileResumeSet resumable) {
+        if (resumable != null) {
+            resumable.resumeEach(this::notProcessed);
+            if (resumable.hasResumables()) {
+                LOG.debug("There's {} files to still to be processed", resumable.resumed().length);
+            }
+        } else {
+            LOG.trace("Nothing to resume");
+        }
+    }
+
+    @Override
+    public void resume() {
+        // NO-OP
+    }
+}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
new file mode 100644
index 00000000000..54e339fe73e
--- /dev/null
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+import java.util.Optional;
+
+import org.apache.camel.component.file.consumer.GenericFileResumable;
+import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.resume.cache.SingleEntryCache;
+
+/**
+ * An implementation of the {@link GenericFileResumeAdapter} that can be used to handle resume operations for file
+ * offsets (where the offsets are of Long format).
+ */
+public class DefaultGenericFileResumeAdapter implements GenericFileResumeAdapter {
+    private final SingleEntryCache<File, Long> cache;
+
+    public DefaultGenericFileResumeAdapter(SingleEntryCache<File, Long> cache) {
+        this.cache = cache;
+    }
+
+    private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) {
+        final File addressable = resumable.getAddressable();
+        return cache.get(addressable);
+    }
+
+    @Override
+    public Optional<Long> getLastOffset(File addressable) {
+        return cache.get(addressable);
+    }
+
+    @Override
+    public void resume(GenericFileResumable<File> resumable) {
+        final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
+
+        if (!lastOffsetOpt.isPresent()) {
+            return;
+        }
+
+        final long lastOffset = lastOffsetOpt.get();
+        resumable.updateLastOffset(lastOffset);
+    }
+
+    @Override
+    public void resume() {
+
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index f864cf272b6..3a6d2860329 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -25,14 +25,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import org.apache.camel.ConsumerListenerAware;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
 import org.apache.camel.Suspendable;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.health.HealthCheckAware;
 import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.resume.ConsumerListenerAware;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.support.DefaultConsumer;
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaConsumer extends DefaultConsumer
-        implements ResumeAware<KafkaConsumerResumeStrategy>, HealthCheckAware, ConsumerListenerAware<KafkaConsumerListener>,
+        implements ResumeAware<ResumeStrategy>, HealthCheckAware, ConsumerListenerAware<KafkaConsumerListener>,
         Suspendable {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
@@ -56,7 +56,7 @@ public class KafkaConsumer extends DefaultConsumer
     // This list helps to work around the infinite loop of KAFKA-1894
     private final List<KafkaFetchRecords> tasks = new ArrayList<>();
     private volatile boolean stopOffsetRepo;
-    private KafkaConsumerResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
     private KafkaConsumerListener consumerListener;
 
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
@@ -65,12 +65,12 @@ public class KafkaConsumer extends DefaultConsumer
     }
 
     @Override
-    public void setResumeStrategy(KafkaConsumerResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 
     @Override
-    public KafkaConsumerResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index b1befdfa5c3..f6f7392a6ac 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -27,7 +27,7 @@ import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManagers;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
+import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter;
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
 import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
@@ -273,7 +273,7 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     private void subscribe() {
-        KafkaConsumerResumeStrategy resumeStrategy = ResumeStrategyFactory.newResumeStrategy(kafkaConsumer);
+        KafkaConsumerResumeAdapter resumeStrategy = ResumeStrategyFactory.resolveResumeAdapter(kafkaConsumer);
         resumeStrategy.setConsumer(consumer);
 
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
index 44f9f6337ed..58211ba86a0 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
@@ -19,9 +19,9 @@ package org.apache.camel.component.kafka.consumer.errorhandler;
 
 import java.util.function.Predicate;
 
-import org.apache.camel.ConsumerListener;
 import org.apache.camel.component.kafka.SeekPolicy;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.camel.resume.ConsumerListener;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
similarity index 65%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
index 97f25e0745d..d25c05662cb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
@@ -16,31 +16,32 @@
  */
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 import org.apache.kafka.clients.consumer.Consumer;
 
 /**
- * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
+ * Defines an adapter for handling resume operations. Implementations can define different ways to handle how to resume
  * processing records.
  *
  * The resume runs in the scope of the Kafka Consumer thread and may run concurrently with other consumer instances when
  * the component is set up to use more than one of them. As such, implementations are responsible for ensuring the
  * thread-safety of the operations within the resume method.
  */
-public interface KafkaConsumerResumeStrategy extends ResumeStrategy {
+public interface KafkaConsumerResumeAdapter extends ResumeAdapter {
+
+    /**
+     * Sets the Kafka consumer instance for the adapter. Please note that the Kafka consumer is not safe for concurrent
+     * access.
+     * 
+     * @param consumer the consumer instance
+     */
     void setConsumer(Consumer<?, ?> consumer);
 
-    default void resume(KafkaResumable resumable) {
-
-    }
-
-    @Override
-    default void start() {
-
-    }
-
-    @Override
-    default void stop() {
-
-    }
+    /**
+     * Sets an optional resumable instance for the adapter. This is usually set during partition assignment. Guaranteed
+     * not to be null and safe to ignore if partition and topic information are not used.
+     * 
+     * @param kafkaResumable the resumable instance
+     */
+    void setKafkaResumable(KafkaResumable kafkaResumable);
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
index 8a86b658b4d..11a0879a59c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
@@ -17,7 +17,7 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.Offset;
+import org.apache.camel.resume.Offset;
 import org.apache.camel.util.KeyValueHolder;
 
 /**
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
index 388f47fa3e5..03bc3a977b1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
@@ -17,9 +17,9 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
+import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.Offsets;
+import org.apache.camel.resume.Resumable;
 
 public class KafkaResumable implements Resumable<String, String> {
     private final String partition;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
similarity index 82%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
index 4502831f8fb..3ef4f209c43 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
@@ -30,14 +30,14 @@ import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
 /**
  * A resume strategy that uses Kafka's offset for resuming
  */
-public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeStrategy.class);
+    private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeAdapter.class);
 
     private final StateRepository<String, String> offsetRepository;
     private Consumer<?, ?> consumer;
 
-    public OffsetKafkaConsumerResumeStrategy(StateRepository<String, String> offsetRepository) {
+    public OffsetKafkaConsumerResumeAdapter(StateRepository<String, String> offsetRepository) {
         this.offsetRepository = offsetRepository;
     }
 
@@ -46,6 +46,11 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
         this.consumer = consumer;
     }
 
+    @Override
+    public void setKafkaResumable(KafkaResumable kafkaResumable) {
+        // NO-OP
+    }
+
     private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
         // The state contains the last read offset, so you need to seek from the next one
         long offset = deserializeOffsetValue(offsetState) + 1;
@@ -63,14 +68,4 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
             }
         }
     }
-
-    /*
-     Note: when self-managing the offsets, we don't need to use the information on the resumable
-     instance. We can collect the assignments directly from the consumer instance as we always did.
-     */
-    @SuppressWarnings("unused")
-    @Override
-    public void resume(KafkaResumable resumable) {
-        resume();
-    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index e471d1e2ea6..03046f7a10b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -33,12 +33,12 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
 
     private final String threadId;
     private final KafkaConfiguration configuration;
-    private final KafkaConsumerResumeStrategy resumeStrategy;
+    private final KafkaConsumerResumeAdapter resumeStrategy;
     private final CommitManager commitManager;
 
     public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
                                        CommitManager commitManager,
-                                       KafkaConsumerResumeStrategy resumeStrategy) {
+                                       KafkaConsumerResumeAdapter resumeStrategy) {
         this.threadId = threadId;
         this.configuration = configuration;
         this.commitManager = commitManager;
@@ -67,6 +67,11 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
         List<KafkaResumable> resumables = partitions.stream()
                 .map(p -> new KafkaResumable(String.valueOf(p.partition()), p.topic())).collect(Collectors.toList());
 
-        resumables.forEach(r -> resumeStrategy.resume(r));
+        resumables.forEach(this::doResume);
+    }
+
+    private void doResume(KafkaResumable r) {
+        resumeStrategy.setKafkaResumable(r);
+        resumeStrategy.resume();
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 3784b4c5c15..77315f2e85b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.consumer.support;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConsumer;
 import org.apache.camel.component.kafka.SeekPolicy;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
@@ -29,12 +30,17 @@ public final class ResumeStrategyFactory {
     /**
      * A NO-OP resume strategy that does nothing (i.e.: no resume)
      */
-    private static class NoOpKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+    private static class NoOpKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
 
         @SuppressWarnings("unused")
         @Override
         public void setConsumer(Consumer<?, ?> consumer) {
+            // NO-OP
+        }
 
+        @Override
+        public void setKafkaResumable(KafkaResumable kafkaResumable) {
+            // NO-OP
         }
 
         @SuppressWarnings("unused")
@@ -44,36 +50,40 @@ public final class ResumeStrategyFactory {
         }
     }
 
-    private static final NoOpKafkaConsumerResumeStrategy NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeStrategy();
+    private static final NoOpKafkaConsumerResumeAdapter NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeAdapter();
     private static final Logger LOG = LoggerFactory.getLogger(ResumeStrategyFactory.class);
 
     private ResumeStrategyFactory() {
     }
 
-    public static KafkaConsumerResumeStrategy newResumeStrategy(KafkaConsumer kafkaConsumer) {
+    public static KafkaConsumerResumeAdapter resolveResumeAdapter(KafkaConsumer kafkaConsumer) {
         // When using resumable routes, which register the strategy via service, it takes priority over everything else
-        KafkaConsumerResumeStrategy resumableRouteStrategy = kafkaConsumer.getResumeStrategy();
+        ResumeStrategy resumeStrategy = kafkaConsumer.getResumeStrategy();
+        if (resumeStrategy != null) {
+            KafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(KafkaConsumerResumeAdapter.class);
+
+            // The strategy should not be able to be created without an adapter, but let's be safe
+            assert adapter != null;
 
-        if (resumableRouteStrategy != null) {
-            return resumableRouteStrategy;
+            return adapter;
         }
 
         KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration();
 
-        return builtinResumeStrategies(configuration);
+        return resolveBuiltinResumeAdapters(configuration);
     }
 
-    private static KafkaConsumerResumeStrategy builtinResumeStrategies(KafkaConfiguration configuration) {
+    private static KafkaConsumerResumeAdapter resolveBuiltinResumeAdapters(KafkaConfiguration configuration) {
         LOG.debug("No resume strategy was provided ... checking for built-ins ...");
         StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
         SeekPolicy seekTo = configuration.getSeekTo();
 
         if (offsetRepository != null) {
             LOG.info("Using resume from offset strategy");
-            return new OffsetKafkaConsumerResumeStrategy(offsetRepository);
+            return new OffsetKafkaConsumerResumeAdapter(offsetRepository);
         } else if (seekTo != null) {
             LOG.info("Using resume from seek policy strategy with seeking from {}", seekTo);
-            return new SeekPolicyKafkaConsumerResumeStrategy(seekTo);
+            return new SeekPolicyKafkaConsumerResumeAdapter(seekTo);
         }
 
         LOG.info("Using NO-OP resume strategy");
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
similarity index 80%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
index b877763854e..0dee4e1b849 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
@@ -24,14 +24,14 @@ import org.slf4j.LoggerFactory;
 /**
  * A resume strategy that uses Camel's seekTo configuration for resuming
  */
-public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+public class SeekPolicyKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class);
+    private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeAdapter.class);
 
     private final SeekPolicy seekPolicy;
     private Consumer<?, ?> consumer;
 
-    public SeekPolicyKafkaConsumerResumeStrategy(SeekPolicy seekPolicy) {
+    public SeekPolicyKafkaConsumerResumeAdapter(SeekPolicy seekPolicy) {
         this.seekPolicy = seekPolicy;
     }
 
@@ -40,6 +40,11 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum
         this.consumer = consumer;
     }
 
+    @Override
+    public void setKafkaResumable(KafkaResumable kafkaResumable) {
+        // NO-OP
+    }
+
     @Override
     public void resume() {
         if (seekPolicy == SeekPolicy.BEGINNING) {
@@ -50,13 +55,4 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum
             consumer.seekToEnd(consumer.assignment());
         }
     }
-
-    /*
-     * Note: when using the seek policy strategy, we don't use the resumable information
-     * because we use the consumer to set the policy.
-     */
-    @Override
-    public void resume(KafkaResumable resumable) {
-        resume();
-    }
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/Offset.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
similarity index 63%
copy from core/camel-api/src/main/java/org/apache/camel/Offset.java
copy to components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
index 1e73d1dcc2a..a1614d4bfc2 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Offset.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
@@ -15,20 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.processor.resume.kafka;
+
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 
 /**
- * Generic offset without a concrete type
- *
- * @param <T> the type of the offset
+ * Base interface for resume strategies that publish the offsets to a Kafka topic
+ * 
+ * @param <K> the type of key
+ * @param <V> the type of the value
  */
-public interface Offset<T> {
-
-    /**
-     * Gets the offset value
-     *
-     * @return the offset value
-     */
-    T offset();
+public interface KafkaResumeStrategy<K, V> extends UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, ResumeStrategy {
 
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
new file mode 100644
index 00000000000..e9974cac4e8
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume.kafka;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for multi-node
+ * integrations, for instance, when using clusters with the master component.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of the value
+ */
+public class MultiNodeKafkaResumeStrategy<K, V> extends SingleNodeKafkaResumeStrategy<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);
+    private final ExecutorService executorService;
+
+    /**
+     * Create a new instance of this class
+     * 
+     * @param bootstrapServers the address of the Kafka broker
+     * @param topic            the topic where to publish the offsets
+     * @param resumeCache      a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter    the component-specific resume adapter
+     */
+    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache,
+                                        ResumeAdapter resumeAdapter) {
+        // just in case users don't want to provide their own worker thread pool
+        this(bootstrapServers, topic, resumeCache, resumeAdapter, Executors.newSingleThreadExecutor());
+    }
+
+    /**
+     * Builds an instance of this class
+     *
+     * @param bootstrapServers the address of the Kafka broker
+     * @param topic            the topic where to publish the offsets
+     * @param resumeCache      a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter    the component-specific resume adapter
+     * @param executorService  an executor service that will run a separate thread for periodically refreshing the
+     *                         offsets
+     */
+
+    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache,
+                                        ResumeAdapter resumeAdapter, ExecutorService executorService) {
+        super(bootstrapServers, topic, resumeCache, resumeAdapter);
+
+        // We need to keep refreshing the cache
+        this.executorService = executorService;
+        executorService.submit(() -> refresh());
+    }
+
+    /**
+     * Builds an instance of this class
+     *
+     * @param topic          the topic where to publish the offsets
+     * @param resumeCache    a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter  the component-specific resume adapter
+     * @param producerConfig the set of properties to be used by the Kafka producer within this class
+     * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
+     */
+    public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter,
+                                        Properties producerConfig, Properties consumerConfig) {
+        this(topic, resumeCache, resumeAdapter, producerConfig, consumerConfig, Executors.newSingleThreadExecutor());
+    }
+
+    /**
+     * Builds an instance of this class
+     *
+     * @param topic           the topic where to publish the offsets
+     * @param resumeCache     a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter   the component-specific resume adapter
+     * @param producerConfig  the set of properties to be used by the Kafka producer within this class
+     * @param consumerConfig  the set of properties to be used by the Kafka consumer within this class
+     * @param executorService an executor service that will run a separate thread for periodically refreshing the
+     *                        offsets
+     */
+
+    public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter,
+                                        Properties producerConfig, Properties consumerConfig, ExecutorService executorService) {
+        super(topic, resumeCache, resumeAdapter, producerConfig, consumerConfig);
+
+        this.executorService = executorService;
+        executorService.submit(() -> refresh());
+    }
+
+    /**
+     * Polls the offsets topic in a loop to keep the local cache refreshed (submitted to the executor service on creation)
+     */
+    protected void refresh() {
+        LOG.trace("Creating a offset cache refresher");
+        try {
+            Properties prop = (Properties) getConsumerConfig().clone();
+            prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+
+            Consumer<K, V> consumer = new KafkaConsumer<>(prop);
+
+            consumer.subscribe(Collections.singletonList(getTopic()));
+
+            while (true) {
+                var records = consumer.poll(getPollDuration());
+                if (records.isEmpty()) {
+                    continue;
+                }
+
+                for (var record : records) {
+                    V value = record.value();
+
+                    LOG.trace("Read from Kafka: {}", value);
+                    getResumeCache().add(record.key(), record.value());
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            executorService.shutdown();
+        } finally {
+            super.stop();
+        }
+    }
+}
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/RecordError.java
similarity index 58%
rename from components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/RecordError.java
index 6e90080b8b8..c0bfbb4f8a1 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/RecordError.java
@@ -15,30 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.couchbase;
+package org.apache.camel.processor.resume.kafka;
 
-import com.couchbase.client.java.Bucket;
-import org.apache.camel.ResumeStrategy;
+import org.apache.kafka.clients.producer.RecordMetadata;
 
 /**
- * Allow implementing resume strategies for couchbase consumers
+ * Contains the error details when failing to produce records
  */
-public interface CouchbaseResumeStrategy extends ResumeStrategy {
-
-    @Override
-    default void start() {
+public class RecordError {
+    private final RecordMetadata recordMetadata;
+    private final Exception exception;
 
+    public RecordError(RecordMetadata recordMetadata, Exception exception) {
+        this.recordMetadata = recordMetadata;
+        this.exception = exception;
     }
 
-    @Override
-    default void stop() {
-
+    public RecordMetadata getRecordMetadata() {
+        return recordMetadata;
     }
 
-    /**
-     * Sets the bucket in use
-     * 
-     * @param bucket the bucket in use
-     */
-    void setBucket(Bucket bucket);
+    public Exception getException() {
+        return exception;
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
similarity index 55%
rename from components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 9960cecccf4..0a14bb481f6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -19,20 +19,20 @@ package org.apache.camel.processor.resume.kafka;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
-import org.apache.camel.Resumable;
-import org.apache.camel.ResumeCache;
-import org.apache.camel.Service;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StringHelper;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -44,45 +44,64 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractKafkaResumeStrategy<K, V>
-        implements UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, Service {
-    public static final int UNLIMITED = -1;
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaResumeStrategy.class);
+/**
+ * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
+ * integrations. For multi-node integrations (i.e.: using clusters with the master component), check
+ * {@link MultiNodeKafkaResumeStrategy}.
+ * 
+ * @param <K> the type of key
+ * @param <V> the type of the value
+ */
+public class SingleNodeKafkaResumeStrategy<K, V> implements KafkaResumeStrategy<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
 
     private final String topic;
 
     private Consumer<K, V> consumer;
     private Producer<K, V> producer;
-    private long errorCount;
     private Duration pollDuration = Duration.ofSeconds(1);
 
-    private final List<Future<RecordMetadata>> sentItems = new ArrayList<>();
+    private final Queue<RecordError> producerErrors = new ConcurrentLinkedQueue<>();
     private final ResumeCache<K, V> resumeCache;
     private boolean subscribed;
     private final Properties producerConfig;
     private final Properties consumerConfig;
+    private ResumeAdapter resumeAdapter;
 
-    public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache) {
-        this.topic = topic;
-
-        this.producerConfig = createProducer(bootstrapServers);
-        this.consumerConfig = createConsumer(bootstrapServers);
-        this.resumeCache = resumeCache;
-
-        init();
+    /**
+     * Builds an instance of this class
+     * 
+     * @param bootstrapServers the address of the Kafka broker
+     * @param topic            the topic where to publish the offsets
+     * @param resumeCache      a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter    the component-specific resume adapter
+     */
+    public SingleNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache,
+                                         ResumeAdapter resumeAdapter) {
+        this(topic, resumeCache, resumeAdapter, createProducer(bootstrapServers), createConsumer(bootstrapServers));
     }
 
-    public AbstractKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, Properties producerConfig,
-                                       Properties consumerConfig) {
-        this.topic = topic;
+    /**
+     * Builds an instance of this class
+     *
+     * @param topic          the topic where to publish the offsets
+     * @param resumeCache    a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter  the component-specific resume adapter
+     * @param producerConfig the set of properties to be used by the Kafka producer within this class
+     * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
+     */
+    public SingleNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter,
+                                         Properties producerConfig,
+                                         Properties consumerConfig) {
+        this.topic = ObjectHelper.notNull(topic, "The topic must not be null");
         this.resumeCache = resumeCache;
+        this.resumeAdapter = resumeAdapter;
         this.producerConfig = producerConfig;
         this.consumerConfig = consumerConfig;
 
@@ -132,24 +151,24 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     }
 
     /**
-     * Sends data to a topic
-     * 
+     * Sends data to a topic. The records will always be sent asynchronously. If there's an error, it is logged and
+     * added to the collection of producer errors.
+     *
+     * @see                         SingleNodeKafkaResumeStrategy#getProducerErrors()
      * @param  message              the message to send
      * @throws ExecutionException
      * @throws InterruptedException
+     *
      */
-    public void produce(K key, V message) throws ExecutionException, InterruptedException {
+    protected void produce(K key, V message) throws ExecutionException, InterruptedException {
         ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, message);
 
-        errorCount = 0;
-        Future<RecordMetadata> future = producer.send(record, (recordMetadata, e) -> {
+        producer.send(record, (recordMetadata, e) -> {
             if (e != null) {
                 LOG.error("Failed to send message {}", e.getMessage(), e);
-                errorCount++;
+                producerErrors.add(new RecordError(recordMetadata, e));
             }
         });
-
-        sentItems.add(future);
     }
 
     @Override
@@ -164,6 +183,11 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         resumeCache.add(key, offsetValue);
     }
 
+    /**
+     * Loads the existing data into the cache
+     * 
+     * @throws Exception
+     */
     protected void loadCache() throws Exception {
         subscribe();
 
@@ -192,11 +216,12 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         unsubscribe();
     }
 
-    // TODO: bad method ...
     /**
+     * Subscribe to the topic if not subscribed yet
+     * 
      * @param topic the topic to consume the messages from
      */
-    public void checkAndSubscribe(String topic) {
+    protected void checkAndSubscribe(String topic) {
         if (!subscribed) {
             consumer.subscribe(Collections.singletonList(topic));
 
@@ -205,40 +230,54 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     }
 
     /**
-     * @param topic the topic to consume the messages from
+     * Subscribe to the topic if not subscribed yet
+     * 
+     * @param topic     the topic to consume the messages from
+     * @param remaining the number of messages to rewind from the last offset position (used to fill the cache)
      */
     public void checkAndSubscribe(String topic, long remaining) {
         if (!subscribed) {
-            consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
-                @Override
-                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-
-                }
-
-                @Override
-                public void onPartitionsAssigned(Collection<TopicPartition> assignments) {
-                    consumer.seekToEnd(assignments);
-                    for (TopicPartition assignment : assignments) {
-                        final long endPosition = consumer.position(assignment);
-                        final long startPosition = endPosition - remaining;
-
-                        if (startPosition >= 0) {
-                            consumer.seek(assignment, startPosition);
-                        } else {
-                            LOG.info(
-                                    "Ignoring the seek command because the initial offset is negative (the topic is likely empty)");
-                        }
-                    }
-                }
-            });
+            consumer.subscribe(Collections.singletonList(topic), getConsumerRebalanceListener(remaining));
 
             subscribed = true;
         }
     }
 
-    public abstract void subscribe() throws Exception;
+    /**
+     * Creates a new consumer rebalance listener. This can be useful for setting the exact Kafka offset when necessary
+     * to read a limited amount of messages or customize the resume strategy behavior when a rebalance occurs.
+     * NOTE(review): no seekToEnd() is issued first, so the "end" used below is the consumer's current position.
+     * @param  remaining how many records to step back from the position reported for each assigned partition
+     * @return           a new listener that seeks each assigned partition back by that amount
+     */
+    protected ConsumerRebalanceListener getConsumerRebalanceListener(long remaining) {
+        return new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
+                // nothing to do when partitions are revoked
+            }
 
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> assignments) {
+                for (TopicPartition assignment : assignments) {
+                    final long endPosition = consumer.position(assignment);
+                    final long startPosition = endPosition - remaining;
+                    // only seek when the computed offset is valid; a negative one leaves the position untouched
+                    if (startPosition >= 0) {
+                        consumer.seek(assignment, startPosition);
+                    } else {
+                        LOG.info(
+                                "Ignoring the seek command because the initial offset is negative (the topic is likely empty)");
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Unsubscribe from the topic
+     */
+    protected void unsubscribe() {
         try {
             consumer.unsubscribe();
         } catch (IllegalStateException e) {
@@ -249,17 +288,23 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     }
 
     /**
-     * Consumes message from the given topic until the predicate returns false
+     * Consumes message from the topic previously setup
      *
-     * @return
+     * @return An instance of the consumer records
      */
-    public ConsumerRecords<K, V> consume() {
+    protected ConsumerRecords<K, V> consume() {
         int retries = 10;
 
         return consume(retries);
     }
 
-    public ConsumerRecords<K, V> consume(int retries) {
+    /**
+     * Consumes message from the topic previously setup
+     * 
+     * @param  retries how many times to retry consuming data from the topic
+     * @return         An instance of the consumer records
+     */
+    protected ConsumerRecords<K, V> consume(int retries) {
         while (retries > 0) {
             ConsumerRecords<K, V> records = consumer.poll(pollDuration);
             if (!records.isEmpty()) {
@@ -271,22 +316,35 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         return ConsumerRecords.empty();
     }
 
-    public long getErrorCount() {
-        return errorCount;
+    public void subscribe() throws Exception {
+        if (resumeCache.capacity() >= 1) {
+            checkAndSubscribe(topic, resumeCache.capacity());
+        } else {
+            checkAndSubscribe(topic);
+        }
     }
 
-    public List<Future<RecordMetadata>> getSentItems() {
-        return Collections.unmodifiableList(sentItems);
+    @Override
+    public ResumeAdapter getAdapter() {
+        return resumeAdapter;
+    }
+
+    /**
+     * Gets an unmodifiable view of the errors recorded while producing records
+     *
+     * @return the producer errors
+     */
+    protected Collection<RecordError> getProducerErrors() {
+        return Collections.unmodifiableCollection(producerErrors);
+    }
 
     @Override
     public void build() {
-        Service.super.build();
+        // NO-OP
     }
 
     @Override
     public void init() {
-        Service.super.init();
 
         LOG.debug("Initializing the Kafka resume strategy");
         if (consumer == null) {
@@ -300,12 +358,16 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
 
     @Override
     public void stop() {
+        LOG.info("Closing the Kafka producer");
+        IOHelper.close(producer, "Kafka producer", LOG);
 
+        LOG.info("Closing the Kafka consumer");
+        IOHelper.close(consumer, "Kafka consumer", LOG);
     }
 
     @Override
     public void close() throws IOException {
-        Service.super.close();
+        stop();
     }
 
     @Override
@@ -334,4 +396,27 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     protected Producer<K, V> getProducer() {
         return producer;
     }
+
+    protected Properties getProducerConfig() {
+        return producerConfig;
+    }
+
+    protected Properties getConsumerConfig() {
+        return consumerConfig;
+    }
+
+    protected String getTopic() {
+        return topic;
+    }
+
+    protected ResumeCache<K, V> getResumeCache() {
+        return resumeCache;
+    }
+
+    /**
+     * Clear the producer errors
+     */
+    public void resetProducerErrors() {
+        producerErrors.clear();
+    }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
index 181009da26c..52388fd9081 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -25,15 +25,16 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
-import org.apache.camel.Service;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
+import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter;
 import org.apache.camel.component.kafka.consumer.support.KafkaResumable;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.Resumables;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport {
@@ -56,16 +58,13 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
     private MockEndpoint result;
 
     @BindToRegistry("resumeStrategy")
-    private TestKafkaConsumerResumeStrategy resumeStrategy;
+    private TestUpdateStrategy resumeStrategy;
     private CountDownLatch messagesLatch;
     private KafkaProducer<Object, Object> producer;
 
-    private static class TestKafkaConsumerResumeStrategy
-            implements KafkaConsumerResumeStrategy,
-            UpdatableConsumerResumeStrategy<String, Integer, Resumable<String, Integer>>, Service {
+    private static class TestUpdateStrategy extends TransientResumeStrategy
+            implements UpdatableConsumerResumeStrategy<String, Integer, Resumable<String, Integer>> {
         private final CountDownLatch messagesLatch;
-        private boolean resumeCalled;
-        private boolean consumerIsNull = true;
         private boolean startCalled;
         private boolean offsetNull = true;
         private boolean offsetAddressableNull = true;
@@ -74,34 +73,10 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
         private boolean offsetValueEmpty = true;
         private int lastOffset;
 
-        public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) {
-            this.messagesLatch = messagesLatch;
-        }
-
-        @Override
-        public void setConsumer(Consumer<?, ?> consumer) {
-            if (consumer != null) {
-                consumerIsNull = false;
-            }
-        }
-
-        @Override
-        public void resume(KafkaResumable resumable) {
-            resumeCalled = true;
-
-        }
-
-        @Override
-        public void resume() {
-            resumeCalled = true;
-        }
-
-        public boolean isResumeCalled() {
-            return resumeCalled;
-        }
+        public TestUpdateStrategy(ResumeAdapter resumeAdapter, CountDownLatch messagesLatch) {
+            super(resumeAdapter);
 
-        public boolean isConsumerIsNull() {
-            return consumerIsNull;
+            this.messagesLatch = messagesLatch;
         }
 
         @Override
@@ -115,12 +90,8 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
             LOG.warn("Init was called");
         }
 
-        public boolean isStartCalled() {
-            return startCalled;
-        }
-
         @Override
-        public void updateLastOffset(Resumable<String, Integer> offset) {
+        public void updateLastOffset(Resumable<String, Integer> offset) throws Exception {
             try {
                 if (offset != null) {
                     offsetNull = false;
@@ -166,6 +137,40 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
         public boolean isOffsetValueEmpty() {
             return offsetValueEmpty;
         }
+
+        public boolean isStartCalled() {
+            return startCalled;
+        }
+    }
+
+    private static class TestKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
+        private boolean resumeCalled;
+        private boolean consumerIsNull = true;
+
+        @Override
+        public void setConsumer(Consumer<?, ?> consumer) {
+            if (consumer != null) {
+                consumerIsNull = false;
+            }
+        }
+
+        @Override
+        public void setKafkaResumable(KafkaResumable kafkaResumable) {
+
+        }
+
+        @Override
+        public void resume() {
+            resumeCalled = true;
+        }
+
+        public boolean isResumeCalled() {
+            return resumeCalled;
+        }
+
+        public boolean isConsumerIsNull() {
+            return consumerIsNull;
+        }
     }
 
     @BeforeEach
@@ -183,7 +188,7 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
         super.doPreSetup();
 
         messagesLatch = new CountDownLatch(1);
-        resumeStrategy = new TestKafkaConsumerResumeStrategy(messagesLatch);
+        resumeStrategy = new TestUpdateStrategy(new TestKafkaConsumerResumeAdapter(), messagesLatch);
     }
 
     @Test
@@ -191,9 +196,11 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
     public void testOffsetIsBeingChecked() throws InterruptedException {
         assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was not called");
 
-        assertTrue(resumeStrategy.isResumeCalled(),
+        final TestKafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(TestKafkaConsumerResumeAdapter.class);
+        assertNotNull(adapter, "The adapter should not be null");
+        assertTrue(adapter.isResumeCalled(),
                 "The resume strategy should have been called when the partition was assigned");
-        assertFalse(resumeStrategy.isConsumerIsNull(),
+        assertFalse(adapter.isConsumerIsNull(),
                 "The consumer passed to the strategy should not be null");
         assertTrue(resumeStrategy.isStartCalled(),
                 "The resume strategy should have been started");
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index a79ce85a743..2e699a2de92 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -21,8 +21,6 @@ import java.util.Optional;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.ResumeStrategy;
 import org.apache.camel.StartupListener;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.api.management.ManagedAttribute;
@@ -31,6 +29,8 @@ import org.apache.camel.cluster.CamelClusterEventListener;
 import org.apache.camel.cluster.CamelClusterMember;
 import org.apache.camel.cluster.CamelClusterService;
 import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java b/core/camel-api/src/main/java/org/apache/camel/Route.java
index 0e6f90882c5..c5f46ec328c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Route.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Route.java
@@ -21,6 +21,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.camel.resume.ConsumerListener;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.ManagementInterceptStrategy;
 import org.apache.camel.spi.Resource;
diff --git a/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListener.java
similarity index 98%
rename from core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListener.java
index d92808e5d2a..bbf6be9d018 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 import java.util.function.Predicate;
 
diff --git a/core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListenerAware.java
similarity index 97%
rename from core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListenerAware.java
index bbd24c59e47..2e188721028 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListenerAware.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
  * An interface to represent an object which wishes to support listening for consumer events using the
diff --git a/core/camel-api/src/main/java/org/apache/camel/Offset.java b/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
similarity index 96%
rename from core/camel-api/src/main/java/org/apache/camel/Offset.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
index 1e73d1dcc2a..c41f312d49a 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Offset.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
  * Generic offset without a concrete type
diff --git a/core/camel-api/src/main/java/org/apache/camel/Resumable.java b/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
similarity index 98%
rename from core/camel-api/src/main/java/org/apache/camel/Resumable.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
index 804c05bf9a3..108af04293a 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Resumable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
  * This provides an interface for resumable objects. Such objects allow its users to address them at a specific offset.
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
similarity index 98%
rename from core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
index 0a18ba9e232..d09a61f7e72 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 import java.lang.reflect.Array;
 import java.util.Arrays;
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
similarity index 62%
copy from core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
index f23e7625e75..53221ac7d10 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
- * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
- * processing records.
+ * A resume adapter provides the component-specific logic that plugs the more generic strategy with the lower level
+ * requirements of the component being used.
+ *
+ * It is the responsibility of the supported components to implement the custom implementation for this part of the
+ * resume API, as well as to offer component-specific interfaces that can be specialized by other integrations.
  */
-public interface ResumeStrategy extends Service {
-
+public interface ResumeAdapter {
     /**
-     * A consumer, iterator or value class that can be used to set the index position from which to resume from. The
-     * type is specific to the component.
-     *
+     * Execute the resume logic for the adapter
      */
     void resume();
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeAware.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAware.java
similarity index 97%
rename from core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ResumeAware.java
index 6ee69db413f..10f59b3c00c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAware.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
  * An interface to represent an object which wishes to support resume operations using a {@link ResumeStrategy}.
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
similarity index 65%
copy from core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index f23e7625e75..06096be3b1e 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
+
+import org.apache.camel.Service;
 
 /**
  * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
@@ -24,9 +26,20 @@ package org.apache.camel;
 public interface ResumeStrategy extends Service {
 
     /**
-     * A consumer, iterator or value class that can be used to set the index position from which to resume from. The
-     * type is specific to the component.
-     *
+     * Gets an adapter for resuming operations
+     * 
+     * @return the resume adapter used by this strategy
+     */
+    ResumeAdapter getAdapter();
+
+    /**
+     * Gets an adapter for resuming operations, cast to the requested class
+     * 
+     * @param  clazz the class of the adapter
+     * @return       the adapter, or null if there is none (an adapter of another type raises ClassCastException)
+     * @param  <T>   the type of the adapter
      */
-    void resume();
+    default <T extends ResumeAdapter> T getAdapter(Class<T> clazz) {
+        return clazz.cast(getAdapter());
+    }
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/UpdatableConsumerResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
similarity index 97%
rename from core/camel-api/src/main/java/org/apache/camel/UpdatableConsumerResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
index 34bd71690f2..af6e682f198 100644
--- a/core/camel-api/src/main/java/org/apache/camel/UpdatableConsumerResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
  * An updatable resume strategy
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
similarity index 71%
rename from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
index 294e77223ab..0708d1ddff0 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
@@ -15,8 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.file.consumer;
+package org.apache.camel.resume.cache;
 
-public interface GenericFileResumeStrategy<T> extends FileConsumerResumeStrategy<GenericFileResumable<T>> {
+/**
+ * A cache where a key can point to one or more entries. For instance, a path as the key and the file entries as its
+ * entries
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public interface MultiEntryCache<K, V> extends ResumeCache<K, V> {
 
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
similarity index 76%
rename from core/camel-api/src/main/java/org/apache/camel/ResumeCache.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
index 0abe2e6dca7..c2ce3758f33 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume.cache;
 
-import java.util.Optional;
+import org.apache.camel.resume.ResumeStrategy;
 
 /**
  * This cache stored the resumed data from a {@link ResumeStrategy}.
@@ -27,6 +27,15 @@ import java.util.Optional;
  */
 public interface ResumeCache<K, V> {
 
+    /**
+     * Whether the cache contains the key with the given entry value
+     *
+     * @param  key   the key
+     * @param  entry the entry
+     * @return       true if the key/entry pair is stored in the cache
+     */
+    boolean contains(K key, V entry);
+
     /**
      * Adds a value to the cache
      * 
@@ -43,10 +52,9 @@ public interface ResumeCache<K, V> {
     boolean isFull();
 
     /**
-     * Gets the offset value for the key
+     * Gets the cache pool size
      * 
-     * @param  key the key
-     * @return     the key
+     * @return the cache pool size
      */
-    Optional<V> get(K key);
+    long capacity();
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
similarity index 67%
rename from core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
index f23e7625e75..e8b243846e7 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
@@ -15,18 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume.cache;
+
+import java.util.Optional;
 
 /**
- * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
- * processing records.
+ * A resume cache where a single key can only be mapped to a single entry
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the entry
  */
-public interface ResumeStrategy extends Service {
-
+public interface SingleEntryCache<K, V> extends ResumeCache<K, V> {
     /**
-     * A consumer, iterator or value class that can be used to set the index position from which to resume from. The
-     * type is specific to the component.
+     * Gets the offset value for the key
      *
+     * @param  key the key
+     * @return     the offset value for the key, if any
      */
-    void resume();
+    Optional<V> get(K key);
+
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 8a8fc6a10d5..c35da8960f3 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -28,15 +28,11 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
-import org.apache.camel.ConsumerListener;
-import org.apache.camel.ConsumerListenerAware;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.NamedNode;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.ResumeStrategy;
 import org.apache.camel.Route;
 import org.apache.camel.RouteAware;
 import org.apache.camel.Service;
@@ -44,6 +40,10 @@ import org.apache.camel.ShutdownRoute;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.Suspendable;
 import org.apache.camel.SuspendableService;
+import org.apache.camel.resume.ConsumerListener;
+import org.apache.camel.resume.ConsumerListenerAware;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.ManagementInterceptStrategy;
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index 8176f7d6a52..bcaee49fecf 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -34,7 +34,7 @@ This instance can be bound in the Context registry as follows:
 
 [source,java]
 ----
-getCamelContext().getRegistry().bind("testResumeStrategy", new MyTestResumeStrategy());
+getCamelContext().getRegistry().bind("testResumeStrategy", new MyTestResumeStrategy(new MyAdapter()));
 
 from("some:component")
     .resumable("testResumeStrategy")
@@ -46,129 +46,70 @@ Or the instance can be constructed as follows:
 [source,java]
 ----
 from("some:component")
-    .resumable(new MyTestResumeStrategy())
+    .resumable(new MyTestResumeStrategy(new MyAdapter()))
     .process(this::process)
 ----
 
+=== The Resume Adapter
+
+The adapter class's responsibility is to bind the component-specific part of the logic to the more generic handling of the
+resume strategy. The adapter is always component-specific and some components may have more than one. Integrations with
+more complex resume processes may implement their own adapters, although the builtin ones should be useful in most
+cases. Currently, the following adapters are available:
+
+* camel-atom: `org.apache.camel.component.feed.EntryFilter`
+* camel-aws2-kinesis: `org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter`
+* camel-cassandracql: `org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter`
+* camel-couchbase: `org.apache.camel.component.couchbase.CouchbaseResumeAdapter`
+* camel-couchdb: `org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter`
+* camel-file: `org.apache.camel.component.file.consumer.adapters.FileSetResumeAdapter` for directories
+* camel-file: `org.apache.camel.component.file.consumer.adapters.GenericFileResumeAdapter` for files
+* camel-kafka: `org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter`
+* camel-rss: `org.apache.camel.component.feed.EntryFilter`
+* generic: `org.apache.camel.processor.resume.DelegatingResumeAdapter`
+
+Note: in the future, these adapters will be resolved automatically by Camel.
+
 == The Resume API Interfaces
 
 These are the *core interfaces*:
 
-* `org.apache.camel.ResumeStrategy` - the basic resume strategy
-* `org.apache.camel.UpdatableConsumerResumeStrategy` - an extension to the resume strategy to allow updatable strategies
-* `org.apache.camel.ResumeCache` - an interface for local cache for resumable information
+* `org.apache.camel.resume.ResumeStrategy` - the resume strategy service
+* `org.apache.camel.resume.ResumeAdapter` - an adapter that binds the generic parts of the resume strategy with the component
+* `org.apache.camel.resume.UpdatableConsumerResumeStrategy` - an extension to the resume strategy to allow updatable strategies
+* `org.apache.camel.resume.cache.ResumeCache` - the base interface for local cache for resumable information
+* `org.apache.camel.resume.cache.SingleEntryCache` - an interface for local cache for resumable information where there is a one-to-one relationship between a cache key and its entry (e.g.: a file and its offset)
+* `org.apache.camel.resume.cache.MultiEntryCache` - an interface for local cache for resumable information where there is a one-to-many relationship between a cache key and its entries (e.g.: a path and its file entries)
 
 These are the *core classes* supporting the strategies:
 
-* `org.apache.camel.Resumable` - an interface to allow users to work with abstract resumable entities (files, offsets, etc)
-* `org.apache.camel.ResumableSet` - an interface for resumables with a 1-to-many relationship
-* `org.apache.camel.Offset` - a generic offset without a concrete type (it may represent a long, a file name, etc)
+* `org.apache.camel.resume.Resumable` - an interface to allow users to work with abstract resumable entities (files, offsets, etc)
+* `org.apache.camel.resume.ResumableSet` - an interface for resumables with a 1-to-many relationship
+* `org.apache.camel.resume.Offset` - a generic offset without a concrete type (it may represent a long, a file name, etc)
 
 These are the *supporting classes*:
 
 * `org.apache.camel.support.Resumables` - resumables handling support
 * `org.apache.camel.support.Offsets` - offset handling support
 
-== Basic Strategies
-
-The basic strategies offer a component-specific skeleton that can be used to implement strategies.
-
-* `AbstractKafkaResumeStrategy` - a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets.
-
-[source,java]
-----
-public class KafkaResumeStrategy<K> extends AbstractKafkaResumeStrategy<K, Long> implements GenericFileResumeStrategy<File> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeStrategy.class);
-    public static final int CACHE_SIZE = 100;
-
-    private final String topic;
-    private final ResumeCache<K, Long> cache;
+== Builtin Resume Strategies
 
-    public KafkaResumeStrategy(String topic,
-                               ResumeCache<K, Long> cache,
-                               DefaultProducerPropertyFactory producerPropertyFactory,
-                               DefaultConsumerPropertyFactory consumerPropertyFactory)
-    {
-        super(topic, cache, producerPropertyFactory.getProperties(), consumerPropertyFactory.getProperties());
-        this.topic = topic;
-        this.cache = cache;
-    }
+Camel comes with a few builtin strategies that can be used to store, retrieve and update the offsets. The following strategies are available:
 
+* `SingleNodeKafkaResumeStrategy` - a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets and is suitable for single node integrations.
+* `MultiNodeKafkaResumeStrategy` - a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets and is suitable for multi node integrations (i.e.: integrations running on clusters using the xref:components::master-component.adoc[camel-master] component).
 
-    private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) {
-        final File addressable = resumable.getAddressable();
-        return getLastOffset((K) addressable);
-    }
 
-    public Optional<Long> getLastOffset(K addressable) {
-        return cache.get(addressable);
-    }
-
-    @Override
-    public void subscribe() {
-        checkAndSubscribe(topic, 1);
-    }
-
-    @Override
-    public void resume(GenericFileResumable<File> resumable) {
-        final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
-
-        if (!lastOffsetOpt.isPresent()) {
-            return;
-        }
-
-        final long lastOffset = lastOffsetOpt.get();
-        resumable.updateLastOffset(lastOffset);
-    }
-
-    @Override
-    public void resume() {
-        throw new UnsupportedOperationException("Cannot perform blind resume");
-    }
-}
-----
+=== Implementing New Builtin Resume Strategies
 
+New builtin resume strategies can be created by implementing the `UpdatableConsumerResumeStrategy` and the `ResumeStrategy` interfaces. Check the code for `SingleNodeKafkaResumeStrategy` for implementation details.
 
 == Local Cache Support
 
 A sample local cache implemented using https://github.com/ben-manes/caffeine[Caffeine].
 
-[source,java]
-----
-public class SingleItemCache<K> implements ResumeCache<K, Long> {
-    public static final int CACHE_SIZE = 100;
-    private final Cache<K, Long> cache = Caffeine.newBuilder()
-            .maximumSize(CACHE_SIZE)
-            .build();
-
-    @Override
-    public void add(K key, Long offsetValue) {
-        cache.put(key, offsetValue);
-    }
-
-    @Override
-    public Optional<Long> get(K key) {
-        Long entry = cache.getIfPresent(key);
-
-        if (entry == null) {
-            return Optional.empty();
-        }
-
-        return Optional.of(entry.longValue());
-    }
-
-    @Override
-    public boolean isFull() {
-        if (cache.estimatedSize() < CACHE_SIZE) {
-            return true;
-        }
-
-        return false;
-    }
-}
-----
-
+* `org.apache.camel.component.caffeine.resume.single.CaffeineCache`: for data where 1 key can only point to 1 entry (1-to-1 relationship)
+* `org.apache.camel.component.caffeine.resume.multi.CaffeineCache`: for data where 1 key can point to 1 or more entries (1-to-many relationship)
 
 == Known Limitations
 
@@ -209,7 +150,7 @@ Currently, support for pausable consumers is available for the following compone
 
 To use the API, it needs an instance of a Consumer listener along with a predicate that tests whether to continue.
 
-* `org.apache.camel.ConsumerListener` - the consumer listener interface. Camel already comes with pre-built consumer listeners, but users in need of more complex behaviors can create their own listeners.
+* `org.apache.camel.resume.ConsumerListener` - the consumer listener interface. Camel already comes with pre-built consumer listeners, but users in need of more complex behaviors can create their own listeners.
 * a predicate that returns true if data consumption should resume or false if consumption should be put on pause
 
 Usage example:
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
index 88b088805ab..f1526b5311b 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
@@ -12,7 +12,7 @@
     "output": false
   },
   "properties": {
-    "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
+    "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
     "untilCheck": { "kind": "attribute", "displayName": "Until Check", "required": true, "type": "object", "javaType": "java.util.function.Predicate", "deprecated": false, "autowired": false, "secret": false },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
index acbea228865..03a0a440e46 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
@@ -12,7 +12,7 @@
     "output": false
   },
   "properties": {
-    "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
+    "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
   }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
index d6f1bdfcaf2..7c8ceef4738 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
@@ -25,7 +25,7 @@ import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
-import org.apache.camel.ConsumerListener;
+import org.apache.camel.resume.ConsumerListener;
 import org.apache.camel.spi.Metadata;
 
 /**
@@ -40,7 +40,7 @@ public class PausableDefinition extends NoOutputDefinition<PausableDefinition> {
     private ConsumerListener<?, ?> consumerListenerBean;
 
     @XmlAttribute(required = true)
-    @Metadata(required = true, javaType = "org.apache.camel.ConsumerListener")
+    @Metadata(required = true, javaType = "org.apache.camel.resume.ConsumerListener")
     private String consumerListener;
 
     @XmlTransient
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index af065fcbae0..e2b0cadf078 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -37,7 +37,6 @@ import org.apache.camel.AggregationStrategy;
 import org.apache.camel.BeanScope;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.ConsumerListener;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -45,7 +44,6 @@ import org.apache.camel.Expression;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeStrategy;
 import org.apache.camel.builder.DataFormatClause;
 import org.apache.camel.builder.EndpointConsumerBuilder;
 import org.apache.camel.builder.EndpointProducerBuilder;
@@ -60,6 +58,8 @@ import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.LanguageExpression;
 import org.apache.camel.model.language.SimpleExpression;
 import org.apache.camel.processor.loadbalancer.LoadBalancer;
+import org.apache.camel.resume.ConsumerListener;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.AsEndpointUri;
 import org.apache.camel.spi.AsPredicate;
 import org.apache.camel.spi.DataFormat;
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
index 4e2d62deb86..bc68161ae30 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
@@ -22,7 +22,7 @@ import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.Metadata;
 
 /**
@@ -37,7 +37,7 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition>
     private ResumeStrategy resumeStrategyBean;
 
     @XmlAttribute(required = true)
-    @Metadata(required = true, javaType = "org.apache.camel.ResumeStrategy")
+    @Metadata(required = true, javaType = "org.apache.camel.resume.ResumeStrategy")
     private String resumeStrategy;
 
     @Metadata(label = "advanced", javaType = "org.apache.camel.LoggingLevel", defaultValue = "ERROR",
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
index 85954c3b6f7..09c3ab6db35 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
@@ -24,10 +24,10 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.ConsumerListener;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
+import org.apache.camel.resume.ConsumerListener;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
new file mode 100644
index 00000000000..6aee7fd8f4f
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+import org.apache.camel.resume.ResumeAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A delegating adapter that can be used to delegate to and/or abstract resume adapters
+ */
+public class DelegatingResumeAdapter implements ResumeAdapter {
+    private static final Logger LOG = LoggerFactory.getLogger(DelegatingResumeAdapter.class);
+
+    private final List<ResumeAdapter> resumeStrategies;
+
+    public DelegatingResumeAdapter() {
+        resumeStrategies = new ArrayList<>();
+    }
+
+    protected DelegatingResumeAdapter(List<ResumeAdapter> resumeStrategies) {
+        this.resumeStrategies = resumeStrategies;
+    }
+
+    public boolean add(ResumeAdapter resumeAdapter) {
+        return resumeStrategies.add(resumeAdapter);
+    }
+
+    public boolean remove(Object resumeAdapter) {
+        return resumeStrategies.remove(resumeAdapter);
+    }
+
+    public boolean removeIf(Predicate<? super ResumeAdapter> filter) {
+        return resumeStrategies.removeIf(filter);
+    }
+
+    @Override
+    public void resume() {
+        resumeStrategies.forEach(ResumeAdapter::resume);
+    }
+
+    public int size() {
+        return resumeStrategies.size();
+    }
+
+    public boolean isEmpty() {
+        return resumeStrategies.isEmpty();
+    }
+
+    public boolean contains(Object o) {
+        return resumeStrategies.contains(o);
+    }
+
+    public Iterator<ResumeAdapter> iterator() {
+        return resumeStrategies.iterator();
+    }
+
+    public Object[] toArray() {
+        return resumeStrategies.toArray();
+    }
+
+    public <T> T[] toArray(T[] a) {
+        return resumeStrategies.toArray(a);
+    }
+
+    public boolean containsAll(Collection<?> c) {
+        return resumeStrategies.containsAll(c);
+    }
+
+    public boolean addAll(Collection<? extends ResumeAdapter> c) {
+        return resumeStrategies.addAll(c);
+    }
+
+    public boolean addAll(int index, Collection<? extends ResumeAdapter> c) {
+        return resumeStrategies.addAll(index, c);
+    }
+
+    public boolean removeAll(Collection<?> c) {
+        return resumeStrategies.removeAll(c);
+    }
+
+    public boolean retainAll(Collection<?> c) {
+        return resumeStrategies.retainAll(c);
+    }
+
+    public void replaceAll(UnaryOperator<ResumeAdapter> operator) {
+        resumeStrategies.replaceAll(operator);
+    }
+
+    public void sort(Comparator<? super ResumeAdapter> c) {
+        resumeStrategies.sort(c);
+    }
+
+    public void clear() {
+        resumeStrategies.clear();
+    }
+
+    public ResumeAdapter get(int index) {
+        return resumeStrategies.get(index);
+    }
+
+    public ResumeAdapter set(int index, ResumeAdapter element) {
+        return resumeStrategies.set(index, element);
+    }
+
+    public void add(int index, ResumeAdapter element) {
+        resumeStrategies.add(index, element);
+    }
+
+    public ResumeAdapter remove(int index) {
+        return resumeStrategies.remove(index);
+    }
+
+    public int indexOf(Object o) {
+        return resumeStrategies.indexOf(o);
+    }
+
+    public int lastIndexOf(Object o) {
+        return resumeStrategies.lastIndexOf(o);
+    }
+
+    public ListIterator<ResumeAdapter> listIterator() {
+        return resumeStrategies.listIterator();
+    }
+
+    public ListIterator<ResumeAdapter> listIterator(int index) {
+        return resumeStrategies.listIterator(index);
+    }
+
+    public List<ResumeAdapter> subList(int fromIndex, int toIndex) {
+        return resumeStrategies.subList(fromIndex, toIndex);
+    }
+
+    public Spliterator<ResumeAdapter> spliterator() {
+        return resumeStrategies.spliterator();
+    }
+
+    public <T> T[] toArray(IntFunction<T[]> generator) {
+        return resumeStrategies.toArray(generator);
+    }
+
+    public Stream<ResumeAdapter> stream() {
+        return resumeStrategies.stream();
+    }
+
+    public Stream<ResumeAdapter> parallelStream() {
+        return resumeStrategies.parallelStream();
+    }
+
+    public void forEach(Consumer<? super ResumeAdapter> action) {
+        resumeStrategies.forEach(action);
+    }
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeStrategy.java
deleted file mode 100644
index 71ea930f95c..00000000000
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeStrategy.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.processor.resume;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Predicate;
-
-import org.apache.camel.ResumeStrategy;
-import org.apache.camel.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A delegating strategy that can be used to delegate to and/or abstract resume strategies
- */
-public class DelegatingResumeStrategy implements ResumeStrategy {
-    private static final Logger LOG = LoggerFactory.getLogger(DelegatingResumeStrategy.class);
-
-    private final List<ResumeStrategy> resumeStrategies;
-
-    public DelegatingResumeStrategy() {
-        resumeStrategies = new ArrayList<>();
-    }
-
-    protected DelegatingResumeStrategy(List<ResumeStrategy> resumeStrategies) {
-        this.resumeStrategies = resumeStrategies;
-    }
-
-    public boolean add(ResumeStrategy resumeStrategy) {
-        return resumeStrategies.add(resumeStrategy);
-    }
-
-    public boolean remove(Object resumeStrategy) {
-        return resumeStrategies.remove(resumeStrategy);
-    }
-
-    public boolean removeIf(Predicate<? super ResumeStrategy> filter) {
-        return resumeStrategies.removeIf(filter);
-    }
-
-    @Override
-    public void resume() {
-        resumeStrategies.forEach(ResumeStrategy::resume);
-    }
-
-    @Override
-    public void start() {
-        resumeStrategies.forEach(Service::start);
-    }
-
-    @Override
-    public void stop() {
-        resumeStrategies.forEach(Service::stop);
-    }
-
-    @Override
-    public void build() {
-        resumeStrategies.forEach(Service::build);
-    }
-
-    @Override
-    public void init() {
-        resumeStrategies.forEach(Service::init);
-    }
-
-    private void close(ResumeStrategy resumeStrategy) {
-        try {
-            resumeStrategy.close();
-        } catch (IOException e) {
-            LOG.warn("Failed to close resume strategy {}: {}", resumeStrategy.getClass().getSimpleName(), e.getMessage());
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        resumeStrategies.forEach(r -> close(r));
-    }
-
-    @Override
-    public String toString() {
-        return "DelegatingResumeStrategy{" +
-               "resumeStrategies=" + resumeStrategies +
-               '}';
-    }
-}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index a09d04591e6..aa347da63e1 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -19,9 +19,9 @@ package org.apache.camel.processor.resume;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.Resumable;
-import org.apache.camel.ResumeStrategy;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ExchangeHelper;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
index 240376d5acf..34b2b475c7e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
@@ -29,7 +29,7 @@ import org.apache.camel.ExtendedExchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.Synchronization;
@@ -67,6 +67,13 @@ public class ResumableProcessor extends AsyncProcessorSupport
         super.doStart();
     }
 
+    @Override
+    protected void doStop() throws Exception {
+        LOG.info("Stopping the resumable strategy: {}", resumeStrategy.getClass().getSimpleName());
+        resumeStrategy.stop();
+        super.doStop();
+    }
+
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         final Synchronization onCompletion = new ResumableCompletion(resumeStrategy, loggingLevel);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
similarity index 54%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
copy to core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 388f47fa3e5..b0ffd26c080 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -15,33 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.processor.resume;
 
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
-import org.apache.camel.resume.Offsets;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeStrategy;
 
-public class KafkaResumable implements Resumable<String, String> {
-    private final String partition;
-    private String offset;
+/**
+ * A resume strategy that keeps all the resume strategy information in memory. This is hardly useful for
+ * production-level implementations, but it can be useful for testing the resume strategies.
+ */
+public class TransientResumeStrategy implements ResumeStrategy {
+    private final ResumeAdapter resumeAdapter;
 
-    public KafkaResumable(String partition, String offset) {
-        this.partition = partition;
-        this.offset = offset;
+    public TransientResumeStrategy(ResumeAdapter resumeAdapter) {
+        this.resumeAdapter = resumeAdapter;
     }
 
     @Override
-    public void updateLastOffset(String offset) {
-        this.offset = offset;
+    public ResumeAdapter getAdapter() {
+        return resumeAdapter;
     }
 
     @Override
-    public Offset<String> getLastOffset() {
-        return Offsets.of(offset);
+    public void start() {
+        // this is NO-OP
     }
 
     @Override
-    public String getAddressable() {
-        return partition;
+    public void stop() {
+        // this is NO-OP
     }
 }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
index 0626fcb05e5..f67406b871d 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
@@ -19,12 +19,12 @@ package org.apache.camel.reifier;
 
 import java.util.function.Predicate;
 
-import org.apache.camel.ConsumerListener;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.PausableDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.processor.PausableProcessor;
+import org.apache.camel.resume.ConsumerListener;
 import org.apache.camel.util.ObjectHelper;
 
 public class PausableReifier extends ProcessorReifier<PausableDefinition> {
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index c96b14a941c..da5c21e9dc1 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -18,11 +18,11 @@ package org.apache.camel.reifier;
 
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeStrategy;
 import org.apache.camel.Route;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.ResumableDefinition;
 import org.apache.camel.processor.resume.ResumableProcessor;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.util.ObjectHelper;
 
 public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index f258e5e32c9..9053c2ca3d3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -20,25 +20,27 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import org.apache.camel.Resumable;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.GenericFileResumable;
-import org.apache.camel.component.file.consumer.GenericFileResumeStrategy;
+import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.Resumables;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 
 public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport {
 
-    private static class TestResumeStrategy implements GenericFileResumeStrategy<File> {
+    private static class TestFileResumeAdapter implements GenericFileResumeAdapter {
         @Override
         public void resume(GenericFileResumable<File> resumable) {
             if (!resumable.getAddressable().getName().startsWith("resume-from-offset")) {
@@ -55,17 +57,12 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
         }
 
         @Override
-        public void start() {
-
-        }
-
-        @Override
-        public void stop() {
-
+        public Optional<Long> getLastOffset(File addressable) {
+            return Optional.empty();
         }
     }
 
-    private static class FailResumeStrategy extends TestResumeStrategy
+    private static class FailResumeAdapter extends TestFileResumeAdapter
             implements UpdatableConsumerResumeStrategy<File, Long, Resumable<File, Long>> {
         private boolean called;
 
@@ -75,7 +72,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
         }
     }
 
-    private static final FailResumeStrategy FAIL_RESUME_STRATEGY = new FailResumeStrategy();
+    private static final TransientResumeStrategy FAIL_RESUME_STRATEGY = new TransientResumeStrategy(new FailResumeAdapter());
 
     @DisplayName("Tests whether it can resume from an offset")
     @Test
@@ -106,7 +103,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
 
         List<Exchange> exchangeList = mock.getExchanges();
         Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages");
-        Assertions.assertFalse(FAIL_RESUME_STRATEGY.called);
+        Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called);
     }
 
     @DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)")
@@ -127,7 +124,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
             @Override
             public void configure() {
 
-                bindToRegistry("myResumeStrategy", new TestResumeStrategy());
+                bindToRegistry("myResumeStrategy", new TransientResumeStrategy(new TestFileResumeAdapter()));
                 bindToRegistry("resumeNotToBeCalledStrategy", FAIL_RESUME_STRATEGY);
 
                 from(fileUri("resumeOff?noop=true&recursive=true"))
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index c8308b0b0ff..b0f6ef96026 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -25,14 +25,15 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeStrategy;
+import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.resume.Resumables;
 import org.junit.jupiter.api.Test;
 
 public class FileConsumerResumeStrategyTest extends ContextTestSupport {
 
-    private static class TestResumeStrategy implements FileSetResumeStrategy {
+    private static class TestFileSetResumeAdapter implements FileSetResumeAdapter {
         private List<String> processedFiles = Arrays.asList("0.txt", "1.txt", "2.txt");
         private FileResumeSet resumeSet;
 
@@ -49,16 +50,6 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
                 resumeSet.resumeEach(f -> !processedFiles.contains(f.getName()));
             }
         }
-
-        @Override
-        public void start() {
-
-        }
-
-        @Override
-        public void stop() {
-
-        }
     }
 
     private static Map<String, Object> headerFor(int num) {
@@ -99,7 +90,7 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
 
-                bindToRegistry("testResumeStrategy", new TestResumeStrategy());
+                bindToRegistry("testResumeStrategy", new TransientResumeStrategy(new TestFileSetResumeAdapter()));
 
                 from(fileUri("resume?noop=true&recursive=true"))
                         .resumable("testResumeStrategy")
diff --git a/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java b/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java
index a7f46836879..ab5421af693 100644
--- a/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java
+++ b/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java
@@ -17,8 +17,6 @@
 
 package org.apache.camel.resume;
 
-import org.apache.camel.Offset;
-
 /**
  * Offset handling support
  */
diff --git a/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java b/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java
index 38bfed33c69..05a931cfd6b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java
+++ b/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java
@@ -17,9 +17,6 @@
 
 package org.apache.camel.resume;
 
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
-
 /**
  * A wrapper for resumable entities
  */