You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/05/03 13:14:50 UTC

[camel] branch main updated (0208c0c65c6 -> a68c2607e5f)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 0208c0c65c6 CAMEL-18035: camel-jbang - Run * should better detect what is Camel routes and what are misc files
     new 70bf8befe16 CAMEL-17762: fixed resumable javadoc
     new a68c2607e5f CAMEL-17762: rework the resume strategy to separate concerns

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/camel/catalog/models/pausable.json  |   2 +-
 .../org/apache/camel/catalog/models/resumable.json |   2 +-
 .../apache/camel/component/feed/EntryFilter.java   |  14 +-
 .../component/feed/FeedEntryPollingConsumer.java   |  24 ++-
 .../AtomEntryPollingConsumerWithResumeTest.java    |   3 +-
 .../component/aws2/kinesis/Kinesis2Consumer.java   |  26 ++-
 ...sumeStrategy.java => KinesisResumeAdapter.java} |  23 +--
 ... => KinesisUserConfigurationResumeAdapter.java} |   4 +-
 .../caffeine/resume/multi/CaffeineCache.java       |  98 +++++++++
 .../caffeine/resume/single/CaffeineCache.java      |  93 +++++++++
 .../component/cassandra/CassandraConsumer.java     |  20 +-
 ...meStrategy.java => CassandraResumeAdapter.java} |  16 +-
 .../CassandraComponentResumeStrategyIT.java        |   9 +-
 .../component/couchbase/CouchbaseConsumer.java     |  18 +-
 ...meStrategy.java => CouchbaseResumeAdapter.java} |  16 +-
 .../integration/ConsumeResumeStrategyIT.java       |  16 +-
 components/camel-couchdb/pom.xml                   |   4 +
 .../component/couchdb/CouchDbChangesetTracker.java |  15 +-
 .../camel/component/couchdb/CouchDbConsumer.java   |  12 +-
 .../couchdb/consumer/CouchDbResumable.java         |   4 +-
 ...sumeStrategy.java => CouchDbResumeAdapter.java} |  21 +-
 .../consumer/CouchDbResumeStrategyFactory.java     |   8 +-
 ...java => LatestUpdateSequenceResumeAdapter.java} |   4 +-
 .../apache/camel/component/file/FileConsumer.java  |  37 ++--
 .../apache/camel/component/file/GenericFile.java   |   2 +-
 ...rResumeStrategy.java => FileResumeAdapter.java} |   6 +-
 .../component/file/consumer/FileResumeSet.java     |   2 +-
 ...sumeStrategy.java => FileSetResumeAdapter.java} |   7 +-
 .../file/consumer/GenericFileResumable.java        |   2 +-
 ...Strategy.java => GenericFileResumeAdapter.java} |  15 +-
 .../adapters/DefaultFileSetResumeAdapter.java      |  65 ++++++
 .../adapters/DefaultGenericFileResumeAdapter.java  |  64 ++++++
 .../camel/component/kafka/KafkaConsumer.java       |  14 +-
 .../camel/component/kafka/KafkaFetchRecords.java   |   4 +-
 .../errorhandler/KafkaConsumerListener.java        |   2 +-
 ...rategy.java => KafkaConsumerResumeAdapter.java} |  33 +--
 .../kafka/consumer/support/KafkaOffset.java        |   2 +-
 .../kafka/consumer/support/KafkaResumable.java     |   4 +-
 ....java => OffsetKafkaConsumerResumeAdapter.java} |  21 +-
 .../support/PartitionAssignmentListener.java       |  11 +-
 .../consumer/support/ResumeStrategyFactory.java    |  30 ++-
 ...a => SeekPolicyKafkaConsumerResumeAdapter.java} |  20 +-
 .../resume/kafka/KafkaResumeStrategy.java          |  22 +-
 .../resume/kafka/MultiNodeKafkaResumeStrategy.java | 152 ++++++++++++++
 .../camel/processor/resume/kafka/RecordError.java} |  31 ++-
 ...egy.java => SingleNodeKafkaResumeStrategy.java} | 227 ++++++++++++++-------
 .../KafkaConsumerWithResumeRouteStrategyIT.java    |  99 ++++-----
 .../camel/component/master/MasterConsumer.java     |   4 +-
 .../src/main/java/org/apache/camel/Route.java      |   2 +
 .../camel/{ => resume}/ConsumerListener.java       |   2 +-
 .../camel/{ => resume}/ConsumerListenerAware.java  |   2 +-
 .../java/org/apache/camel/{ => resume}/Offset.java |   2 +-
 .../org/apache/camel/{ => resume}/Resumable.java   |   4 +-
 .../apache/camel/{ => resume}/ResumableSet.java    |   2 +-
 .../ResumeAdapter.java}                            |  16 +-
 .../org/apache/camel/{ => resume}/ResumeAware.java |   2 +-
 .../apache/camel/{ => resume}/ResumeStrategy.java  |  23 ++-
 .../UpdatableConsumerResumeStrategy.java           |   2 +-
 .../apache/camel/resume/cache/MultiEntryCache.java |  11 +-
 .../camel/{ => resume/cache}/ResumeCache.java      |  20 +-
 .../cache/SingleEntryCache.java}                   |  21 +-
 .../org/apache/camel/impl/engine/DefaultRoute.java |   8 +-
 .../docs/modules/eips/pages/resume-strategies.adoc | 139 ++++---------
 .../resources/org/apache/camel/model/pausable.json |   2 +-
 .../org/apache/camel/model/resumable.json          |   2 +-
 .../org/apache/camel/model/PausableDefinition.java |   4 +-
 .../apache/camel/model/ProcessorDefinition.java    |   4 +-
 .../apache/camel/model/ResumableDefinition.java    |   4 +-
 .../apache/camel/processor/PausableProcessor.java  |   2 +-
 .../processor/resume/DelegatingResumeAdapter.java  | 181 ++++++++++++++++
 .../processor/resume/DelegatingResumeStrategy.java | 102 ---------
 .../processor/resume/ResumableCompletion.java      |   6 +-
 .../camel/processor/resume/ResumableProcessor.java |   9 +-
 .../processor/resume/TransientResumeStrategy.java  |  33 +--
 .../org/apache/camel/reifier/PausableReifier.java  |   2 +-
 .../org/apache/camel/reifier/ResumableReifier.java |   2 +-
 .../FileConsumerResumeFromOffsetStrategyTest.java  |  27 ++-
 .../file/FileConsumerResumeStrategyTest.java       |  17 +-
 .../main/java/org/apache/camel/resume/Offsets.java |   2 -
 .../java/org/apache/camel/resume/Resumables.java   |   3 -
 80 files changed, 1305 insertions(+), 681 deletions(-)
 rename components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/{KinesisResumeStrategy.java => KinesisResumeAdapter.java} (75%)
 rename components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/{KinesisUserConfigurationResumeStrategy.java => KinesisUserConfigurationResumeAdapter.java} (91%)
 create mode 100644 components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
 create mode 100644 components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
 rename components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/{CassandraResumeStrategy.java => CassandraResumeAdapter.java} (81%)
 copy components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/{CouchbaseResumeStrategy.java => CouchbaseResumeAdapter.java} (79%)
 rename components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/{CouchDbResumeStrategy.java => CouchDbResumeAdapter.java} (76%)
 rename components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/{LatestUpdateSequenceResumeStrategy.java => LatestUpdateSequenceResumeAdapter.java} (88%)
 copy components/camel-file/src/main/java/org/apache/camel/component/file/consumer/{FileConsumerResumeStrategy.java => FileResumeAdapter.java} (85%)
 rename components/camel-file/src/main/java/org/apache/camel/component/file/consumer/{FileSetResumeStrategy.java => FileSetResumeAdapter.java} (79%)
 rename components/camel-file/src/main/java/org/apache/camel/component/file/consumer/{FileConsumerResumeStrategy.java => GenericFileResumeAdapter.java} (66%)
 create mode 100644 components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
 create mode 100644 components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
 rename components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/{KafkaConsumerResumeStrategy.java => KafkaConsumerResumeAdapter.java} (65%)
 rename components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/{OffsetKafkaConsumerResumeStrategy.java => OffsetKafkaConsumerResumeAdapter.java} (82%)
 rename components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/{SeekPolicyKafkaConsumerResumeStrategy.java => SeekPolicyKafkaConsumerResumeAdapter.java} (80%)
 copy core/camel-api/src/main/java/org/apache/camel/Offset.java => components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java (63%)
 create mode 100644 components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
 rename components/{camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java => camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/RecordError.java} (58%)
 rename components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/{AbstractKafkaResumeStrategy.java => SingleNodeKafkaResumeStrategy.java} (55%)
 rename core/camel-api/src/main/java/org/apache/camel/{ => resume}/ConsumerListener.java (98%)
 rename core/camel-api/src/main/java/org/apache/camel/{ => resume}/ConsumerListenerAware.java (97%)
 rename core/camel-api/src/main/java/org/apache/camel/{ => resume}/Offset.java (96%)
 rename core/camel-api/src/main/java/org/apache/camel/{ => resume}/Resumable.java (94%)
 rename core/camel-api/src/main/java/org/apache/camel/{ => resume}/ResumableSet.java (98%)
 copy core/camel-api/src/main/java/org/apache/camel/{ResumeStrategy.java => resume/ResumeAdapter.java} (62%)
 rename core/camel-api/src/main/java/org/apache/camel/{ => resume}/ResumeAware.java (97%)
 copy core/camel-api/src/main/java/org/apache/camel/{ => resume}/ResumeStrategy.java (65%)
 rename core/camel-api/src/main/java/org/apache/camel/{ => resume}/UpdatableConsumerResumeStrategy.java (97%)
 rename components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeStrategy.java => core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java (71%)
 rename core/camel-api/src/main/java/org/apache/camel/{ => resume/cache}/ResumeCache.java (76%)
 rename core/camel-api/src/main/java/org/apache/camel/{ResumeStrategy.java => resume/cache/SingleEntryCache.java} (67%)
 create mode 100644 core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
 delete mode 100644 core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeStrategy.java
 copy components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java => core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java (54%)


[camel] 01/02: CAMEL-17762: fixed resumable javadoc

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 70bf8befe16521ad40b989beb3d1d7917bb83a40
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Apr 29 12:50:24 2022 +0200

    CAMEL-17762: fixed resumable javadoc
---
 core/camel-api/src/main/java/org/apache/camel/Resumable.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/Resumable.java b/core/camel-api/src/main/java/org/apache/camel/Resumable.java
index 98c5c285a41..804c05bf9a3 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Resumable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Resumable.java
@@ -29,7 +29,7 @@ package org.apache.camel;
 public interface Resumable<Y, T> {
 
     /**
-     * Updates the last offset the last offset as appropriate for the user of the interface
+     * Updates the last offset as appropriate for the user of the interface
      *
      * @param offset the offset value
      */


[camel] 02/02: CAMEL-17762: rework the resume strategy to separate concerns

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a68c2607e5fd2cb35b4d11828ba830240212a9d8
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Apr 29 16:14:39 2022 +0200

    CAMEL-17762: rework the resume strategy to separate concerns
    
    Rework the strategy so that the handling of the strategy service is
    separate from the logic that handles the component-specific resume.
    
    This brings built-in caches, improved reusable Kafka resume strategies and
    additional cleanups to the resume API interfaces, classes and
    documentation.
---
 .../org/apache/camel/catalog/models/pausable.json  |   2 +-
 .../org/apache/camel/catalog/models/resumable.json |   2 +-
 .../apache/camel/component/feed/EntryFilter.java   |  14 +-
 .../component/feed/FeedEntryPollingConsumer.java   |  24 ++-
 .../AtomEntryPollingConsumerWithResumeTest.java    |   3 +-
 .../component/aws2/kinesis/Kinesis2Consumer.java   |  26 ++-
 ...sumeStrategy.java => KinesisResumeAdapter.java} |  23 +--
 ... => KinesisUserConfigurationResumeAdapter.java} |   4 +-
 .../caffeine/resume/multi/CaffeineCache.java       |  98 +++++++++
 .../caffeine/resume/single/CaffeineCache.java      |  93 +++++++++
 .../component/cassandra/CassandraConsumer.java     |  20 +-
 ...meStrategy.java => CassandraResumeAdapter.java} |  16 +-
 .../CassandraComponentResumeStrategyIT.java        |   9 +-
 .../component/couchbase/CouchbaseConsumer.java     |  18 +-
 ...meStrategy.java => CouchbaseResumeAdapter.java} |  16 +-
 .../integration/ConsumeResumeStrategyIT.java       |  16 +-
 components/camel-couchdb/pom.xml                   |   4 +
 .../component/couchdb/CouchDbChangesetTracker.java |  15 +-
 .../camel/component/couchdb/CouchDbConsumer.java   |  12 +-
 .../couchdb/consumer/CouchDbResumable.java         |   4 +-
 ...sumeStrategy.java => CouchDbResumeAdapter.java} |  21 +-
 .../consumer/CouchDbResumeStrategyFactory.java     |   8 +-
 ...java => LatestUpdateSequenceResumeAdapter.java} |   4 +-
 .../apache/camel/component/file/FileConsumer.java  |  37 ++--
 .../apache/camel/component/file/GenericFile.java   |   2 +-
 ...rResumeStrategy.java => FileResumeAdapter.java} |   6 +-
 .../component/file/consumer/FileResumeSet.java     |   2 +-
 ...sumeStrategy.java => FileSetResumeAdapter.java} |   7 +-
 .../file/consumer/GenericFileResumable.java        |   2 +-
 ...Strategy.java => GenericFileResumeAdapter.java} |  15 +-
 .../adapters/DefaultFileSetResumeAdapter.java      |  65 ++++++
 .../adapters/DefaultGenericFileResumeAdapter.java  |  64 ++++++
 .../camel/component/kafka/KafkaConsumer.java       |  14 +-
 .../camel/component/kafka/KafkaFetchRecords.java   |   4 +-
 .../errorhandler/KafkaConsumerListener.java        |   2 +-
 ...rategy.java => KafkaConsumerResumeAdapter.java} |  33 +--
 .../kafka/consumer/support/KafkaOffset.java        |   2 +-
 .../kafka/consumer/support/KafkaResumable.java     |   4 +-
 ....java => OffsetKafkaConsumerResumeAdapter.java} |  21 +-
 .../support/PartitionAssignmentListener.java       |  11 +-
 .../consumer/support/ResumeStrategyFactory.java    |  30 ++-
 ...a => SeekPolicyKafkaConsumerResumeAdapter.java} |  20 +-
 .../resume/kafka/KafkaResumeStrategy.java          |  22 +-
 .../resume/kafka/MultiNodeKafkaResumeStrategy.java | 152 ++++++++++++++
 .../camel/processor/resume/kafka/RecordError.java} |  31 ++-
 ...egy.java => SingleNodeKafkaResumeStrategy.java} | 227 ++++++++++++++-------
 .../KafkaConsumerWithResumeRouteStrategyIT.java    |  99 ++++-----
 .../camel/component/master/MasterConsumer.java     |   4 +-
 .../src/main/java/org/apache/camel/Route.java      |   2 +
 .../camel/{ => resume}/ConsumerListener.java       |   2 +-
 .../camel/{ => resume}/ConsumerListenerAware.java  |   2 +-
 .../java/org/apache/camel/{ => resume}/Offset.java |   2 +-
 .../org/apache/camel/{ => resume}/Resumable.java   |   2 +-
 .../apache/camel/{ => resume}/ResumableSet.java    |   2 +-
 .../ResumeAdapter.java}                            |  16 +-
 .../org/apache/camel/{ => resume}/ResumeAware.java |   2 +-
 .../apache/camel/{ => resume}/ResumeStrategy.java  |  23 ++-
 .../UpdatableConsumerResumeStrategy.java           |   2 +-
 .../apache/camel/resume/cache/MultiEntryCache.java |  11 +-
 .../camel/{ => resume/cache}/ResumeCache.java      |  20 +-
 .../cache/SingleEntryCache.java}                   |  21 +-
 .../org/apache/camel/impl/engine/DefaultRoute.java |   8 +-
 .../docs/modules/eips/pages/resume-strategies.adoc | 139 ++++---------
 .../resources/org/apache/camel/model/pausable.json |   2 +-
 .../org/apache/camel/model/resumable.json          |   2 +-
 .../org/apache/camel/model/PausableDefinition.java |   4 +-
 .../apache/camel/model/ProcessorDefinition.java    |   4 +-
 .../apache/camel/model/ResumableDefinition.java    |   4 +-
 .../apache/camel/processor/PausableProcessor.java  |   2 +-
 .../processor/resume/DelegatingResumeAdapter.java  | 181 ++++++++++++++++
 .../processor/resume/DelegatingResumeStrategy.java | 102 ---------
 .../processor/resume/ResumableCompletion.java      |   6 +-
 .../camel/processor/resume/ResumableProcessor.java |   9 +-
 .../processor/resume/TransientResumeStrategy.java  |  33 +--
 .../org/apache/camel/reifier/PausableReifier.java  |   2 +-
 .../org/apache/camel/reifier/ResumableReifier.java |   2 +-
 .../FileConsumerResumeFromOffsetStrategyTest.java  |  27 ++-
 .../file/FileConsumerResumeStrategyTest.java       |  17 +-
 .../main/java/org/apache/camel/resume/Offsets.java |   2 -
 .../java/org/apache/camel/resume/Resumables.java   |   3 -
 80 files changed, 1304 insertions(+), 680 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
index 88b088805ab..f1526b5311b 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/pausable.json
@@ -12,7 +12,7 @@
     "output": false
   },
   "properties": {
-    "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
+    "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
     "untilCheck": { "kind": "attribute", "displayName": "Until Check", "required": true, "type": "object", "javaType": "java.util.function.Predicate", "deprecated": false, "autowired": false, "secret": false },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
index acbea228865..03a0a440e46 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/resumable.json
@@ -12,7 +12,7 @@
     "output": false
   },
   "properties": {
-    "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
+    "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
   }
diff --git a/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java b/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
index b477e1bb372..b0a4dd6e5a3 100644
--- a/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
+++ b/components/camel-atom/src/main/java/org/apache/camel/component/feed/EntryFilter.java
@@ -16,14 +16,14 @@
  */
 package org.apache.camel.component.feed;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 
 /**
  * Filter used by the {@link org.apache.camel.component.feed.FeedEntryPollingConsumer} to filter entries from the feed.
  *
  * @param <E> entry type
  */
-public interface EntryFilter<E> extends ResumeStrategy {
+public interface EntryFilter<E> extends ResumeAdapter {
 
     /**
      * Tests to be used as filtering the feed for only entries of interest, such as only new entries, etc.
@@ -37,14 +37,4 @@ public interface EntryFilter<E> extends ResumeStrategy {
     default void resume() {
         // NO-OP by default. Implementations can implement more complex behaviors if needed
     }
-
-    @Override
-    default void start() {
-        // NO-OP
-    }
-
-    @Override
-    default void stop() {
-        // NO-OP
-    }
 }
diff --git a/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java b/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
index d8bcfcf255e..035e7ea4f15 100644
--- a/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
+++ b/components/camel-atom/src/main/java/org/apache/camel/component/feed/FeedEntryPollingConsumer.java
@@ -20,14 +20,16 @@ import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 
 /**
  * Consumer to poll feeds and return each entry from the feed step by step.
  */
-public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer implements ResumeAware<EntryFilter<E>> {
+public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer implements ResumeAware<ResumeStrategy> {
     protected int entryIndex;
-    protected EntryFilter<E> entryFilter;
+    protected ResumeStrategy resumeStrategy;
     @SuppressWarnings("rawtypes")
     protected List<E> list;
     protected boolean throttleEntries;
@@ -52,8 +54,12 @@ public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer im
             polledMessages++;
 
             boolean valid = true;
-            if (entryFilter != null) {
-                valid = entryFilter.isValidEntry(entry);
+            if (resumeStrategy != null) {
+                ResumeAdapter adapter = resumeStrategy.getAdapter();
+
+                if (adapter instanceof EntryFilter) {
+                    valid = ((EntryFilter) adapter).isValidEntry(entry);
+                }
             }
             if (valid) {
                 Exchange exchange = endpoint.createExchange(feed, entry);
@@ -73,13 +79,13 @@ public abstract class FeedEntryPollingConsumer<E> extends FeedPollingConsumer im
     }
 
     @Override
-    public void setResumeStrategy(EntryFilter<E> resumeStrategy) {
-        this.entryFilter = resumeStrategy;
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
     }
 
     @Override
-    public EntryFilter<E> getResumeStrategy() {
-        return entryFilter;
+    public ResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
     }
 
     protected abstract void resetList();
diff --git a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java
index b39e4d957c7..4c810f8a65e 100644
--- a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java
+++ b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerWithResumeTest.java
@@ -22,6 +22,7 @@ import java.util.Date;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -57,7 +58,7 @@ public class AtomEntryPollingConsumerWithResumeTest extends CamelTestSupport {
         return new RouteBuilder() {
             public void configure() {
                 from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=500")
-                        .resumable().resumeStrategy(new UpdatedDateFilter(getTestFilterDate()))
+                        .resumable().resumeStrategy(new TransientResumeStrategy(new UpdatedDateFilter(getTestFilterDate())))
                         .routeId("WithResume")
                         .to("mock:result");
             }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 1925b6ae57e..de0317df604 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -23,9 +23,10 @@ import java.util.Queue;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisUserConfigurationResumeStrategy;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisUserConfigurationResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -41,13 +42,13 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
-public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<KinesisResumeStrategy> {
+public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> {
 
     private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
 
     private String currentShardIterator;
     private boolean isShardClosed;
-    private KinesisResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
 
     public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -173,12 +174,15 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     }
 
     private void resume(GetShardIteratorRequest.Builder req) {
-        if (resumeStrategy == null) {
-            resumeStrategy = new KinesisUserConfigurationResumeStrategy(getEndpoint().getConfiguration());
+        KinesisResumeAdapter adapter;
+        if (resumeStrategy != null) {
+            adapter = resumeStrategy.getAdapter(KinesisResumeAdapter.class);
+        } else {
+            adapter = new KinesisUserConfigurationResumeAdapter(getEndpoint().getConfiguration());
         }
 
-        resumeStrategy.setRequestBuilder(req);
-        resumeStrategy.resume();
+        adapter.setRequestBuilder(req);
+        adapter.resume();
     }
 
     private Queue<Exchange> createExchanges(List<Record> records) {
@@ -203,12 +207,12 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     }
 
     @Override
-    public void setResumeStrategy(KinesisResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 
     @Override
-    public KinesisResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
similarity index 75%
rename from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
rename to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
index 0592bfbaa09..b4f26b437eb 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeAdapter.java
@@ -17,20 +17,17 @@
 
 package org.apache.camel.component.aws2.kinesis.consumer;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 
-public interface KinesisResumeStrategy extends ResumeStrategy {
-
+/**
+ * The resume adapter for Kinesis
+ */
+public interface KinesisResumeAdapter extends ResumeAdapter {
+    /**
+     * Sets the shard iterator request builder that can be used to customize the call and set the exact resume point
+     * 
+     * @param builder the builder instance
+     */
     void setRequestBuilder(GetShardIteratorRequest.Builder builder);
-
-    @Override
-    default void start() {
-
-    }
-
-    @Override
-    default void stop() {
-
-    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeAdapter.java
similarity index 91%
rename from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
rename to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeAdapter.java
index 56334437770..5e47dfb817b 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeAdapter.java
@@ -21,11 +21,11 @@ import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
-public class KinesisUserConfigurationResumeStrategy implements KinesisResumeStrategy {
+public class KinesisUserConfigurationResumeAdapter implements KinesisResumeAdapter {
     private final Kinesis2Configuration configuration;
     private GetShardIteratorRequest.Builder resumable;
 
-    public KinesisUserConfigurationResumeStrategy(Kinesis2Configuration configuration) {
+    public KinesisUserConfigurationResumeAdapter(Kinesis2Configuration configuration) {
         this.configuration = configuration;
     }
 
diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
new file mode 100644
index 00000000000..e775be5d063
--- /dev/null
+++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.caffeine.resume.multi;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.camel.resume.cache.MultiEntryCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Caffeine-backed cache of resumables in which every key maps to a list holding all the values added for it
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class CaffeineCache<K, V> implements MultiEntryCache<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(CaffeineCache.class);
+
+    private final Cache<K, List<V>> cache;
+    private final long cacheSize;
+
+    /**
+     * Builds a new instance of this object backed by a cache bounded to the given size
+     *
+     * @param cacheSize the size of the cache
+     */
+    public CaffeineCache(long cacheSize) {
+        this(Caffeine.newBuilder().maximumSize(cacheSize).build(), cacheSize);
+    }
+
+    /**
+     * Builds a new instance of this object on top of an existing cache
+     *
+     * @param cache     an instance of a pre-constructed cache object
+     * @param cacheSize the size of the pre-constructed cache object
+     */
+    public CaffeineCache(Cache<K, List<V>> cache, long cacheSize) {
+        this.cache = cache;
+        this.cacheSize = cacheSize;
+    }
+
+    @Override
+    public long capacity() {
+        return cacheSize;
+    }
+
+    @Override
+    public synchronized void add(K key, V offsetValue) {
+        LOG.trace("Adding entry to the cache (k/v): {}/{}", key, offsetValue);
+        LOG.trace("Adding entry to the cache (k/v) with types: {}/{}", key.getClass(), offsetValue.getClass());
+
+        // the list for the key is created on first use and then appended to in place
+        cache.get(key, k -> new ArrayList<>()).add(offsetValue);
+    }
+
+    @Override
+    public synchronized boolean contains(K key, V entry) {
+        final List<V> cached = cache.getIfPresent(key);
+
+        // when no list is currently cached for the key the answer is false (no trace output in this case)
+        if (cached != null) {
+            final boolean found = cached.contains(entry);
+            LOG.trace("Checking if cache contains key {} with value {} ({})", key, entry, found);
+
+            return found;
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean isFull() {
+        // compares Caffeine's estimated entry count against the configured size
+        final boolean full = cache.estimatedSize() >= cacheSize;
+
+        return full;
+    }
+
+}
diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
new file mode 100644
index 00000000000..d551418cf38
--- /dev/null
+++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.caffeine.resume.single;
+
+import java.util.Optional;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.camel.resume.cache.SingleEntryCache;
+
+/**
+ * A {@link SingleEntryCache} backed by Caffeine that keeps one resume offset per key
+ *
+ * @param <K> The type of the key to cache
+ * @param <V> The type of the value/entry to cache
+ */
+public class CaffeineCache<K, V> implements SingleEntryCache<K, V> {
+    private final Cache<K, V> cache;
+    private final long cacheSize;
+
+    /**
+     * Builds a new cache bounded to the given maximum size
+     *
+     * @param cacheSize the cache size, also reported by {@link #capacity()}
+     */
+    public CaffeineCache(long cacheSize) {
+        this(Caffeine.newBuilder().maximumSize(cacheSize).build(), cacheSize);
+    }
+
+    /**
+     * Builds a new instance of this object on top of a pre-constructed cache
+     *
+     * @param cache     an instance of a pre-constructed cache object
+     * @param cacheSize the size of the pre-constructed cache object
+     */
+    public CaffeineCache(Cache<K, V> cache, long cacheSize) {
+        this.cache = cache;
+        this.cacheSize = cacheSize;
+    }
+
+    /**
+     * Checks whether the value currently cached for the key equals the given entry
+     *
+     * @return true only if the entry is non-null and equal to the cached value; false otherwise
+     */
+    @Override
+    public boolean contains(K key, V entry) {
+        assert key != null;
+        V cachedEntry = cache.getIfPresent(key);
+
+        // guard the receiver so that a null entry yields false instead of a NullPointerException
+        return entry != null && entry.equals(cachedEntry);
+    }
+
+    /**
+     * Stores the offset for the key, replacing any previously cached value
+     */
+    @Override
+    public void add(K key, V offsetValue) {
+        cache.put(key, offsetValue);
+    }
+
+    @Override
+    public Optional<V> get(K key) {
+        return Optional.ofNullable(cache.getIfPresent(key));
+    }
+
+    @Override
+    public boolean isFull() {
+        // estimatedSize() is an approximation, so this is a best-effort check
+        return cache.estimatedSize() >= cacheSize;
+    }
+
+    @Override
+    public long capacity() {
+        return cacheSize;
+    }
+}
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index 659bc4cec9c..f3bb3bf971e 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -22,21 +22,22 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.cassandra.consumer.support.CassandraResumeStrategy;
+import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.ScheduledPollConsumer;
 
 /**
  * Cassandra 2 CQL3 consumer.
  */
-public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAware<CassandraResumeStrategy> {
+public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAware<ResumeStrategy> {
 
     /**
      * Prepared statement used for polling
      */
     private PreparedStatement preparedStatement;
 
-    private CassandraResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
 
     public CassandraConsumer(CassandraEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -86,8 +87,11 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw
         if (resumeStrategy != null) {
             CqlSession session = getEndpoint().getSessionHolder().getSession();
 
-            resumeStrategy.setSession(session);
-            resumeStrategy.resume();
+            CassandraResumeAdapter resumeAdapter = resumeStrategy.getAdapter(CassandraResumeAdapter.class);
+            if (resumeAdapter != null) {
+                resumeAdapter.setSession(session);
+                resumeAdapter.resume();
+            }
         }
     }
 
@@ -102,12 +106,12 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw
     }
 
     @Override
-    public CassandraResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
     @Override
-    public void setResumeStrategy(CassandraResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 }
diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
similarity index 81%
rename from components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java
rename to components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
index 220242c7b2c..e8ccbed8efe 100644
--- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeStrategy.java
+++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
@@ -18,12 +18,12 @@
 package org.apache.camel.component.cassandra.consumer.support;
 
 import com.datastax.oss.driver.api.core.CqlSession;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 
 /**
- * Provides a resume strategy for Cassandra consumers
+ * Provides a resume adapter for Cassandra consumers
  */
-public interface CassandraResumeStrategy extends ResumeStrategy {
+public interface CassandraResumeAdapter extends ResumeAdapter {
 
     /**
      * Sets the session that allow implementations to run a one-time query on the DB
@@ -31,14 +31,4 @@ public interface CassandraResumeStrategy extends ResumeStrategy {
      * @param session
      */
     void setSession(CqlSession session);
-
-    @Override
-    default void start() {
-
-    }
-
-    @Override
-    default void stop() {
-
-    }
 }
diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
index dcff5c23842..b41a2a2ef33 100644
--- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
+++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
@@ -24,15 +24,16 @@ import com.datastax.oss.driver.api.core.CqlSession;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.cassandra.consumer.support.CassandraResumeStrategy;
+import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CassandraComponentResumeStrategyIT extends BaseCassandra {
 
-    private static class TestCassandraResumeStrategy implements CassandraResumeStrategy {
+    private static class TestCassandraResumeAdapter implements CassandraResumeAdapter {
         private boolean sessionCalled;
         private boolean sessionNotNull;
         private boolean resumeCalled;
@@ -62,7 +63,7 @@ public class CassandraComponentResumeStrategyIT extends BaseCassandra {
     }
 
     private static final String CQL = "select login, first_name, last_name from camel_user";
-    private final TestCassandraResumeStrategy resumeStrategy = new TestCassandraResumeStrategy();
+    private final TestCassandraResumeAdapter resumeStrategy = new TestCassandraResumeAdapter();
 
     @Test
     public void testConsumeAll() throws Exception {
@@ -88,7 +89,7 @@ public class CassandraComponentResumeStrategyIT extends BaseCassandra {
         return new RouteBuilder() {
             public void configure() {
                 fromF("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL)
-                        .resumable(resumeStrategy)
+                        .resumable(new TransientResumeStrategy(resumeStrategy))
                         .to("mock:resultAll");
             }
         };
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index 1ac3bbefb05..a1dc66deefd 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -26,7 +26,8 @@ import com.couchbase.client.java.view.ViewResult;
 import com.couchbase.client.java.view.ViewRow;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultScheduledPollConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +37,7 @@ import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_ID;
 import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_KEY;
 import static org.apache.camel.component.couchbase.CouchbaseConstants.HEADER_VIEWNAME;
 
-public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements ResumeAware<CouchbaseResumeStrategy> {
+public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements ResumeAware<ResumeStrategy> {
 
     private static final Logger LOG = LoggerFactory.getLogger(CouchbaseConsumer.class);
 
@@ -45,7 +46,7 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
     private final Collection collection;
     private ViewOptions viewOptions;
 
-    private CouchbaseResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
 
     public CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client, Processor processor) {
         super(endpoint, processor);
@@ -96,9 +97,12 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
 
         if (resumeStrategy != null) {
             LOG.info("Couchbase consumer running with resume strategy enabled");
-            resumeStrategy.setBucket(bucket);
 
-            resumeStrategy.resume();
+            CouchbaseResumeAdapter resumeAdapter = resumeStrategy.getAdapter(CouchbaseResumeAdapter.class);
+            if (resumeAdapter != null) {
+                resumeAdapter.setBucket(bucket);
+                resumeAdapter.resume();
+            }
         }
     }
 
@@ -182,12 +186,12 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
     }
 
     @Override
-    public CouchbaseResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
     @Override
-    public void setResumeStrategy(CouchbaseResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 }
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
similarity index 79%
copy from components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
copy to components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
index 6e90080b8b8..b7e87ec6a47 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeAdapter.java
@@ -18,22 +18,12 @@
 package org.apache.camel.component.couchbase;
 
 import com.couchbase.client.java.Bucket;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 
 /**
- * Allow implementing resume strategies for couchbase consumers
+ * Allow implementing resume adapters for couchbase consumers
  */
-public interface CouchbaseResumeStrategy extends ResumeStrategy {
-
-    @Override
-    default void start() {
-
-    }
-
-    @Override
-    default void stop() {
-
-    }
+public interface CouchbaseResumeAdapter extends ResumeAdapter {
 
     /**
      * Sets the bucket in use
diff --git a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
index ac7616a879e..20b03444564 100644
--- a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
+++ b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
@@ -22,8 +22,9 @@ import java.util.concurrent.TimeUnit;
 import com.couchbase.client.java.Bucket;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.couchbase.CouchbaseResumeStrategy;
+import org.apache.camel.component.couchbase.CouchbaseResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.resume.Resumables;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -32,7 +33,7 @@ import org.junit.jupiter.api.Test;
 import static org.awaitility.Awaitility.await;
 
 public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
-    static class TestResumeStrategy implements CouchbaseResumeStrategy {
+    static class TestCouchbaseResumeAdapter implements CouchbaseResumeAdapter {
         volatile boolean setBucketCalled;
         volatile boolean bucketNotNull;
         volatile boolean resumeCalled;
@@ -49,7 +50,7 @@ public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
         }
     }
 
-    TestResumeStrategy resumeStrategy = new TestResumeStrategy();
+    TransientResumeStrategy resumeStrategy = new TransientResumeStrategy(new TestCouchbaseResumeAdapter());
 
     @Test
     public void testQueryForBeers() throws Exception {
@@ -61,15 +62,18 @@ public class ConsumeResumeStrategyIT extends CouchbaseIntegrationTestBase {
 
         assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
 
+        TestCouchbaseResumeAdapter adapter = resumeStrategy.getAdapter(TestCouchbaseResumeAdapter.class);
+        await().atMost(30, TimeUnit.SECONDS).until(() -> adapter != null);
+
         await().atMost(30, TimeUnit.SECONDS)
-                .untilAsserted(() -> Assertions.assertTrue(resumeStrategy.setBucketCalled,
+                .untilAsserted(() -> Assertions.assertTrue(adapter.setBucketCalled,
                         "The setBucket method should have been called"));
         await().atMost(3, TimeUnit.SECONDS)
-                .untilAsserted(() -> Assertions.assertTrue(resumeStrategy.bucketNotNull,
+                .untilAsserted(() -> Assertions.assertTrue(adapter.bucketNotNull,
                         "The input bucket should not have been null"));
         await().atMost(3, TimeUnit.SECONDS)
                 .untilAsserted(
-                        () -> Assertions.assertTrue(resumeStrategy.resumeCalled, "The resume method should have been called"));
+                        () -> Assertions.assertTrue(adapter.resumeCalled, "The resume method should have been called"));
     }
 
     @AfterEach
diff --git a/components/camel-couchdb/pom.xml b/components/camel-couchdb/pom.xml
index b83914157d7..0372ec29c29 100644
--- a/components/camel-couchdb/pom.xml
+++ b/components/camel-couchdb/pom.xml
@@ -39,6 +39,10 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-support</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core-processor</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.lightcouch</groupId>
             <artifactId>lightcouch</artifactId>
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
index 58a9a70d6ee..3b239cea610 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
@@ -21,8 +21,9 @@ import java.time.Duration;
 import com.google.gson.JsonObject;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.couchdb.consumer.CouchDbResumable;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy;
+import org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter;
 import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategyFactory;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.task.BlockingTask;
 import org.apache.camel.support.task.Tasks;
 import org.apache.camel.support.task.budget.Budgets;
@@ -53,10 +54,16 @@ public class CouchDbChangesetTracker implements Runnable {
         CouchDbResumable resumable = new CouchDbResumable(couchClient, sequence);
 
         if (sequence == null) {
-            CouchDbResumeStrategy resumeStrategy = CouchDbResumeStrategyFactory.newResumeStrategy(this.consumer);
+            ResumeStrategy resumeStrategy = CouchDbResumeStrategyFactory.newResumeStrategy(this.consumer);
 
-            resumeStrategy.setResumable(resumable);
-            resumeStrategy.resume();
+            assert resumeStrategy != null;
+
+            CouchDbResumeAdapter adapter = resumeStrategy.getAdapter(CouchDbResumeAdapter.class);
+
+            if (adapter != null) {
+                adapter.setResumable(resumable);
+                adapter.resume();
+            }
         }
 
         LOG.debug("Last sequence [{}]", resumable.getLastOffset());
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
index 615823c23ad..4512004fff1 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
@@ -21,17 +21,17 @@ import java.util.concurrent.ExecutorService;
 import com.google.gson.JsonObject;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultConsumer;
 
-public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<CouchDbResumeStrategy> {
+public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<ResumeStrategy> {
 
     private final CouchDbClientWrapper couchClient;
     private final CouchDbEndpoint endpoint;
     private ExecutorService executor;
     private CouchDbChangesetTracker task;
-    private CouchDbResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
 
     public CouchDbConsumer(CouchDbEndpoint endpoint, CouchDbClientWrapper couchClient, Processor processor) {
         super(endpoint, processor);
@@ -40,12 +40,12 @@ public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<Couc
     }
 
     @Override
-    public void setResumeStrategy(CouchDbResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 
     @Override
-    public CouchDbResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
index e6ebd10c7c8..d7a7e91311e 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
@@ -17,10 +17,10 @@
 
 package org.apache.camel.component.couchdb.consumer;
 
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
 import org.apache.camel.component.couchdb.CouchDbClientWrapper;
+import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.Offsets;
+import org.apache.camel.resume.Resumable;
 
 /**
  * Wraps the resume data for CouchDb
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
similarity index 76%
rename from components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
rename to components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
index 3a92a042e6d..f66a2ccd5a8 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeAdapter.java
@@ -17,21 +17,16 @@
 
 package org.apache.camel.component.couchdb.consumer;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 
 /**
- * Defines a resumable strategy usable by the CouchDB component
+ * Defines a resumable adapter usable by the CouchDB component
  */
-public interface CouchDbResumeStrategy extends ResumeStrategy {
+public interface CouchDbResumeAdapter extends ResumeAdapter {
+    /**
+     * Sets the resumable for the adapter
+     * 
+     * @param resumable the resumable instance
+     */
     void setResumable(CouchDbResumable resumable);
-
-    @Override
-    default void start() {
-
-    }
-
-    @Override
-    default void stop() {
-
-    }
 }
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
index ce0196f0b15..b8515b1ad6a 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java
@@ -18,16 +18,18 @@
 package org.apache.camel.component.couchdb.consumer;
 
 import org.apache.camel.component.couchdb.CouchDbConsumer;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.ResumeStrategy;
 
 public final class CouchDbResumeStrategyFactory {
     private CouchDbResumeStrategyFactory() {
     }
 
-    public static CouchDbResumeStrategy newResumeStrategy(CouchDbConsumer consumer) {
-        CouchDbResumeStrategy resumeStrategy = consumer.getResumeStrategy();
+    public static ResumeStrategy newResumeStrategy(CouchDbConsumer consumer) {
+        ResumeStrategy resumeStrategy = consumer.getResumeStrategy();
 
         if (resumeStrategy == null) {
-            resumeStrategy = new LatestUpdateSequenceResumeStrategy();
+            resumeStrategy = new TransientResumeStrategy(new LatestUpdateSequenceResumeAdapter());
         }
 
         return resumeStrategy;
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
similarity index 88%
rename from components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java
rename to components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
index 277d84243e7..e9815b9171a 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeAdapter.java
@@ -20,9 +20,9 @@ package org.apache.camel.component.couchdb.consumer;
 import org.apache.camel.component.couchdb.CouchDbClientWrapper;
 
 /**
- * A resume strategy that resumes from the last update sequence
+ * A resume adapter for couchdb that resumes from the last update sequence
  */
-public final class LatestUpdateSequenceResumeStrategy implements CouchDbResumeStrategy {
+public final class LatestUpdateSequenceResumeAdapter implements CouchDbResumeAdapter {
     private CouchDbResumable resumable;
 
     @Override
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 6dedb43cd18..9d730a528a4 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -31,11 +31,13 @@ import java.util.Set;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
 import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeStrategy;
-import org.apache.camel.component.file.consumer.GenericFileResumeStrategy;
+import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
+import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -44,10 +46,10 @@ import org.slf4j.LoggerFactory;
 /**
  * File consumer.
  */
-public class FileConsumer extends GenericFileConsumer<File> implements ResumeAware<FileConsumerResumeStrategy> {
+public class FileConsumer extends GenericFileConsumer<File> implements ResumeAware<ResumeStrategy> {
 
     private static final Logger LOG = LoggerFactory.getLogger(FileConsumer.class);
-    private FileConsumerResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
     private String endpointPath;
     private Set<String> extendedAttributes;
 
@@ -103,8 +105,11 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
             GenericFile<File> gf
                     = asGenericFile(endpointPath, file, getEndpoint().getCharset(), getEndpoint().isProbeContentType());
 
-            if (resumeStrategy instanceof GenericFileResumeStrategy) {
-                ((GenericFileResumeStrategy<File>) resumeStrategy).resume(gf);
+            if (resumeStrategy != null) {
+                ResumeAdapter adapter = resumeStrategy.getAdapter();
+                if (adapter instanceof GenericFileResumeAdapter) {
+                    ((FileResumeAdapter) adapter).resume(gf);
+                }
             }
 
             if (file.isDirectory()) {
@@ -171,11 +176,15 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
             }
         }
 
-        if (resumeStrategy instanceof FileSetResumeStrategy) {
-            FileResumeSet resumeSet = new FileResumeSet(dirFiles);
-            resumeStrategy.resume(resumeSet);
+        if (resumeStrategy != null) {
+            ResumeAdapter adapter = resumeStrategy.getAdapter();
+            if (adapter instanceof FileSetResumeAdapter) {
+                FileResumeSet resumeSet = new FileResumeSet(dirFiles);
+
+                ((FileResumeAdapter) adapter).resume(resumeSet);
 
-            return resumeSet.resumed();
+                return resumeSet.resumed();
+            }
         }
 
         return dirFiles;
@@ -307,12 +316,12 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
     }
 
     @Override
-    public FileConsumerResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
     @Override
-    public void setResumeStrategy(FileConsumerResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
index c04a0e9ecc3..c1fcc9de5f9 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
@@ -22,9 +22,9 @@ import java.nio.file.Path;
 import java.util.Map;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.Offset;
 import org.apache.camel.WrappedFile;
 import org.apache.camel.component.file.consumer.GenericFileResumable;
+import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.Offsets;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
similarity index 85%
copy from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
copy to components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
index fed8000c512..25b807cbf40 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
@@ -17,12 +17,12 @@
 
 package org.apache.camel.component.file.consumer;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 
 /**
- * Defines resume strategy for consumers of the file component.
+ * Defines resume adapter for consumers of the file component.
  */
-public interface FileConsumerResumeStrategy<T> extends ResumeStrategy {
+public interface FileResumeAdapter<T> extends ResumeAdapter {
 
     /**
      * Returns the last offset read for the given file.
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
index f746c0f9fa3..41b1751223c 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
@@ -21,7 +21,7 @@ import java.io.File;
 import java.util.Objects;
 import java.util.function.Predicate;
 
-import org.apache.camel.ResumableSet;
+import org.apache.camel.resume.ResumableSet;
 
 /**
  * This contains the input/output file set for resume operations.
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
similarity index 79%
rename from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeStrategy.java
rename to components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
index dc75e80bd89..2e1f5e56477 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeStrategy.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
@@ -17,6 +17,9 @@
 
 package org.apache.camel.component.file.consumer;
 
-public interface FileSetResumeStrategy extends FileConsumerResumeStrategy<FileResumeSet> {
-
+/**
+ * Allows the implementation of file adapters for handling resume operations for file sets (i.e.: file entries in a
+ * directory)
+ */
+public interface FileSetResumeAdapter extends FileResumeAdapter<FileResumeSet> {
 }
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
index 037621d751f..79899514a54 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
@@ -17,7 +17,7 @@
 
 package org.apache.camel.component.file.consumer;
 
-import org.apache.camel.Resumable;
+import org.apache.camel.resume.Resumable;
 
 public interface GenericFileResumable<T> extends Resumable<T, Long> {
 
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
similarity index 66%
rename from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
rename to components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
index fed8000c512..8f7c1f2900c 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileConsumerResumeStrategy.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
@@ -17,17 +17,18 @@
 
 package org.apache.camel.component.file.consumer;
 
-import org.apache.camel.ResumeStrategy;
+import java.io.File;
+import java.util.Optional;
 
 /**
- * Defines resume strategy for consumers of the file component.
+ * Allows the implementation of file adapters for handling resume operations for generic files
  */
-public interface FileConsumerResumeStrategy<T> extends ResumeStrategy {
-
+public interface GenericFileResumeAdapter extends FileResumeAdapter<GenericFileResumable<File>> {
     /**
-     * Returns the last offset read for the given file.
+     * Gets the last offset for the given file
      *
-     * @param resumable the resumable file or resumable set to run the resume
+     * @param  addressable the file instance
+     * @return             An Optional with the offset value
      */
-    void resume(T resumable);
+    Optional<Long> getLastOffset(File addressable);
 }
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
new file mode 100644
index 00000000000..9a1f7f49f76
--- /dev/null
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+
+import org.apache.camel.component.file.consumer.FileResumeSet;
+import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
+import org.apache.camel.resume.cache.MultiEntryCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link FileSetResumeAdapter} that can be used for resume operations for multiple files. For
+ * instance, this can be used to manage the resume operations for files within a directory.
+ */
+public class DefaultFileSetResumeAdapter implements FileSetResumeAdapter {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultFileSetResumeAdapter.class);
+
+    private final MultiEntryCache<File, File> cache;
+
+    public DefaultFileSetResumeAdapter(MultiEntryCache<File, File> cache) {
+        this.cache = cache;
+    }
+
+    private boolean notProcessed(File file) {
+        File key = file.getParentFile();
+
+        // if the file is in the cache, then it's already processed
+        boolean ret = !cache.contains(key, file);
+        return ret;
+    }
+
+    @Override
+    public void resume(FileResumeSet resumable) {
+        if (resumable != null) {
+            resumable.resumeEach(this::notProcessed);
+            if (resumable.hasResumables()) {
+                LOG.debug("There's {} files to still to be processed", resumable.resumed().length);
+            }
+        } else {
+            LOG.trace("Nothing to resume");
+        }
+    }
+
+    @Override
+    public void resume() {
+        // NO-OP
+    }
+}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
new file mode 100644
index 00000000000..54e339fe73e
--- /dev/null
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+import java.util.Optional;
+
+import org.apache.camel.component.file.consumer.GenericFileResumable;
+import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.resume.cache.SingleEntryCache;
+
+/**
+ * An implementation of the {@link GenericFileResumeAdapter} that can be used to handle resume operations for file
+ * offsets (where the offsets are of Long format).
+ */
+public class DefaultGenericFileResumeAdapter implements GenericFileResumeAdapter {
+    private final SingleEntryCache<File, Long> cache;
+
+    public DefaultGenericFileResumeAdapter(SingleEntryCache<File, Long> cache) {
+        this.cache = cache;
+    }
+
+    private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) {
+        final File addressable = resumable.getAddressable();
+        return cache.get(addressable);
+    }
+
+    @Override
+    public Optional<Long> getLastOffset(File addressable) {
+        return cache.get(addressable);
+    }
+
+    @Override
+    public void resume(GenericFileResumable<File> resumable) {
+        final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
+
+        if (!lastOffsetOpt.isPresent()) {
+            return;
+        }
+
+        final long lastOffset = lastOffsetOpt.get();
+        resumable.updateLastOffset(lastOffset);
+    }
+
+    @Override
+    public void resume() {
+        // NO-OP
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index f864cf272b6..3a6d2860329 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -25,14 +25,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import org.apache.camel.ConsumerListenerAware;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
 import org.apache.camel.Suspendable;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.health.HealthCheckAware;
 import org.apache.camel.health.HealthCheckHelper;
+import org.apache.camel.resume.ConsumerListenerAware;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.StateRepository;
 import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
 import org.apache.camel.support.DefaultConsumer;
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaConsumer extends DefaultConsumer
-        implements ResumeAware<KafkaConsumerResumeStrategy>, HealthCheckAware, ConsumerListenerAware<KafkaConsumerListener>,
+        implements ResumeAware<ResumeStrategy>, HealthCheckAware, ConsumerListenerAware<KafkaConsumerListener>,
         Suspendable {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
@@ -56,7 +56,7 @@ public class KafkaConsumer extends DefaultConsumer
     // This list helps to work around the infinite loop of KAFKA-1894
     private final List<KafkaFetchRecords> tasks = new ArrayList<>();
     private volatile boolean stopOffsetRepo;
-    private KafkaConsumerResumeStrategy resumeStrategy;
+    private ResumeStrategy resumeStrategy;
     private KafkaConsumerListener consumerListener;
 
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
@@ -65,12 +65,12 @@ public class KafkaConsumer extends DefaultConsumer
     }
 
     @Override
-    public void setResumeStrategy(KafkaConsumerResumeStrategy resumeStrategy) {
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
         this.resumeStrategy = resumeStrategy;
     }
 
     @Override
-    public KafkaConsumerResumeStrategy getResumeStrategy() {
+    public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index b1befdfa5c3..f6f7392a6ac 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -27,7 +27,7 @@ import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManagers;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
 import org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
+import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter;
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
 import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
@@ -273,7 +273,7 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     private void subscribe() {
-        KafkaConsumerResumeStrategy resumeStrategy = ResumeStrategyFactory.newResumeStrategy(kafkaConsumer);
+        KafkaConsumerResumeAdapter resumeStrategy = ResumeStrategyFactory.resolveResumeAdapter(kafkaConsumer);
         resumeStrategy.setConsumer(consumer);
 
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
index 44f9f6337ed..58211ba86a0 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/KafkaConsumerListener.java
@@ -19,9 +19,9 @@ package org.apache.camel.component.kafka.consumer.errorhandler;
 
 import java.util.function.Predicate;
 
-import org.apache.camel.ConsumerListener;
 import org.apache.camel.component.kafka.SeekPolicy;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
+import org.apache.camel.resume.ConsumerListener;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
similarity index 65%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
index 97f25e0745d..d25c05662cb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
@@ -16,31 +16,32 @@
  */
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeAdapter;
 import org.apache.kafka.clients.consumer.Consumer;
 
 /**
- * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
+ * Defines an adapter for handling resume operations. Implementations can define different ways to handle how to resume
  * processing records.
  *
  * The resume runs in the scope of the Kafka Consumer thread and may run concurrently with other consumer instances when
  * the component is set up to use more than one of them. As such, implementations are responsible for ensuring the
  * thread-safety of the operations within the resume method.
  */
-public interface KafkaConsumerResumeStrategy extends ResumeStrategy {
+public interface KafkaConsumerResumeAdapter extends ResumeAdapter {
+
+    /**
+     * Sets the Kafka consumer instance for the adapter. Please note that the Kafka consumer is not safe for concurrent
+     * access
+     * 
+     * @param consumer the consumer instance
+     */
     void setConsumer(Consumer<?, ?> consumer);
 
-    default void resume(KafkaResumable resumable) {
-
-    }
-
-    @Override
-    default void start() {
-
-    }
-
-    @Override
-    default void stop() {
-
-    }
+    /**
+     * Sets an optional resumable instance for the adapter. This is usually set during partition assignment. Guaranteed
+     * not to be null and safe to ignore if partition and topic information are not used.
+     * 
+     * @param kafkaResumable the resumable instance
+     */
+    void setKafkaResumable(KafkaResumable kafkaResumable);
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
index 8a86b658b4d..11a0879a59c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
@@ -17,7 +17,7 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.Offset;
+import org.apache.camel.resume.Offset;
 import org.apache.camel.util.KeyValueHolder;
 
 /**
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
index 388f47fa3e5..03bc3a977b1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
@@ -17,9 +17,9 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
+import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.Offsets;
+import org.apache.camel.resume.Resumable;
 
 public class KafkaResumable implements Resumable<String, String> {
     private final String partition;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
similarity index 82%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
index 4502831f8fb..3ef4f209c43 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
@@ -30,14 +30,14 @@ import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
 /**
  * A resume strategy that uses Kafka's offset for resuming
  */
-public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeStrategy.class);
+    private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeAdapter.class);
 
     private final StateRepository<String, String> offsetRepository;
     private Consumer<?, ?> consumer;
 
-    public OffsetKafkaConsumerResumeStrategy(StateRepository<String, String> offsetRepository) {
+    public OffsetKafkaConsumerResumeAdapter(StateRepository<String, String> offsetRepository) {
         this.offsetRepository = offsetRepository;
     }
 
@@ -46,6 +46,11 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
         this.consumer = consumer;
     }
 
+    @Override
+    public void setKafkaResumable(KafkaResumable kafkaResumable) {
+        // NO-OP
+    }
+
     private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
         // The state contains the last read offset, so you need to seek from the next one
         long offset = deserializeOffsetValue(offsetState) + 1;
@@ -63,14 +68,4 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
             }
         }
     }
-
-    /*
-     Note: when self-managing the offsets, we don't need to use the information on the resumable
-     instance. We can collect the assignments directly from the consumer instance as we always did.
-     */
-    @SuppressWarnings("unused")
-    @Override
-    public void resume(KafkaResumable resumable) {
-        resume();
-    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index e471d1e2ea6..03046f7a10b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -33,12 +33,12 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
 
     private final String threadId;
     private final KafkaConfiguration configuration;
-    private final KafkaConsumerResumeStrategy resumeStrategy;
+    private final KafkaConsumerResumeAdapter resumeStrategy;
     private final CommitManager commitManager;
 
     public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
                                        CommitManager commitManager,
-                                       KafkaConsumerResumeStrategy resumeStrategy) {
+                                       KafkaConsumerResumeAdapter resumeStrategy) {
         this.threadId = threadId;
         this.configuration = configuration;
         this.commitManager = commitManager;
@@ -67,6 +67,11 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
         List<KafkaResumable> resumables = partitions.stream()
                 .map(p -> new KafkaResumable(String.valueOf(p.partition()), p.topic())).collect(Collectors.toList());
 
-        resumables.forEach(r -> resumeStrategy.resume(r));
+        resumables.forEach(this::doResume);
+    }
+
+    private void doResume(KafkaResumable r) {
+        resumeStrategy.setKafkaResumable(r);
+        resumeStrategy.resume();
     }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 3784b4c5c15..77315f2e85b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.consumer.support;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConsumer;
 import org.apache.camel.component.kafka.SeekPolicy;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.slf4j.Logger;
@@ -29,12 +30,17 @@ public final class ResumeStrategyFactory {
     /**
      * A NO-OP resume strategy that does nothing (i.e.: no resume)
      */
-    private static class NoOpKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+    private static class NoOpKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
 
         @SuppressWarnings("unused")
         @Override
         public void setConsumer(Consumer<?, ?> consumer) {
+            // NO-OP
+        }
 
+        @Override
+        public void setKafkaResumable(KafkaResumable kafkaResumable) {
+            // NO-OP
         }
 
         @SuppressWarnings("unused")
@@ -44,36 +50,40 @@ public final class ResumeStrategyFactory {
         }
     }
 
-    private static final NoOpKafkaConsumerResumeStrategy NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeStrategy();
+    private static final NoOpKafkaConsumerResumeAdapter NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeAdapter();
     private static final Logger LOG = LoggerFactory.getLogger(ResumeStrategyFactory.class);
 
     private ResumeStrategyFactory() {
     }
 
-    public static KafkaConsumerResumeStrategy newResumeStrategy(KafkaConsumer kafkaConsumer) {
+    public static KafkaConsumerResumeAdapter resolveResumeAdapter(KafkaConsumer kafkaConsumer) {
         // When using resumable routes, which register the strategy via service, it takes priority over everything else
-        KafkaConsumerResumeStrategy resumableRouteStrategy = kafkaConsumer.getResumeStrategy();
+        ResumeStrategy resumeStrategy = kafkaConsumer.getResumeStrategy();
+        if (resumeStrategy != null) {
+            KafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(KafkaConsumerResumeAdapter.class);
+
+            // The strategy should not be able to be created without an adapter, but let's be safe
+            assert adapter != null;
 
-        if (resumableRouteStrategy != null) {
-            return resumableRouteStrategy;
+            return adapter;
         }
 
         KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration();
 
-        return builtinResumeStrategies(configuration);
+        return resolveBuiltinResumeAdapters(configuration);
     }
 
-    private static KafkaConsumerResumeStrategy builtinResumeStrategies(KafkaConfiguration configuration) {
+    private static KafkaConsumerResumeAdapter resolveBuiltinResumeAdapters(KafkaConfiguration configuration) {
         LOG.debug("No resume strategy was provided ... checking for built-ins ...");
         StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
         SeekPolicy seekTo = configuration.getSeekTo();
 
         if (offsetRepository != null) {
             LOG.info("Using resume from offset strategy");
-            return new OffsetKafkaConsumerResumeStrategy(offsetRepository);
+            return new OffsetKafkaConsumerResumeAdapter(offsetRepository);
         } else if (seekTo != null) {
             LOG.info("Using resume from seek policy strategy with seeking from {}", seekTo);
-            return new SeekPolicyKafkaConsumerResumeStrategy(seekTo);
+            return new SeekPolicyKafkaConsumerResumeAdapter(seekTo);
         }
 
         LOG.info("Using NO-OP resume strategy");
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
similarity index 80%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
index b877763854e..0dee4e1b849 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
@@ -24,14 +24,14 @@ import org.slf4j.LoggerFactory;
 /**
  * A resume strategy that uses Camel's seekTo configuration for resuming
  */
-public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy {
+public class SeekPolicyKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
 
-    private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeStrategy.class);
+    private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeAdapter.class);
 
     private final SeekPolicy seekPolicy;
     private Consumer<?, ?> consumer;
 
-    public SeekPolicyKafkaConsumerResumeStrategy(SeekPolicy seekPolicy) {
+    public SeekPolicyKafkaConsumerResumeAdapter(SeekPolicy seekPolicy) {
         this.seekPolicy = seekPolicy;
     }
 
@@ -40,6 +40,11 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum
         this.consumer = consumer;
     }
 
+    @Override
+    public void setKafkaResumable(KafkaResumable kafkaResumable) {
+        // NO-OP
+    }
+
     @Override
     public void resume() {
         if (seekPolicy == SeekPolicy.BEGINNING) {
@@ -50,13 +55,4 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum
             consumer.seekToEnd(consumer.assignment());
         }
     }
-
-    /*
-     * Note: when using the seek policy strategy, we don't use the resumable information
-     * because we use the consumer to set the policy.
-     */
-    @Override
-    public void resume(KafkaResumable resumable) {
-        resume();
-    }
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/Offset.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
similarity index 63%
copy from core/camel-api/src/main/java/org/apache/camel/Offset.java
copy to components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
index 1e73d1dcc2a..a1614d4bfc2 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Offset.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
@@ -15,20 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.processor.resume.kafka;
+
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 
 /**
- * Generic offset without a concrete type
- *
- * @param <T> the type of the offset
+ * Base interface for resume strategies that publish the offsets to a Kafka topic
+ * 
+ * @param <K> the type of key
+ * @param <V> the type of the value
  */
-public interface Offset<T> {
-
-    /**
-     * Gets the offset value
-     *
-     * @return the offset value
-     */
-    T offset();
+public interface KafkaResumeStrategy<K, V> extends UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, ResumeStrategy {
 
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
new file mode 100644
index 00000000000..e9974cac4e8
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume.kafka;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for multi node
+ * integrations. This is suitable, for instance, when using clusters with the master component.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of the value
+ */
+public class MultiNodeKafkaResumeStrategy<K, V> extends SingleNodeKafkaResumeStrategy<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);
+    private final ExecutorService executorService;
+
+    /**
+     * Create a new instance of this class
+     * 
+     * @param bootstrapServers the address of the Kafka broker
+     * @param topic            the topic where to publish the offsets
+     * @param resumeCache      a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter    the component-specific resume adapter
+     */
+    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache,
+                                        ResumeAdapter resumeAdapter) {
+        // just in case users don't want to provide their own worker thread pool
+        this(bootstrapServers, topic, resumeCache, resumeAdapter, Executors.newSingleThreadExecutor());
+    }
+
+    /**
+     * Builds an instance of this class
+     *
+     * @param bootstrapServers the address of the Kafka broker
+     * @param topic            the topic where to publish the offsets
+     * @param resumeCache      a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter    the component-specific resume adapter
+     * @param executorService  an executor service that will run a separate thread for periodically refreshing the
+     *                         offsets
+     */
+
+    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache,
+                                        ResumeAdapter resumeAdapter, ExecutorService executorService) {
+        super(bootstrapServers, topic, resumeCache, resumeAdapter);
+
+        // We need to keep refreshing the cache
+        this.executorService = executorService;
+        executorService.submit(() -> refresh());
+    }
+
+    /**
+     * Builds an instance of this class
+     *
+     * @param topic          the topic where to publish the offsets
+     * @param resumeCache    a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter  the component-specific resume adapter
+     * @param producerConfig the set of properties to be used by the Kafka producer within this class
+     * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
+     */
+    public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter,
+                                        Properties producerConfig, Properties consumerConfig) {
+        this(topic, resumeCache, resumeAdapter, producerConfig, consumerConfig, Executors.newSingleThreadExecutor());
+    }
+
+    /**
+     * Builds an instance of this class
+     *
+     * @param topic           the topic where to publish the offsets
+     * @param resumeCache     a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter   the component-specific resume adapter
+     * @param producerConfig  the set of properties to be used by the Kafka producer within this class
+     * @param consumerConfig  the set of properties to be used by the Kafka consumer within this class
+     * @param executorService an executor service that will run a separate thread for periodically refreshing the
+     *                        offsets
+     */
+
+    public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter,
+                                        Properties producerConfig, Properties consumerConfig, ExecutorService executorService) {
+        super(topic, resumeCache, resumeAdapter, producerConfig, consumerConfig);
+
+        this.executorService = executorService;
+        executorService.submit(() -> refresh());
+    }
+
+    /**
+     * Continuously polls the offsets topic and adds the records read to the local resume cache
+     */
+    protected void refresh() {
+        LOG.trace("Creating a offset cache refresher");
+        try {
+            Properties prop = (Properties) getConsumerConfig().clone();
+            prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+
+            Consumer<K, V> consumer = new KafkaConsumer<>(prop);
+
+            consumer.subscribe(Collections.singletonList(getTopic()));
+
+            while (true) {
+                var records = consumer.poll(getPollDuration());
+                if (records.isEmpty()) {
+                    continue;
+                }
+
+                for (var record : records) {
+                    V value = record.value();
+
+                    LOG.trace("Read from Kafka: {}", value);
+                    getResumeCache().add(record.key(), record.value());
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            executorService.shutdown();
+        } finally {
+            super.stop();
+        }
+    }
+}
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/RecordError.java
similarity index 58%
rename from components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/RecordError.java
index 6e90080b8b8..c0bfbb4f8a1 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/RecordError.java
@@ -15,30 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.couchbase;
+package org.apache.camel.processor.resume.kafka;
 
-import com.couchbase.client.java.Bucket;
-import org.apache.camel.ResumeStrategy;
+import org.apache.kafka.clients.producer.RecordMetadata;
 
 /**
- * Allow implementing resume strategies for couchbase consumers
+ * Contains the error details when failing to produce records
  */
-public interface CouchbaseResumeStrategy extends ResumeStrategy {
-
-    @Override
-    default void start() {
+public class RecordError {
+    private final RecordMetadata recordMetadata;
+    private final Exception exception;
 
+    public RecordError(RecordMetadata recordMetadata, Exception exception) {
+        this.recordMetadata = recordMetadata;
+        this.exception = exception;
     }
 
-    @Override
-    default void stop() {
-
+    public RecordMetadata getRecordMetadata() {
+        return recordMetadata;
     }
 
-    /**
-     * Sets the bucket in use
-     * 
-     * @param bucket the bucket in use
-     */
-    void setBucket(Bucket bucket);
+    public Exception getException() {
+        return exception;
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
similarity index 55%
rename from components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
rename to components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 9960cecccf4..0a14bb481f6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -19,20 +19,20 @@ package org.apache.camel.processor.resume.kafka;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
-import org.apache.camel.Resumable;
-import org.apache.camel.ResumeCache;
-import org.apache.camel.Service;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StringHelper;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -44,45 +44,64 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractKafkaResumeStrategy<K, V>
-        implements UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, Service {
-    public static final int UNLIMITED = -1;
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaResumeStrategy.class);
+/**
+ * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
+ * integrations. For multi-node integrations (i.e.: using clusters with the master component), check
+ * {@link MultiNodeKafkaResumeStrategy}.
+ * 
+ * @param <K> the type of key
+ * @param <V> the type of the value
+ */
+public class SingleNodeKafkaResumeStrategy<K, V> implements KafkaResumeStrategy<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
 
     private final String topic;
 
     private Consumer<K, V> consumer;
     private Producer<K, V> producer;
-    private long errorCount;
     private Duration pollDuration = Duration.ofSeconds(1);
 
-    private final List<Future<RecordMetadata>> sentItems = new ArrayList<>();
+    private final Queue<RecordError> producerErrors = new ConcurrentLinkedQueue<>();
     private final ResumeCache<K, V> resumeCache;
     private boolean subscribed;
     private final Properties producerConfig;
     private final Properties consumerConfig;
+    private ResumeAdapter resumeAdapter;
 
-    public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache) {
-        this.topic = topic;
-
-        this.producerConfig = createProducer(bootstrapServers);
-        this.consumerConfig = createConsumer(bootstrapServers);
-        this.resumeCache = resumeCache;
-
-        init();
+    /**
+     * Builds an instance of this class
+     * 
+     * @param bootstrapServers the address of the Kafka broker
+     * @param topic            the topic where to publish the offsets
+     * @param resumeCache      a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter    the component-specific resume adapter
+     */
+    public SingleNodeKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache,
+                                         ResumeAdapter resumeAdapter) {
+        this(topic, resumeCache, resumeAdapter, createProducer(bootstrapServers), createConsumer(bootstrapServers));
     }
 
-    public AbstractKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, Properties producerConfig,
-                                       Properties consumerConfig) {
-        this.topic = topic;
+    /**
+     * Builds an instance of this class
+     *
+     * @param topic          the topic where to publish the offsets
+     * @param resumeCache    a cache instance where to store the offsets locally for faster access
+     * @param resumeAdapter  the component-specific resume adapter
+     * @param producerConfig the set of properties to be used by the Kafka producer within this class
+     * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
+     */
+    public SingleNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, ResumeAdapter resumeAdapter,
+                                         Properties producerConfig,
+                                         Properties consumerConfig) {
+        this.topic = ObjectHelper.notNull(topic, "The topic must not be null");
         this.resumeCache = resumeCache;
+        this.resumeAdapter = resumeAdapter;
         this.producerConfig = producerConfig;
         this.consumerConfig = consumerConfig;
 
@@ -132,24 +151,24 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     }
 
     /**
-     * Sends data to a topic
-     * 
+     * Sends data to a topic. The records will always be sent asynchronously. If there's an error, it is added to the
+     * collection of producer errors.
+     *
+     * @see                         SingleNodeKafkaResumeStrategy#getProducerErrors()
      * @param  message              the message to send
      * @throws ExecutionException
      * @throws InterruptedException
+     *
      */
-    public void produce(K key, V message) throws ExecutionException, InterruptedException {
+    protected void produce(K key, V message) throws ExecutionException, InterruptedException {
         ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, message);
 
-        errorCount = 0;
-        Future<RecordMetadata> future = producer.send(record, (recordMetadata, e) -> {
+        producer.send(record, (recordMetadata, e) -> {
             if (e != null) {
                 LOG.error("Failed to send message {}", e.getMessage(), e);
-                errorCount++;
+                producerErrors.add(new RecordError(recordMetadata, e));
             }
         });
-
-        sentItems.add(future);
     }
 
     @Override
@@ -164,6 +183,11 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         resumeCache.add(key, offsetValue);
     }
 
+    /**
+     * Loads the existing data into the cache
+     * 
+     * @throws Exception
+     */
     protected void loadCache() throws Exception {
         subscribe();
 
@@ -192,11 +216,12 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         unsubscribe();
     }
 
-    // TODO: bad method ...
     /**
+     * Subscribe to the topic if not subscribed yet
+     * 
      * @param topic the topic to consume the messages from
      */
-    public void checkAndSubscribe(String topic) {
+    protected void checkAndSubscribe(String topic) {
         if (!subscribed) {
             consumer.subscribe(Collections.singletonList(topic));
 
@@ -205,40 +230,54 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     }
 
     /**
-     * @param topic the topic to consume the messages from
+     * Subscribe to the topic if not subscribed yet
+     * 
+     * @param topic     the topic to consume the messages from
+     * @param remaining the number of messages to rewind from the last offset position (used to fill the cache)
      */
     public void checkAndSubscribe(String topic, long remaining) {
         if (!subscribed) {
-            consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
-                @Override
-                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
-
-                }
-
-                @Override
-                public void onPartitionsAssigned(Collection<TopicPartition> assignments) {
-                    consumer.seekToEnd(assignments);
-                    for (TopicPartition assignment : assignments) {
-                        final long endPosition = consumer.position(assignment);
-                        final long startPosition = endPosition - remaining;
-
-                        if (startPosition >= 0) {
-                            consumer.seek(assignment, startPosition);
-                        } else {
-                            LOG.info(
-                                    "Ignoring the seek command because the initial offset is negative (the topic is likely empty)");
-                        }
-                    }
-                }
-            });
+            consumer.subscribe(Collections.singletonList(topic), getConsumerRebalanceListener(remaining));
 
             subscribed = true;
         }
     }
 
-    public abstract void subscribe() throws Exception;
+    /**
+     * Creates a new consumer rebalance listener. This can be useful for setting the exact Kafka offset when necessary
+     * to read a limited amount of messages or customize the resume strategy behavior when a rebalance occurs.
+     * 
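+     * @param  remaining the number of messages to rewind from the last offset position (used to fill the cache)
+     * @return           a new consumer rebalance listener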
+     */
+    protected ConsumerRebalanceListener getConsumerRebalanceListener(long remaining) {
+        return new ConsumerRebalanceListener() {
+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
+
+            }
 
-    public void unsubscribe() {
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> assignments) {
+                for (TopicPartition assignment : assignments) {
+                    final long endPosition = consumer.position(assignment);
+                    final long startPosition = endPosition - remaining;
+
+                    if (startPosition >= 0) {
+                        consumer.seek(assignment, startPosition);
+                    } else {
+                        LOG.info(
+                                "Ignoring the seek command because the initial offset is negative (the topic is likely empty)");
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Unsubscribe from the topic
+     */
+    protected void unsubscribe() {
         try {
             consumer.unsubscribe();
         } catch (IllegalStateException e) {
@@ -249,17 +288,23 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     }
 
     /**
-     * Consumes message from the given topic until the predicate returns false
+     * Consumes message from the topic previously setup
      *
-     * @return
+     * @return An instance of the consumer records
      */
-    public ConsumerRecords<K, V> consume() {
+    protected ConsumerRecords<K, V> consume() {
         int retries = 10;
 
         return consume(retries);
     }
 
-    public ConsumerRecords<K, V> consume(int retries) {
+    /**
+     * Consumes message from the topic previously setup
+     * 
+     * @param  retries how many times to retry consuming data from the topic
+     * @return         An instance of the consumer records
+     */
+    protected ConsumerRecords<K, V> consume(int retries) {
         while (retries > 0) {
             ConsumerRecords<K, V> records = consumer.poll(pollDuration);
             if (!records.isEmpty()) {
@@ -271,22 +316,35 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         return ConsumerRecords.empty();
     }
 
-    public long getErrorCount() {
-        return errorCount;
+    public void subscribe() throws Exception {
+        if (resumeCache.capacity() >= 1) {
+            checkAndSubscribe(topic, resumeCache.capacity());
+        } else {
+            checkAndSubscribe(topic);
+        }
     }
 
-    public List<Future<RecordMetadata>> getSentItems() {
-        return Collections.unmodifiableList(sentItems);
+    @Override
+    public ResumeAdapter getAdapter() {
+        return resumeAdapter;
+    }
+
+    /**
+     * Gets the errors recorded while sending records to the topic
+     *
+     * @return an unmodifiable view of the recorded producer errors
+     */
+    protected Collection<RecordError> getProducerErrors() {
+        return Collections.unmodifiableCollection(producerErrors);
     }
 
     @Override
     public void build() {
-        Service.super.build();
+        // NO-OP
     }
 
     @Override
     public void init() {
-        Service.super.init();
 
         LOG.debug("Initializing the Kafka resume strategy");
         if (consumer == null) {
@@ -300,12 +358,16 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
 
     @Override
     public void stop() {
+        LOG.info("Closing the Kafka producer");
+        IOHelper.close(producer, "Kafka producer", LOG);
 
+        LOG.info("Closing the Kafka consumer");
+        IOHelper.close(consumer, "Kafka consumer", LOG);
     }
 
     @Override
     public void close() throws IOException {
-        Service.super.close();
+        stop();
     }
 
     @Override
@@ -334,4 +396,27 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     protected Producer<K, V> getProducer() {
         return producer;
     }
+
+    protected Properties getProducerConfig() {
+        return producerConfig;
+    }
+
+    protected Properties getConsumerConfig() {
+        return consumerConfig;
+    }
+
+    protected String getTopic() {
+        return topic;
+    }
+
+    protected ResumeCache<K, V> getResumeCache() {
+        return resumeCache;
+    }
+
+    /**
+     * Clear the producer errors
+     */
+    public void resetProducerErrors() {
+        producerErrors.clear();
+    }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
index 181009da26c..52388fd9081 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -25,15 +25,16 @@ import java.util.concurrent.TimeUnit;
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
-import org.apache.camel.Service;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
+import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter;
 import org.apache.camel.component.kafka.consumer.support.KafkaResumable;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.Resumables;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport {
@@ -56,16 +58,13 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
     private MockEndpoint result;
 
     @BindToRegistry("resumeStrategy")
-    private TestKafkaConsumerResumeStrategy resumeStrategy;
+    private TestUpdateStrategy resumeStrategy;
     private CountDownLatch messagesLatch;
     private KafkaProducer<Object, Object> producer;
 
-    private static class TestKafkaConsumerResumeStrategy
-            implements KafkaConsumerResumeStrategy,
-            UpdatableConsumerResumeStrategy<String, Integer, Resumable<String, Integer>>, Service {
+    private static class TestUpdateStrategy extends TransientResumeStrategy
+            implements UpdatableConsumerResumeStrategy<String, Integer, Resumable<String, Integer>> {
         private final CountDownLatch messagesLatch;
-        private boolean resumeCalled;
-        private boolean consumerIsNull = true;
         private boolean startCalled;
         private boolean offsetNull = true;
         private boolean offsetAddressableNull = true;
@@ -74,34 +73,10 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
         private boolean offsetValueEmpty = true;
         private int lastOffset;
 
-        public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) {
-            this.messagesLatch = messagesLatch;
-        }
-
-        @Override
-        public void setConsumer(Consumer<?, ?> consumer) {
-            if (consumer != null) {
-                consumerIsNull = false;
-            }
-        }
-
-        @Override
-        public void resume(KafkaResumable resumable) {
-            resumeCalled = true;
-
-        }
-
-        @Override
-        public void resume() {
-            resumeCalled = true;
-        }
-
-        public boolean isResumeCalled() {
-            return resumeCalled;
-        }
+        public TestUpdateStrategy(ResumeAdapter resumeAdapter, CountDownLatch messagesLatch) {
+            super(resumeAdapter);
 
-        public boolean isConsumerIsNull() {
-            return consumerIsNull;
+            this.messagesLatch = messagesLatch;
         }
 
         @Override
@@ -115,12 +90,8 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
             LOG.warn("Init was called");
         }
 
-        public boolean isStartCalled() {
-            return startCalled;
-        }
-
         @Override
-        public void updateLastOffset(Resumable<String, Integer> offset) {
+        public void updateLastOffset(Resumable<String, Integer> offset) throws Exception {
             try {
                 if (offset != null) {
                     offsetNull = false;
@@ -166,6 +137,40 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
         public boolean isOffsetValueEmpty() {
             return offsetValueEmpty;
         }
+
+        public boolean isStartCalled() {
+            return startCalled;
+        }
+    }
+
+    private static class TestKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
+        private boolean resumeCalled;
+        private boolean consumerIsNull = true;
+
+        @Override
+        public void setConsumer(Consumer<?, ?> consumer) {
+            if (consumer != null) {
+                consumerIsNull = false;
+            }
+        }
+
+        @Override
+        public void setKafkaResumable(KafkaResumable kafkaResumable) {
+
+        }
+
+        @Override
+        public void resume() {
+            resumeCalled = true;
+        }
+
+        public boolean isResumeCalled() {
+            return resumeCalled;
+        }
+
+        public boolean isConsumerIsNull() {
+            return consumerIsNull;
+        }
     }
 
     @BeforeEach
@@ -183,7 +188,7 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
         super.doPreSetup();
 
         messagesLatch = new CountDownLatch(1);
-        resumeStrategy = new TestKafkaConsumerResumeStrategy(messagesLatch);
+        resumeStrategy = new TestUpdateStrategy(new TestKafkaConsumerResumeAdapter(), messagesLatch);
     }
 
     @Test
@@ -191,9 +196,11 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
     public void testOffsetIsBeingChecked() throws InterruptedException {
         assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was not called");
 
-        assertTrue(resumeStrategy.isResumeCalled(),
+        final TestKafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(TestKafkaConsumerResumeAdapter.class);
+        assertNotNull(adapter, "The adapter should not be null");
+        assertTrue(adapter.isResumeCalled(),
                 "The resume strategy should have been called when the partition was assigned");
-        assertFalse(resumeStrategy.isConsumerIsNull(),
+        assertFalse(adapter.isConsumerIsNull(),
                 "The consumer passed to the strategy should not be null");
         assertTrue(resumeStrategy.isStartCalled(),
                 "The resume strategy should have been started");
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index a79ce85a743..2e699a2de92 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -21,8 +21,6 @@ import java.util.Optional;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.ResumeStrategy;
 import org.apache.camel.StartupListener;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.api.management.ManagedAttribute;
@@ -31,6 +29,8 @@ import org.apache.camel.cluster.CamelClusterEventListener;
 import org.apache.camel.cluster.CamelClusterMember;
 import org.apache.camel.cluster.CamelClusterService;
 import org.apache.camel.cluster.CamelClusterView;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.service.ServiceHelper;
 import org.slf4j.Logger;
diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java b/core/camel-api/src/main/java/org/apache/camel/Route.java
index 0e6f90882c5..c5f46ec328c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Route.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Route.java
@@ -21,6 +21,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.camel.resume.ConsumerListener;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.ManagementInterceptStrategy;
 import org.apache.camel.spi.Resource;
diff --git a/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListener.java
similarity index 98%
rename from core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListener.java
index d92808e5d2a..bbf6be9d018 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ConsumerListener.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 import java.util.function.Predicate;
 
diff --git a/core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListenerAware.java
similarity index 97%
rename from core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListenerAware.java
index bbd24c59e47..2e188721028 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ConsumerListenerAware.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ConsumerListenerAware.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
  * An interface to represent an object which wishes to support listening for consumer events using the
diff --git a/core/camel-api/src/main/java/org/apache/camel/Offset.java b/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
similarity index 96%
rename from core/camel-api/src/main/java/org/apache/camel/Offset.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
index 1e73d1dcc2a..c41f312d49a 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Offset.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
  * Generic offset without a concrete type
diff --git a/core/camel-api/src/main/java/org/apache/camel/Resumable.java b/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
similarity index 98%
rename from core/camel-api/src/main/java/org/apache/camel/Resumable.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
index 804c05bf9a3..108af04293a 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Resumable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
  * This provides an interface for resumable objects. Such objects allow its users to address them at a specific offset.
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
similarity index 98%
rename from core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
index 0a18ba9e232..d09a61f7e72 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 import java.lang.reflect.Array;
 import java.util.Arrays;
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
similarity index 62%
copy from core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
index f23e7625e75..53221ac7d10 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
- * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
- * processing records.
+ * A resume adapter provides the component-specific logic that plugs the more generic strategy with the lower level
+ * requirements of the component being used.
+ *
+ * It is the responsibility of the supported components to implement the custom implementation for this part of the
+ * resume API, as well as to offer component-specific interfaces that can be specialized by other integrations.
  */
-public interface ResumeStrategy extends Service {
-
+public interface ResumeAdapter {
     /**
-     * A consumer, iterator or value class that can be used to set the index position from which to resume from. The
-     * type is specific to the component.
-     *
+     * Execute the resume logic for the adapter
      */
     void resume();
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeAware.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAware.java
similarity index 97%
rename from core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/ResumeAware.java
index 6ee69db413f..10f59b3c00c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAware.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
  * An interface to represent an object which wishes to support resume operations using a {@link ResumeStrategy}.
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
similarity index 65%
copy from core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index f23e7625e75..06096be3b1e 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
+
+import org.apache.camel.Service;
 
 /**
  * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
@@ -24,9 +26,20 @@ package org.apache.camel;
 public interface ResumeStrategy extends Service {
 
     /**
-     * A consumer, iterator or value class that can be used to set the index position from which to resume from. The
-     * type is specific to the component.
-     *
+     * Gets an adapter for resuming operations
+     * 
+     * @return the resume adapter
+     */
+    ResumeAdapter getAdapter();
+
+    /**
+     * Gets an adapter for resuming operations
+     * 
+     * @param  clazz the class of the adapter
+     * @return       the adapter cast to the requested class, or null if the adapter is null (a type mismatch throws ClassCastException)
+     * @param  <T>   the type of the adapter
      */
-    void resume();
+    default <T extends ResumeAdapter> T getAdapter(Class<T> clazz) {
+        return clazz.cast(getAdapter());
+    }
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/UpdatableConsumerResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
similarity index 97%
rename from core/camel-api/src/main/java/org/apache/camel/UpdatableConsumerResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
index 34bd71690f2..af6e682f198 100644
--- a/core/camel-api/src/main/java/org/apache/camel/UpdatableConsumerResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume;
 
 /**
  * An updatable resume strategy
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
similarity index 71%
rename from components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
index 294e77223ab..0708d1ddff0 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
@@ -15,8 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.file.consumer;
+package org.apache.camel.resume.cache;
 
-public interface GenericFileResumeStrategy<T> extends FileConsumerResumeStrategy<GenericFileResumable<T>> {
+/**
+ * A cache where an entry can point to one or more entries. For instance, a path as the key and the file entries as its
+ * entries
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public interface MultiEntryCache<K, V> extends ResumeCache<K, V> {
 
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
similarity index 76%
rename from core/camel-api/src/main/java/org/apache/camel/ResumeCache.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
index 0abe2e6dca7..c2ce3758f33 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume.cache;
 
-import java.util.Optional;
+import org.apache.camel.resume.ResumeStrategy;
 
 /**
  * This cache stored the resumed data from a {@link ResumeStrategy}.
@@ -27,6 +27,15 @@ import java.util.Optional;
  */
 public interface ResumeCache<K, V> {
 
+    /**
+     * Whether the cache contains the key with the given entry value
+     *
+     * @param  key   the key
+     * @param  entry the entry
+     * @return       true if the key/entry pair is stored in the cache
+     */
+    boolean contains(K key, V entry);
+
     /**
      * Adds a value to the cache
      * 
@@ -43,10 +52,9 @@ public interface ResumeCache<K, V> {
     boolean isFull();
 
     /**
-     * Gets the offset value for the key
+     * Gets the cache pool size
      * 
-     * @param  key the key
-     * @return     the key
+     * @return the cache pool size
      */
-    Optional<V> get(K key);
+    long capacity();
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
similarity index 67%
rename from core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
index f23e7625e75..e8b243846e7 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
@@ -15,18 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.camel;
+package org.apache.camel.resume.cache;
+
+import java.util.Optional;
 
 /**
- * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume
- * processing records.
+ * A resume cache where a single key can only be mapped to a single entry
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the entry
  */
-public interface ResumeStrategy extends Service {
-
+public interface SingleEntryCache<K, V> extends ResumeCache<K, V> {
     /**
-     * A consumer, iterator or value class that can be used to set the index position from which to resume from. The
-     * type is specific to the component.
+     * Gets the offset value for the key
      *
+     * @param  key the key
+     * @return     an Optional holding the offset value for the key
      */
-    void resume();
+    Optional<V> get(K key);
+
 }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 8a8fc6a10d5..c35da8960f3 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -28,15 +28,11 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
-import org.apache.camel.ConsumerListener;
-import org.apache.camel.ConsumerListenerAware;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.NamedNode;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeAware;
-import org.apache.camel.ResumeStrategy;
 import org.apache.camel.Route;
 import org.apache.camel.RouteAware;
 import org.apache.camel.Service;
@@ -44,6 +40,10 @@ import org.apache.camel.ShutdownRoute;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.Suspendable;
 import org.apache.camel.SuspendableService;
+import org.apache.camel.resume.ConsumerListener;
+import org.apache.camel.resume.ConsumerListenerAware;
+import org.apache.camel.resume.ResumeAware;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.ManagementInterceptStrategy;
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index 8176f7d6a52..bcaee49fecf 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -34,7 +34,7 @@ This instance can be bound in the Context registry as follows:
 
 [source,java]
 ----
-getCamelContext().getRegistry().bind("testResumeStrategy", new MyTestResumeStrategy());
+getCamelContext().getRegistry().bind("testResumeStrategy", new MyTestResumeStrategy(new MyAdapter()));
 
 from("some:component")
     .resumable("testResumeStrategy")
@@ -46,129 +46,70 @@ Or the instance can be constructed as follows:
 [source,java]
 ----
 from("some:component")
-    .resumable(new MyTestResumeStrategy())
+    .resumable(new MyTestResumeStrategy(new MyAdapter()))
     .process(this::process)
 ----
 
+=== The Resume Adapter
+
+The responsibility of the adapter class is to bind the component-specific part of the logic to the more generic handling of the
+resume strategy. The adapter is always component-specific, and some components may have more than one. Integrations with
+more complex resume processes may implement their own adapters, although the builtin ones should be useful in most
+cases. Currently, the following adapters are available:
+
+* camel-atom: `org.apache.camel.component.feed.EntryFilter`
+* camel-aws2-kinesis: `org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter`
+* camel-cassandracql: `org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter`
+* camel-couchbase: `org.apache.camel.component.couchbase.CouchbaseResumeAdapter`
+* camel-couchdb: `org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter`
+* camel-file: `org.apache.camel.component.file.consumer.adapters.FileSetResumeAdapter` for directories
+* camel-file: `org.apache.camel.component.file.consumer.adapters.GenericFileResumeAdapter` for files
+* camel-kafka: `org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter`
+* camel-rss: `org.apache.camel.component.feed.EntryFilter`
+* generic: `org.apache.camel.processor.resume.DelegatingResumeAdapter`
+
+Note: in the future, these adapters will be resolved automatically by Camel.
+
 == The Resume API Interfaces
 
 These are the *core interfaces*:
 
-* `org.apache.camel.ResumeStrategy` - the basic resume strategy
-* `org.apache.camel.UpdatableConsumerResumeStrategy` - an extension to the resume strategy to allow updatable strategies
-* `org.apache.camel.ResumeCache` - an interface for local cache for resumable information
+* `org.apache.camel.resume.ResumeStrategy` - the resume strategy service
+* `org.apache.camel.resume.ResumeAdapter` - an adapter that binds the generic parts of the resume strategy with the component
+* `org.apache.camel.resume.UpdatableConsumerResumeStrategy` - an extension to the resume strategy to allow updatable strategies
+* `org.apache.camel.resume.cache.ResumeCache` - the base interface for local cache for resumable information
+* `org.apache.camel.resume.cache.SingleEntryCache` - an interface for local cache for resumable information where there is a one-to-one relationship between a cache key and its entry (e.g., a file and its offset)
+* `org.apache.camel.resume.cache.MultiEntryCache` - an interface for local cache for resumable information where there is a one-to-many relationship between a cache key and its entries (e.g., a path and its file entries)
 
 These are the *core classes* supporting the strategies:
 
-* `org.apache.camel.Resumable` - an interface to allow users to work with abstract resumable entities (files, offsets, etc)
-* `org.apache.camel.ResumableSet` - an interface for resumables with a 1-to-many relationship
-* `org.apache.camel.Offset` - a generic offset without a concrete type (it may represent a long, a file name, etc)
+* `org.apache.camel.resume.Resumable` - an interface to allow users to work with abstract resumable entities (files, offsets, etc)
+* `org.apache.camel.resume.ResumableSet` - an interface for resumables with a 1-to-many relationship
+* `org.apache.camel.resume.Offset` - a generic offset without a concrete type (it may represent a long, a file name, etc)
 
 These are the *supporting classes*:
 
 * `org.apache.camel.support.Resumables` - resumables handling support
 * `org.apache.camel.support.Offsets` - offset handling support
 
-== Basic Strategies
-
-The basic strategies offer a component-specific skeleton that can be used to implement strategies.
-
-* `AbstractKafkaResumeStrategy` - a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets.
-
-[source,java]
-----
-public class KafkaResumeStrategy<K> extends AbstractKafkaResumeStrategy<K, Long> implements GenericFileResumeStrategy<File> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeStrategy.class);
-    public static final int CACHE_SIZE = 100;
-
-    private final String topic;
-    private final ResumeCache<K, Long> cache;
+== Builtin Resume Strategies
 
-    public KafkaResumeStrategy(String topic,
-                               ResumeCache<K, Long> cache,
-                               DefaultProducerPropertyFactory producerPropertyFactory,
-                               DefaultConsumerPropertyFactory consumerPropertyFactory)
-    {
-        super(topic, cache, producerPropertyFactory.getProperties(), consumerPropertyFactory.getProperties());
-        this.topic = topic;
-        this.cache = cache;
-    }
+Camel comes with a few builtin strategies that can be used to store, retrieve and update the offsets. The following strategies are available:
 
+* `SingleNodeKafkaResumeStrategy` - a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets and is suitable for single node integrations.
+* `MultiNodeKafkaResumeStrategy` - a resume strategy from the `camel-kafka` component that uses Kafka as the store for the offsets and is suitable for multi node integrations (i.e.: integrations running on clusters using the xref:components::master-component.adoc[camel-master] component).
 
-    private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) {
-        final File addressable = resumable.getAddressable();
-        return getLastOffset((K) addressable);
-    }
 
-    public Optional<Long> getLastOffset(K addressable) {
-        return cache.get(addressable);
-    }
-
-    @Override
-    public void subscribe() {
-        checkAndSubscribe(topic, 1);
-    }
-
-    @Override
-    public void resume(GenericFileResumable<File> resumable) {
-        final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
-
-        if (!lastOffsetOpt.isPresent()) {
-            return;
-        }
-
-        final long lastOffset = lastOffsetOpt.get();
-        resumable.updateLastOffset(lastOffset);
-    }
-
-    @Override
-    public void resume() {
-        throw new UnsupportedOperationException("Cannot perform blind resume");
-    }
-}
-----
+=== Implementing New Builtin Resume Strategies
 
+New builtin resume strategies can be created by implementing the `UpdatableConsumerResumeStrategy` and the `ResumeStrategy` interfaces. Check the code for `SingleNodeKafkaResumeStrategy` for implementation details.
 
 == Local Cache Support
 
 A sample local cache implemented using https://github.com/ben-manes/caffeine[Caffeine].
 
-[source,java]
-----
-public class SingleItemCache<K> implements ResumeCache<K, Long> {
-    public static final int CACHE_SIZE = 100;
-    private final Cache<K, Long> cache = Caffeine.newBuilder()
-            .maximumSize(CACHE_SIZE)
-            .build();
-
-    @Override
-    public void add(K key, Long offsetValue) {
-        cache.put(key, offsetValue);
-    }
-
-    @Override
-    public Optional<Long> get(K key) {
-        Long entry = cache.getIfPresent(key);
-
-        if (entry == null) {
-            return Optional.empty();
-        }
-
-        return Optional.of(entry.longValue());
-    }
-
-    @Override
-    public boolean isFull() {
-        if (cache.estimatedSize() < CACHE_SIZE) {
-            return true;
-        }
-
-        return false;
-    }
-}
-----
-
+* `org.apache.camel.component.caffeine.resume.single.CaffeineCache`: for data where 1 key can only point to 1 entry (1-to-1 relationship)
+* `org.apache.camel.component.caffeine.resume.multi.CaffeineCache`: for data where 1 key can point to 1 or more entries (1-to-many relationship)
 
 == Known Limitations
 
@@ -209,7 +150,7 @@ Currently, support for pausable consumers is available for the following compone
 
 To use the API, it needs an instance of a Consumer listener along with a predicate that tests whether to continue.
 
-* `org.apache.camel.ConsumerListener` - the consumer listener interface. Camel already comes with pre-built consumer listeners, but users in need of more complex behaviors can create their own listeners.
+* `org.apache.camel.resume.ConsumerListener` - the consumer listener interface. Camel already comes with pre-built consumer listeners, but users in need of more complex behaviors can create their own listeners.
 * a predicate that returns true if data consumption should resume or false if consumption should be put on pause
 
 Usage example:
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
index 88b088805ab..f1526b5311b 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/pausable.json
@@ -12,7 +12,7 @@
     "output": false
   },
   "properties": {
-    "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
+    "consumerListener": { "kind": "attribute", "displayName": "Consumer Listener", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ConsumerListener", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the consumer listener to use" },
     "untilCheck": { "kind": "attribute", "displayName": "Until Check", "required": true, "type": "object", "javaType": "java.util.function.Predicate", "deprecated": false, "autowired": false, "secret": false },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
diff --git a/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json b/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
index acbea228865..03a0a440e46 100644
--- a/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
+++ b/core/camel-core-model/src/generated/resources/org/apache/camel/model/resumable.json
@@ -12,7 +12,7 @@
     "output": false
   },
   "properties": {
-    "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
+    "resumeStrategy": { "kind": "attribute", "displayName": "Resume Strategy", "required": true, "type": "object", "javaType": "org.apache.camel.resume.ResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the resume strategy to use" },
     "id": { "kind": "attribute", "displayName": "Id", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the id of this node" },
     "description": { "kind": "element", "displayName": "Description", "required": false, "type": "object", "javaType": "org.apache.camel.model.DescriptionDefinition", "deprecated": false, "autowired": false, "secret": false, "description": "Sets the description of this node" }
   }
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
index d6f1bdfcaf2..7c8ceef4738 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/PausableDefinition.java
@@ -25,7 +25,7 @@ import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
-import org.apache.camel.ConsumerListener;
+import org.apache.camel.resume.ConsumerListener;
 import org.apache.camel.spi.Metadata;
 
 /**
@@ -40,7 +40,7 @@ public class PausableDefinition extends NoOutputDefinition<PausableDefinition> {
     private ConsumerListener<?, ?> consumerListenerBean;
 
     @XmlAttribute(required = true)
-    @Metadata(required = true, javaType = "org.apache.camel.ConsumerListener")
+    @Metadata(required = true, javaType = "org.apache.camel.resume.ConsumerListener")
     private String consumerListener;
 
     @XmlTransient
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index af065fcbae0..e2b0cadf078 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -37,7 +37,6 @@ import org.apache.camel.AggregationStrategy;
 import org.apache.camel.BeanScope;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.ConsumerListener;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -45,7 +44,6 @@ import org.apache.camel.Expression;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeStrategy;
 import org.apache.camel.builder.DataFormatClause;
 import org.apache.camel.builder.EndpointConsumerBuilder;
 import org.apache.camel.builder.EndpointProducerBuilder;
@@ -60,6 +58,8 @@ import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.LanguageExpression;
 import org.apache.camel.model.language.SimpleExpression;
 import org.apache.camel.processor.loadbalancer.LoadBalancer;
+import org.apache.camel.resume.ConsumerListener;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.AsEndpointUri;
 import org.apache.camel.spi.AsPredicate;
 import org.apache.camel.spi.DataFormat;
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
index 4e2d62deb86..bc68161ae30 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
@@ -22,7 +22,7 @@ import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.Metadata;
 
 /**
@@ -37,7 +37,7 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition>
     private ResumeStrategy resumeStrategyBean;
 
     @XmlAttribute(required = true)
-    @Metadata(required = true, javaType = "org.apache.camel.ResumeStrategy")
+    @Metadata(required = true, javaType = "org.apache.camel.resume.ResumeStrategy")
     private String resumeStrategy;
 
     @Metadata(label = "advanced", javaType = "org.apache.camel.LoggingLevel", defaultValue = "ERROR",
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
index 85954c3b6f7..09c3ab6db35 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PausableProcessor.java
@@ -24,10 +24,10 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.ConsumerListener;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
+import org.apache.camel.resume.ConsumerListener;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
new file mode 100644
index 00000000000..6aee7fd8f4f
--- /dev/null
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+import org.apache.camel.resume.ResumeAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A delegating adapter that forwards the resume call to each of the resume adapters registered with it.
+ */
+public class DelegatingResumeAdapter implements ResumeAdapter {
+    private static final Logger LOG = LoggerFactory.getLogger(DelegatingResumeAdapter.class);
+
+    private final List<ResumeAdapter> resumeStrategies;
+
+    public DelegatingResumeAdapter() {
+        resumeStrategies = new ArrayList<>();
+    }
+
+    protected DelegatingResumeAdapter(List<ResumeAdapter> resumeStrategies) {
+        this.resumeStrategies = resumeStrategies;
+    }
+
+    public boolean add(ResumeAdapter resumeAdapter) {
+        return resumeStrategies.add(resumeAdapter);
+    }
+
+    public boolean remove(Object resumeAdapter) {
+        return resumeStrategies.remove(resumeAdapter);
+    }
+
+    public boolean removeIf(Predicate<? super ResumeAdapter> filter) {
+        return resumeStrategies.removeIf(filter);
+    }
+
+    @Override
+    public void resume() {
+        resumeStrategies.forEach(ResumeAdapter::resume);
+    }
+
+    public int size() {
+        return resumeStrategies.size();
+    }
+
+    public boolean isEmpty() {
+        return resumeStrategies.isEmpty();
+    }
+
+    public boolean contains(Object o) {
+        return resumeStrategies.contains(o);
+    }
+
+    public Iterator<ResumeAdapter> iterator() {
+        return resumeStrategies.iterator();
+    }
+
+    public Object[] toArray() {
+        return resumeStrategies.toArray();
+    }
+
+    public <T> T[] toArray(T[] a) {
+        return resumeStrategies.toArray(a);
+    }
+
+    public boolean containsAll(Collection<?> c) {
+        return resumeStrategies.containsAll(c);
+    }
+
+    public boolean addAll(Collection<? extends ResumeAdapter> c) {
+        return resumeStrategies.addAll(c);
+    }
+
+    public boolean addAll(int index, Collection<? extends ResumeAdapter> c) {
+        return resumeStrategies.addAll(index, c);
+    }
+
+    public boolean removeAll(Collection<?> c) {
+        return resumeStrategies.removeAll(c);
+    }
+
+    public boolean retainAll(Collection<?> c) {
+        return resumeStrategies.retainAll(c);
+    }
+
+    public void replaceAll(UnaryOperator<ResumeAdapter> operator) {
+        resumeStrategies.replaceAll(operator);
+    }
+
+    public void sort(Comparator<? super ResumeAdapter> c) {
+        resumeStrategies.sort(c);
+    }
+
+    public void clear() {
+        resumeStrategies.clear();
+    }
+
+    public ResumeAdapter get(int index) {
+        return resumeStrategies.get(index);
+    }
+
+    public ResumeAdapter set(int index, ResumeAdapter element) {
+        return resumeStrategies.set(index, element);
+    }
+
+    public void add(int index, ResumeAdapter element) {
+        resumeStrategies.add(index, element);
+    }
+
+    public ResumeAdapter remove(int index) {
+        return resumeStrategies.remove(index);
+    }
+
+    public int indexOf(Object o) {
+        return resumeStrategies.indexOf(o);
+    }
+
+    public int lastIndexOf(Object o) {
+        return resumeStrategies.lastIndexOf(o);
+    }
+
+    public ListIterator<ResumeAdapter> listIterator() {
+        return resumeStrategies.listIterator();
+    }
+
+    public ListIterator<ResumeAdapter> listIterator(int index) {
+        return resumeStrategies.listIterator(index);
+    }
+
+    public List<ResumeAdapter> subList(int fromIndex, int toIndex) {
+        return resumeStrategies.subList(fromIndex, toIndex);
+    }
+
+    public Spliterator<ResumeAdapter> spliterator() {
+        return resumeStrategies.spliterator();
+    }
+
+    public <T> T[] toArray(IntFunction<T[]> generator) {
+        return resumeStrategies.toArray(generator);
+    }
+
+    public Stream<ResumeAdapter> stream() {
+        return resumeStrategies.stream();
+    }
+
+    public Stream<ResumeAdapter> parallelStream() {
+        return resumeStrategies.parallelStream();
+    }
+
+    public void forEach(Consumer<? super ResumeAdapter> action) {
+        resumeStrategies.forEach(action);
+    }
+}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeStrategy.java
deleted file mode 100644
index 71ea930f95c..00000000000
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeStrategy.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.processor.resume;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Predicate;
-
-import org.apache.camel.ResumeStrategy;
-import org.apache.camel.Service;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A delegating strategy that can be used to delegate to and/or abstract resume strategies
- */
-public class DelegatingResumeStrategy implements ResumeStrategy {
-    private static final Logger LOG = LoggerFactory.getLogger(DelegatingResumeStrategy.class);
-
-    private final List<ResumeStrategy> resumeStrategies;
-
-    public DelegatingResumeStrategy() {
-        resumeStrategies = new ArrayList<>();
-    }
-
-    protected DelegatingResumeStrategy(List<ResumeStrategy> resumeStrategies) {
-        this.resumeStrategies = resumeStrategies;
-    }
-
-    public boolean add(ResumeStrategy resumeStrategy) {
-        return resumeStrategies.add(resumeStrategy);
-    }
-
-    public boolean remove(Object resumeStrategy) {
-        return resumeStrategies.remove(resumeStrategy);
-    }
-
-    public boolean removeIf(Predicate<? super ResumeStrategy> filter) {
-        return resumeStrategies.removeIf(filter);
-    }
-
-    @Override
-    public void resume() {
-        resumeStrategies.forEach(ResumeStrategy::resume);
-    }
-
-    @Override
-    public void start() {
-        resumeStrategies.forEach(Service::start);
-    }
-
-    @Override
-    public void stop() {
-        resumeStrategies.forEach(Service::stop);
-    }
-
-    @Override
-    public void build() {
-        resumeStrategies.forEach(Service::build);
-    }
-
-    @Override
-    public void init() {
-        resumeStrategies.forEach(Service::init);
-    }
-
-    private void close(ResumeStrategy resumeStrategy) {
-        try {
-            resumeStrategy.close();
-        } catch (IOException e) {
-            LOG.warn("Failed to close resume strategy {}: {}", resumeStrategy.getClass().getSimpleName(), e.getMessage());
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        resumeStrategies.forEach(r -> close(r));
-    }
-
-    @Override
-    public String toString() {
-        return "DelegatingResumeStrategy{" +
-               "resumeStrategies=" + resumeStrategies +
-               '}';
-    }
-}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index a09d04591e6..aa347da63e1 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -19,9 +19,9 @@ package org.apache.camel.processor.resume;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.Resumable;
-import org.apache.camel.ResumeStrategy;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ExchangeHelper;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
index 240376d5acf..34b2b475c7e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
@@ -29,7 +29,7 @@ import org.apache.camel.ExtendedExchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.Synchronization;
@@ -67,6 +67,13 @@ public class ResumableProcessor extends AsyncProcessorSupport
         super.doStart();
     }
 
+    @Override
+    protected void doStop() throws Exception {
+        LOG.info("Stopping the resumable strategy: {}", resumeStrategy.getClass().getSimpleName());
+        resumeStrategy.stop();
+        super.doStop();
+    }
+
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         final Synchronization onCompletion = new ResumableCompletion(resumeStrategy, loggingLevel);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
similarity index 54%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
copy to core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 388f47fa3e5..b0ffd26c080 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -15,33 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.processor.resume;
 
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
-import org.apache.camel.resume.Offsets;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeStrategy;
 
-public class KafkaResumable implements Resumable<String, String> {
-    private final String partition;
-    private String offset;
+/**
+ * A resume strategy that only holds its resume adapter in memory; starting and stopping it are no-ops. It is hardly
+ * useful for production-level implementations, but it can be useful for testing resume adapters.
+ */
+public class TransientResumeStrategy implements ResumeStrategy {
+    private final ResumeAdapter resumeAdapter;
 
-    public KafkaResumable(String partition, String offset) {
-        this.partition = partition;
-        this.offset = offset;
+    public TransientResumeStrategy(ResumeAdapter resumeAdapter) {
+        this.resumeAdapter = resumeAdapter;
     }
 
     @Override
-    public void updateLastOffset(String offset) {
-        this.offset = offset;
+    public ResumeAdapter getAdapter() {
+        return resumeAdapter;
     }
 
     @Override
-    public Offset<String> getLastOffset() {
-        return Offsets.of(offset);
+    public void start() {
+        // this is NO-OP
     }
 
     @Override
-    public String getAddressable() {
-        return partition;
+    public void stop() {
+        // this is NO-OP
     }
 }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
index 0626fcb05e5..f67406b871d 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/PausableReifier.java
@@ -19,12 +19,12 @@ package org.apache.camel.reifier;
 
 import java.util.function.Predicate;
 
-import org.apache.camel.ConsumerListener;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.PausableDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.processor.PausableProcessor;
+import org.apache.camel.resume.ConsumerListener;
 import org.apache.camel.util.ObjectHelper;
 
 public class PausableReifier extends ProcessorReifier<PausableDefinition> {
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index c96b14a941c..da5c21e9dc1 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -18,11 +18,11 @@ package org.apache.camel.reifier;
 
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Processor;
-import org.apache.camel.ResumeStrategy;
 import org.apache.camel.Route;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.ResumableDefinition;
 import org.apache.camel.processor.resume.ResumableProcessor;
+import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.util.ObjectHelper;
 
 public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index f258e5e32c9..9053c2ca3d3 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -20,25 +20,27 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import org.apache.camel.Resumable;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.GenericFileResumable;
-import org.apache.camel.component.file.consumer.GenericFileResumeStrategy;
+import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.Resumables;
+import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 
 public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport {
 
-    private static class TestResumeStrategy implements GenericFileResumeStrategy<File> {
+    private static class TestFileResumeAdapter implements GenericFileResumeAdapter {
         @Override
         public void resume(GenericFileResumable<File> resumable) {
             if (!resumable.getAddressable().getName().startsWith("resume-from-offset")) {
@@ -55,17 +57,12 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
         }
 
         @Override
-        public void start() {
-
-        }
-
-        @Override
-        public void stop() {
-
+        public Optional<Long> getLastOffset(File addressable) {
+            return Optional.empty();
         }
     }
 
-    private static class FailResumeStrategy extends TestResumeStrategy
+    private static class FailResumeAdapter extends TestFileResumeAdapter
             implements UpdatableConsumerResumeStrategy<File, Long, Resumable<File, Long>> {
         private boolean called;
 
@@ -75,7 +72,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
         }
     }
 
-    private static final FailResumeStrategy FAIL_RESUME_STRATEGY = new FailResumeStrategy();
+    private static final TransientResumeStrategy FAIL_RESUME_STRATEGY = new TransientResumeStrategy(new FailResumeAdapter());
 
     @DisplayName("Tests whether it can resume from an offset")
     @Test
@@ -106,7 +103,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
 
         List<Exchange> exchangeList = mock.getExchanges();
         Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages");
-        Assertions.assertFalse(FAIL_RESUME_STRATEGY.called);
+        Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called);
     }
 
     @DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)")
@@ -127,7 +124,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
             @Override
             public void configure() {
 
-                bindToRegistry("myResumeStrategy", new TestResumeStrategy());
+                bindToRegistry("myResumeStrategy", new TransientResumeStrategy(new TestFileResumeAdapter()));
                 bindToRegistry("resumeNotToBeCalledStrategy", FAIL_RESUME_STRATEGY);
 
                 from(fileUri("resumeOff?noop=true&recursive=true"))
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index c8308b0b0ff..b0f6ef96026 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -25,14 +25,15 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeStrategy;
+import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.resume.Resumables;
 import org.junit.jupiter.api.Test;
 
 public class FileConsumerResumeStrategyTest extends ContextTestSupport {
 
-    private static class TestResumeStrategy implements FileSetResumeStrategy {
+    private static class TestFileSetResumeAdapter implements FileSetResumeAdapter {
         private List<String> processedFiles = Arrays.asList("0.txt", "1.txt", "2.txt");
         private FileResumeSet resumeSet;
 
@@ -49,16 +50,6 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
                 resumeSet.resumeEach(f -> !processedFiles.contains(f.getName()));
             }
         }
-
-        @Override
-        public void start() {
-
-        }
-
-        @Override
-        public void stop() {
-
-        }
     }
 
     private static Map<String, Object> headerFor(int num) {
@@ -99,7 +90,7 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
 
-                bindToRegistry("testResumeStrategy", new TestResumeStrategy());
+                bindToRegistry("testResumeStrategy", new TransientResumeStrategy(new TestFileSetResumeAdapter()));
 
                 from(fileUri("resume?noop=true&recursive=true"))
                         .resumable("testResumeStrategy")
diff --git a/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java b/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java
index a7f46836879..ab5421af693 100644
--- a/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java
+++ b/core/camel-support/src/main/java/org/apache/camel/resume/Offsets.java
@@ -17,8 +17,6 @@
 
 package org.apache.camel.resume;
 
-import org.apache.camel.Offset;
-
 /**
  * Offset handling support
  */
diff --git a/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java b/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java
index 38bfed33c69..05a931cfd6b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java
+++ b/core/camel-support/src/main/java/org/apache/camel/resume/Resumables.java
@@ -17,9 +17,6 @@
 
 package org.apache.camel.resume;
 
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
-
 /**
  * A wrapper for resumable entities
  */