Posted to commits@camel.apache.org by or...@apache.org on 2022/06/17 14:13:17 UTC

[camel] branch main updated (31c59ab989b -> 75a75ca47b0)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 31c59ab989b CAMEL-18151: camel-jbang - Export should be a single command making it easier to use from ci/cd
     new 72af230f652 CAMEL-18128: use a single constant for commonly addressing the resume strategy
     new 26086db62a3 CAMEL-18128: delay the initialization of the producer/consumer
     new e8896bc3ab4 CAMEL-18128: allow determining the desirable resume cache fill policy
     new 75a75ca47b0 CAMEL-18128: provide a way for integrations to configure specific details for the resume strategy

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/KafkaResumeStrategyConfiguration.java    |  61 +++++++
 .../KafkaResumeStrategyConfigurationBuilder.java   | 187 +++++++++++++++++++++
 .../resume/kafka/MultiNodeKafkaResumeStrategy.java |  52 ++----
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 128 ++++----------
 .../java/org/apache/camel/resume/Cacheable.java    |  24 +++
 .../org/apache/camel/resume/ResumeStrategy.java    |   1 +
 ...Aware.java => ResumeStrategyConfiguration.java} |  22 ++-
 ...ava => ResumeStrategyConfigurationBuilder.java} |  21 ++-
 .../org/apache/camel/reifier/ResumableReifier.java |   2 +-
 .../BasicResumeStrategyConfigurationBuilder.java   |  41 +++++
 10 files changed, 381 insertions(+), 158 deletions(-)
 create mode 100644 components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
 create mode 100644 components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
 copy core/camel-api/src/main/java/org/apache/camel/resume/{ConsumerListenerAware.java => ResumeStrategyConfiguration.java} (57%)
 copy core/camel-api/src/main/java/org/apache/camel/resume/{ConsumerListenerAware.java => ResumeStrategyConfigurationBuilder.java} (53%)
 create mode 100644 core/camel-support/src/main/java/org/apache/camel/support/resume/BasicResumeStrategyConfigurationBuilder.java


[camel] 02/04: CAMEL-18128: delay the initialization of the producer/consumer

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 26086db62a3667f19cfbe34f0acdec25790bcc90
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Jun 16 17:02:32 2022 +0200

    CAMEL-18128: delay the initialization of the producer/consumer
    
    This should allow certain parameters of the client classes to be
    adjusted according to the adapter
---
 .../resume/kafka/SingleNodeKafkaResumeStrategy.java      | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 206293826f7..71d8ad26781 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -178,6 +178,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     @Override
     public void updateLastOffset(T offset) throws Exception {
+        createProducer();
+
         OffsetKey<?> key = offset.getOffsetKey();
         Offset<?> offsetValue = offset.getLastOffset();
 
@@ -197,6 +199,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      * @throws Exception
      */
     public void loadCache() throws Exception {
+        createConsumer();
+
         subscribe();
 
         LOG.debug("Loading records from topic {}", topic);
@@ -391,17 +395,21 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     @Override
     public void init() {
-
         LOG.debug("Initializing the Kafka resume strategy");
-        if (consumer == null) {
-            consumer = new KafkaConsumer<>(consumerConfig);
-        }
+    }
 
+    private void createProducer() {
         if (producer == null) {
             producer = new KafkaProducer<>(producerConfig);
         }
     }
 
+    private void createConsumer() {
+        if (consumer == null) {
+            consumer = new KafkaConsumer<>(consumerConfig);
+        }
+    }
+
     @Override
     public void stop() {
         LOG.info("Closing the Kafka producer");


[camel] 01/04: CAMEL-18128: use a single constant for commonly addressing the resume strategy

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 72af230f652dd0cd4d24f6502d143367f52879fa
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Jun 16 16:56:11 2022 +0200

    CAMEL-18128: use a single constant for commonly addressing the resume strategy
---
 .../camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java | 1 +
 .../src/main/java/org/apache/camel/reifier/ResumableReifier.java        | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index 0c30dab50bf..26039f712de 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -24,6 +24,7 @@ import org.apache.camel.Service;
  * processing records.
  */
 public interface ResumeStrategy extends Service {
+    String DEFAULT_NAME = "resumeStrategy";
 
     /**
      * Sets an adapter for resuming operations with this strategy
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index be76af8c7ae..7c08bde948c 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -36,7 +36,7 @@ public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
         Processor childProcessor = createChildProcessor(false);
 
         ResumeStrategy resumeStrategy = resolveResumeStrategy();
-        ObjectHelper.notNull(resumeStrategy, "resumeStrategy", definition);
+        ObjectHelper.notNull(resumeStrategy, ResumeStrategy.DEFAULT_NAME, definition);
 
         route.setResumeStrategy(resumeStrategy);
         LoggingLevel loggingLevel = resolveLoggingLevel();
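
For illustration, a short sketch of how downstream code might use the new constant
when wiring a strategy into the registry. The bind-by-name lookup is an assumption
here (the commit itself only introduces the constant and uses it in the null-check
message), so treat this as a sketch rather than the documented contract.

    import org.apache.camel.CamelContext;
    import org.apache.camel.resume.ResumeStrategy;

    public final class ResumeStrategyRegistration {
        private ResumeStrategyRegistration() {
        }

        // Bind the strategy under the shared constant instead of the literal "resumeStrategy"
        public static void register(CamelContext context, ResumeStrategy strategy) {
            context.getRegistry().bind(ResumeStrategy.DEFAULT_NAME, strategy);
        }
    }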


[camel] 03/04: CAMEL-18128: allow determining the desirable resume cache fill policy

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e8896bc3ab473086efd504d52542414b03a67b01
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Jun 16 17:26:26 2022 +0200

    CAMEL-18128: allow determining the desirable resume cache fill policy
---
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 10 +++++++++
 .../java/org/apache/camel/resume/Cacheable.java    | 24 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 71d8ad26781..4de3f0acc7a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -199,6 +199,16 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      * @throws Exception
      */
     public void loadCache() throws Exception {
+        if (adapter instanceof Cacheable) {
+            Cacheable cacheable = (Cacheable) adapter;
+
+            if (cacheable.getFillPolicy() == Cacheable.FillPolicy.MAXIMIZING) {
+                consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            } else {
+                consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+            }
+        }
+
         createConsumer();
 
         subscribe();
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
index 157d3689fe0..e11337c4464 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
@@ -24,6 +24,21 @@ import org.apache.camel.resume.cache.ResumeCache;
  */
 public interface Cacheable {
 
+    /**
+     * The cache fill policy can be used to determine how this cache should be filled with data.
+     */
+    enum FillPolicy {
+        /**
+         * With MAXIMIZING, entities try to maximize cache usage and fill it with as much data as possible
+         */
+        MAXIMIZING,
+
+        /**
+         * With MINIMIZING, entities should fill it with as little data as reasonable.
+         */
+        MINIMIZING,
+    }
+
     /**
      * Adds an offset key and value to the cache
      * 
@@ -46,4 +61,13 @@ public interface Cacheable {
      * @return A resume cache instance
      */
     ResumeCache<?> getCache();
+
+    /**
+     * Gets the {@link FillPolicy} for this cache instance
+     *
+     * @return the fill policy set for this instance (defaults to FillPolicy.MAXIMIZING)
+     */
+    default FillPolicy getFillPolicy() {
+        return FillPolicy.MAXIMIZING;
+    }
 }
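
In short, an adapter that implements Cacheable can now report a fill policy
(MAXIMIZING by default), and the Kafka strategy translates that choice into the
consumer's auto.offset.reset setting before the consumer is created. The helper
below restates that mapping as a standalone, compilable sketch; the class and
method names are illustrative and not part of the commit.

    import java.util.Properties;

    import org.apache.camel.resume.Cacheable;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    final class FillPolicySketch {
        private FillPolicySketch() {
        }

        // MAXIMIZING reads the topic from the beginning; anything else only picks up new records
        static void applyFillPolicy(Cacheable cacheable, Properties consumerConfig) {
            if (cacheable.getFillPolicy() == Cacheable.FillPolicy.MAXIMIZING) {
                consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            } else {
                consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            }
        }
    }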


[camel] 04/04: CAMEL-18128: provide a way for integrations to configure specific details for the resume strategy

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 75a75ca47b0fbea3ef61a7a774b003b9f9800e20
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jun 17 15:01:45 2022 +0200

    CAMEL-18128: provide a way for integrations to configure specific details for the resume strategy
---
 .../kafka/KafkaResumeStrategyConfiguration.java    |  61 +++++++
 .../KafkaResumeStrategyConfigurationBuilder.java   | 187 +++++++++++++++++++++
 .../resume/kafka/MultiNodeKafkaResumeStrategy.java |  52 ++----
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 124 +++-----------
 .../camel/resume/ResumeStrategyConfiguration.java  |  43 +++++
 .../resume/ResumeStrategyConfigurationBuilder.java |  42 +++++
 .../BasicResumeStrategyConfigurationBuilder.java   |  41 +++++
 7 files changed, 404 insertions(+), 146 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
new file mode 100644
index 00000000000..2e314079223
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume.kafka;
+
+import java.util.Properties;
+
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+
+/**
+ * A configuration suitable for using with the {@link KafkaResumeStrategy} and any of its implementations
+ */
+public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguration {
+    private Properties producerProperties;
+    private Properties consumerProperties;
+    private String topic;
+
+    public Properties getProducerProperties() {
+        return producerProperties;
+    }
+
+    void setProducerProperties(Properties producerProperties) {
+        assert producerProperties != null;
+
+        this.producerProperties = producerProperties;
+    }
+
+    public Properties getConsumerProperties() {
+        return consumerProperties;
+    }
+
+    void setConsumerProperties(Properties consumerProperties) {
+        assert consumerProperties != null;
+
+        this.consumerProperties = consumerProperties;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    void setTopic(String topic) {
+        assert topic != null;
+
+        this.topic = topic;
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
new file mode 100644
index 00000000000..150936148d0
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume.kafka;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.support.resume.BasicResumeStrategyConfigurationBuilder;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A configuration builder appropriate for building configurations for the {@link SingleNodeKafkaResumeStrategy},
+ * {@link MultiNodeKafkaResumeStrategy} and any of their subclasses
+ */
+public class KafkaResumeStrategyConfigurationBuilder
+        extends
+        BasicResumeStrategyConfigurationBuilder<KafkaResumeStrategyConfigurationBuilder, KafkaResumeStrategyConfiguration> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeStrategyConfigurationBuilder.class);
+
+    private Properties producerProperties;
+    private Properties consumerProperties;
+    private String topic;
+
+    private KafkaResumeStrategyConfigurationBuilder() {
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder(Properties producerProperties, Properties consumerProperties) {
+        this.producerProperties = ObjectHelper.notNull(producerProperties, "producerProperties");
+        this.consumerProperties = ObjectHelper.notNull(consumerProperties, "consumerProperties");
+    }
+
+    @Override
+    public KafkaResumeStrategyConfigurationBuilder withCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy) {
+        if (cacheFillPolicy == Cacheable.FillPolicy.MINIMIZING) {
+            consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        } else {
+            consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        }
+
+        return super.withCacheFillPolicy(cacheFillPolicy);
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withProducerProperty(String key, Object value) {
+        producerProperties.put(key, value);
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withConsumerProperty(String key, Object value) {
+        consumerProperties.put(key, value);
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withGroupId(String value) {
+        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, value);
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withEnableAutoCommit(boolean value) {
+        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.valueOf(value));
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withBootstrapServers(String value) {
+        final String bootstrapServers = StringHelper.notEmpty(value, "bootstrapServers");
+
+        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withTopic(String value) {
+        this.topic = value;
+
+        return this;
+    }
+
+    /**
+     * Creates a basic consumer
+     *
+     * @return A set of default properties for consuming byte-based key/pair records from Kafka
+     */
+    public static Properties createConsumerProperties() {
+        Properties config = new Properties();
+
+        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        return config;
+    }
+
+    /**
+     * Creates a basic producer
+     *
+     * @return A set of default properties for producing byte-based key/pair records from Kafka
+     */
+    public static Properties createProducerProperties() {
+        Properties config = new Properties();
+
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+
+        return config;
+    }
+
+    @Override
+    public KafkaResumeStrategyConfiguration build() {
+        KafkaResumeStrategyConfiguration resumeStrategyConfiguration = new KafkaResumeStrategyConfiguration();
+
+        resumeStrategyConfiguration.setCacheFillPolicy(super.cacheFillPolicy);
+        resumeStrategyConfiguration.setConsumerProperties(consumerProperties);
+        resumeStrategyConfiguration.setProducerProperties(producerProperties);
+        resumeStrategyConfiguration.setTopic(topic);
+
+        return resumeStrategyConfiguration;
+    }
+
+    /**
+     * Creates the most basic builder possible
+     * 
+     * @return a pre-configured basic builder
+     */
+    public static KafkaResumeStrategyConfigurationBuilder newBuilder() {
+        final Properties producerProperties = KafkaResumeStrategyConfigurationBuilder.createProducerProperties();
+        final Properties consumerProperties = KafkaResumeStrategyConfigurationBuilder.createConsumerProperties();
+
+        KafkaResumeStrategyConfigurationBuilder builder = new KafkaResumeStrategyConfigurationBuilder(
+                producerProperties,
+                consumerProperties);
+
+        String groupId = UUID.randomUUID().toString();
+        LOG.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        builder.withGroupId(groupId);
+        builder.withEnableAutoCommit(true);
+        builder.withCacheFillPolicy(Cacheable.FillPolicy.MAXIMIZING);
+
+        return builder;
+    }
+
+    /**
+     * Creates an empty builder
+     * 
+     * @return an empty configuration builder
+     */
+    public static KafkaResumeStrategyConfigurationBuilder newEmptyBuilder() {
+        final Properties producerProperties = new Properties();
+        final Properties consumerProperties = new Properties();
+
+        KafkaResumeStrategyConfigurationBuilder builder = new KafkaResumeStrategyConfigurationBuilder(
+                producerProperties,
+                consumerProperties);
+
+        String groupId = UUID.randomUUID().toString();
+        LOG.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        builder.withGroupId(groupId);
+        builder.withEnableAutoCommit(true);
+
+        return builder;
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
index 7ece1b23699..c887a21b84b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -47,56 +47,26 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod
     /**
      * Create a new instance of this class
      * 
-     * @param bootstrapServers the address of the Kafka broker
-     * @param topic            the topic where to publish the offsets
+     * @param resumeStrategyConfiguration the configuration to use for this strategy instance
      */
-    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic) {
+    public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
         // just in case users don't want to provide their own worker thread pool
-        this(bootstrapServers, topic, Executors.newSingleThreadExecutor());
+        this(resumeStrategyConfiguration, Executors.newSingleThreadExecutor());
     }
 
     /**
      * Builds an instance of this class
      *
-     * @param bootstrapServers the address of the Kafka broker
-     * @param topic            the topic where to publish the offsets
-     * @param executorService  an executor service that will run a separate thread for periodically refreshing the
-     *                         offsets
+     * @param resumeStrategyConfiguration the configuration to use for this strategy instance
+     * @param executorService             an executor service that will run a separate thread for periodically
+     *                                    refreshing the offsets
      */
 
-    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, ExecutorService executorService) {
-        super(bootstrapServers, topic);
-
-        // We need to keep refreshing the cache
-        this.executorService = executorService;
-        executorService.submit(() -> refresh());
-    }
-
-    /**
-     * Builds an instance of this class
-     *
-     * @param topic          the topic where to publish the offsets
-     * @param producerConfig the set of properties to be used by the Kafka producer within this class
-     * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
-     */
-    public MultiNodeKafkaResumeStrategy(String topic, Properties producerConfig, Properties consumerConfig) {
-        this(topic, producerConfig, consumerConfig, Executors.newSingleThreadExecutor());
-    }
-
-    /**
-     * Builds an instance of this class
-     *
-     * @param topic           the topic where to publish the offsets
-     * @param producerConfig  the set of properties to be used by the Kafka producer within this class
-     * @param consumerConfig  the set of properties to be used by the Kafka consumer within this class
-     * @param executorService an executor service that will run a separate thread for periodically refreshing the
-     *                        offsets
-     */
-
-    public MultiNodeKafkaResumeStrategy(String topic, Properties producerConfig, Properties consumerConfig,
+    public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration,
                                         ExecutorService executorService) {
-        super(topic, producerConfig, consumerConfig);
+        super(resumeStrategyConfiguration);
 
+        // We need to keep refreshing the cache
         this.executorService = executorService;
         executorService.submit(() -> refresh());
     }
@@ -132,11 +102,11 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod
     protected void refresh() {
         LOG.trace("Creating a offset cache refresher");
         try {
-            Properties prop = (Properties) getConsumerConfig().clone();
+            Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
             prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
 
             try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) {
-                consumer.subscribe(Collections.singletonList(getTopic()));
+                consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
 
                 poll(consumer);
             }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 4de3f0acc7a..870185127de 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -23,9 +23,7 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.Queue;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 
@@ -38,21 +36,15 @@ import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.StringHelper;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,8 +57,6 @@ import org.slf4j.LoggerFactory;
 public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy<T> {
     private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
 
-    private final String topic;
-
     private Consumer<byte[], byte[]> consumer;
     private Producer<byte[], byte[]> producer;
     private Duration pollDuration = Duration.ofSeconds(1);
@@ -74,77 +64,17 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
     private final Queue<RecordError> producerErrors = new ConcurrentLinkedQueue<>();
 
     private boolean subscribed;
-    private final Properties producerConfig;
-    private final Properties consumerConfig;
     private ResumeAdapter adapter;
+    private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
 
     /**
      * Builds an instance of this class
      * 
-     * @param bootstrapServers the address of the Kafka broker
-     * @param topic            the topic where to publish the offsets
+     * @param resumeStrategyConfiguration the configuration to use for this strategy instance
      *
      */
-    public SingleNodeKafkaResumeStrategy(String bootstrapServers, String topic) {
-        this(topic, createProducer(bootstrapServers), createConsumer(bootstrapServers));
-    }
-
-    /**
-     * Builds an instance of this class
-     *
-     * @param topic          the topic where to publish the offsets
-     * @param producerConfig the set of properties to be used by the Kafka producer within this class
-     * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
-     */
-    public SingleNodeKafkaResumeStrategy(String topic, Properties producerConfig,
-                                         Properties consumerConfig) {
-        this.topic = ObjectHelper.notNull(topic, "The topic must not be null");
-        this.producerConfig = producerConfig;
-        this.consumerConfig = consumerConfig;
-
-        init();
-    }
-
-    /**
-     * Creates a basic string-based producer
-     * 
-     * @param  bootstrapServers the Kafka host
-     * @return                  A set of default properties for producing string-based key/pair records from Kafka
-     */
-    public static Properties createProducer(String bootstrapServers) {
-        Properties config = new Properties();
-
-        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-
-        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
-        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-
-        return config;
-    }
-
-    /**
-     * Creates a basic string-based consumer
-     * 
-     * @param  bootstrapServers the Kafka host
-     * @return                  A set of default properties for consuming string-based key/pair records from Kafka
-     */
-    public static Properties createConsumer(String bootstrapServers) {
-        Properties config = new Properties();
-
-        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-
-        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
-        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-
-        String groupId = UUID.randomUUID().toString();
-        LOG.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
-
-        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
-
-        return config;
+    public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
+        this.resumeStrategyConfiguration = resumeStrategyConfiguration;
     }
 
     /**
@@ -158,7 +88,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      *
      */
     protected void produce(byte[] key, byte[] message) throws ExecutionException, InterruptedException {
-        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, message);
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(resumeStrategyConfiguration.getTopic(), key, message);
 
         producer.send(record, (recordMetadata, e) -> {
             if (e != null) {
@@ -199,21 +129,11 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      * @throws Exception
      */
     public void loadCache() throws Exception {
-        if (adapter instanceof Cacheable) {
-            Cacheable cacheable = (Cacheable) adapter;
-
-            if (cacheable.getFillPolicy() == Cacheable.FillPolicy.MAXIMIZING) {
-                consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-            } else {
-                consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-            }
-        }
-
         createConsumer();
 
         subscribe();
 
-        LOG.debug("Loading records from topic {}", topic);
+        LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic());
 
         if (!(adapter instanceof Deserializable)) {
             throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
@@ -311,9 +231,11 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         try {
             consumer.unsubscribe();
         } catch (IllegalStateException e) {
-            LOG.warn("The consumer is likely already closed. Skipping unsubscribing from {}", topic);
+            LOG.warn("The consumer is likely already closed. Skipping unsubscribing from {}",
+                    resumeStrategyConfiguration.getTopic());
         } catch (Exception e) {
-            LOG.error("Error unsubscribing from the Kafka topic {}: {}", topic, e.getMessage(), e);
+            LOG.error("Error unsubscribing from the Kafka topic {}: {}", resumeStrategyConfiguration.getTopic(), e.getMessage(),
+                    e);
         }
     }
 
@@ -370,12 +292,12 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             ResumeCache<?> cache = ((Cacheable) adapter).getCache();
 
             if (cache.capacity() >= 1) {
-                checkAndSubscribe(topic, cache.capacity());
+                checkAndSubscribe(resumeStrategyConfiguration.getTopic(), cache.capacity());
             } else {
-                checkAndSubscribe(topic);
+                checkAndSubscribe(resumeStrategyConfiguration.getTopic());
             }
         } else {
-            checkAndSubscribe(topic);
+            checkAndSubscribe(resumeStrategyConfiguration.getTopic());
         }
     }
 
@@ -410,13 +332,13 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     private void createProducer() {
         if (producer == null) {
-            producer = new KafkaProducer<>(producerConfig);
+            producer = new KafkaProducer<>(resumeStrategyConfiguration.getProducerProperties());
         }
     }
 
     private void createConsumer() {
         if (consumer == null) {
-            consumer = new KafkaConsumer<>(consumerConfig);
+            consumer = new KafkaConsumer<>(resumeStrategyConfiguration.getConsumerProperties());
         }
     }
 
@@ -455,18 +377,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         return producer;
     }
 
-    protected Properties getProducerConfig() {
-        return producerConfig;
-    }
-
-    protected Properties getConsumerConfig() {
-        return consumerConfig;
-    }
-
-    protected String getTopic() {
-        return topic;
-    }
-
     /**
      * Clear the producer errors
      */
@@ -474,4 +384,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         producerErrors.clear();
     }
 
+    protected KafkaResumeStrategyConfiguration getResumeStrategyConfiguration() {
+        return resumeStrategyConfiguration;
+    }
+
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java
new file mode 100644
index 00000000000..441370e25ed
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.resume;
+
+/**
+ * Basic configuration holder for resume strategies
+ */
+public class ResumeStrategyConfiguration {
+    private Cacheable.FillPolicy cacheFillPolicy;
+
+    /**
+     * Gets the {@link org.apache.camel.resume.Cacheable.FillPolicy} for the cache used in the strategy
+     *
+     * @return the fill policy to use
+     */
+    public Cacheable.FillPolicy getCacheFillPolicy() {
+        return cacheFillPolicy;
+    }
+
+    /**
+     * Sets the {@link org.apache.camel.resume.Cacheable.FillPolicy} for the cache used in the strategy
+     *
+     * @param cacheFillPolicy the fill policy to use
+     */
+    public void setCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy) {
+        this.cacheFillPolicy = cacheFillPolicy;
+    }
+}
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfigurationBuilder.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfigurationBuilder.java
new file mode 100644
index 00000000000..b1992c86221
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfigurationBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.resume;
+
+/**
+ * Provides a basic interface for implementing component-specific configuration builder
+ *
+ * @param <T> The {@link ResumeStrategyConfigurationBuilder} providing the custom configuration
+ * @param <Y> The type of the {@link ResumeStrategyConfiguration} that will be built by the builder
+ */
+public interface ResumeStrategyConfigurationBuilder<
+        T extends ResumeStrategyConfigurationBuilder, Y extends ResumeStrategyConfiguration> {
+
+    /**
+     * Sets the {@link org.apache.camel.resume.Cacheable.FillPolicy} for the cache used in the strategy
+     *
+     * @param cacheFillPolicy the fill policy to use
+     */
+    T withCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy);
+
+    /**
+     * Builds the resume strategy configuration
+     *
+     * @return a new instance of the resume strategy configuration
+     */
+    Y build();
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/BasicResumeStrategyConfigurationBuilder.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/BasicResumeStrategyConfigurationBuilder.java
new file mode 100644
index 00000000000..3821ac3142a
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/BasicResumeStrategyConfigurationBuilder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.support.resume;
+
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.resume.ResumeStrategyConfigurationBuilder;
+
+/**
+ * This class implements the most basic configuration set used by all resume strategy builders
+ *
+ * @param <T> The {@link ResumeStrategyConfigurationBuilder} providing the custom configuration
+ * @param <Y> The type of the {@link ResumeStrategyConfiguration} that will be built by the builder
+ */
+public abstract class BasicResumeStrategyConfigurationBuilder<
+        T extends BasicResumeStrategyConfigurationBuilder, Y extends ResumeStrategyConfiguration>
+        implements ResumeStrategyConfigurationBuilder<T, Y> {
+    protected Cacheable.FillPolicy cacheFillPolicy = Cacheable.FillPolicy.MAXIMIZING;
+
+    @Override
+    public T withCacheFillPolicy(Cacheable.FillPolicy fillPolicy) {
+        this.cacheFillPolicy = fillPolicy;
+
+        return (T) this;
+    }
+}
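

Taken together, the four commits let an integration configure the resume strategy
through a dedicated configuration object instead of loose constructor arguments.
A hedged end-to-end sketch is shown below; the bootstrap servers and topic name
are placeholders, and binding the strategy under ResumeStrategy.DEFAULT_NAME is an
assumption about how it would be wired into a route rather than something shown in
these diffs.

    import org.apache.camel.CamelContext;
    import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfiguration;
    import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
    import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
    import org.apache.camel.resume.Cacheable;
    import org.apache.camel.resume.Resumable;
    import org.apache.camel.resume.ResumeStrategy;

    public final class KafkaResumeStrategyExample {
        private KafkaResumeStrategyExample() {
        }

        public static void configure(CamelContext context) {
            // Build the component-specific configuration with the new fluent builder
            KafkaResumeStrategyConfiguration configuration = KafkaResumeStrategyConfigurationBuilder.newBuilder()
                    .withBootstrapServers("localhost:9092")                // placeholder broker address
                    .withTopic("camel-resume-offsets")                     // placeholder topic
                    .withCacheFillPolicy(Cacheable.FillPolicy.MINIMIZING)
                    .build();

            // The strategy now takes the whole configuration as a single constructor argument
            SingleNodeKafkaResumeStrategy<Resumable> strategy = new SingleNodeKafkaResumeStrategy<>(configuration);

            // Assumed wiring: expose the strategy under the shared default name
            context.getRegistry().bind(ResumeStrategy.DEFAULT_NAME, strategy);
        }
    }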