You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/03 15:48:55 UTC

[camel] branch main updated: CAMEL-18675: support auto-instantiation for the resume strategy

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new c9b7e694b3a CAMEL-18675: support auto-instantiation for the resume strategy
c9b7e694b3a is described below

commit c9b7e694b3ae8acdc46e0e1bf9e2d2465086b75a
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Nov 2 19:13:14 2022 +0100

    CAMEL-18675: support auto-instantiation for the resume strategy
---
 .../org/apache/camel/kafka-resume-strategy         |  2 +
 .../kafka/KafkaResumeStrategyConfiguration.java    |  5 +++
 .../KafkaResumeStrategyConfigurationBuilder.java   |  3 +-
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 34 +++++++++++----
 .../org/apache/camel/write-ahead-resume-strategy   |  2 +
 .../component/wal/WriteAheadResumeStrategy.java    | 49 +++++++++++++++++-----
 .../wal/WriteAheadResumeStrategyConfiguration.java |  5 +++
 .../org/apache/camel/resume/ResumeStrategy.java    |  4 ++
 .../camel/resume/ResumeStrategyConfiguration.java  | 11 ++++-
 .../docs/modules/eips/pages/resume-strategies.adoc | 24 ++++++++++-
 .../apache/camel/model/ResumableDefinition.java    | 22 ++++++++++
 .../org/apache/camel/transient-resume-strategy     |  2 +
 .../processor/resume/TransientResumeStrategy.java  | 34 +++++++++++++++
 .../org/apache/camel/reifier/ResumableReifier.java | 28 ++++++++++++-
 14 files changed, 200 insertions(+), 25 deletions(-)

diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-resume-strategy b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-resume-strategy
new file mode 100644
index 00000000000..723ccd25e8c
--- /dev/null
+++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-resume-strategy
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
index 94aede6fadd..3ce847cee0e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
@@ -81,4 +81,9 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio
 
         this.maxInitializationRetries = maxInitializationRetries;
     }
+
+    @Override
+    public String resumeStrategyService() {
+        return "kafka-resume-strategy";
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
index 3c4e6a21ca5..e38ae808d37 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
@@ -33,8 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A configuration builder appropriate for building configurations for the {@link SingleNodeKafkaResumeStrategy},
- * {@link MultiNodeKafkaResumeStrategy} and any of their subclasses
+ * A configuration builder appropriate for building configurations for the {@link SingleNodeKafkaResumeStrategy}
  */
 public class KafkaResumeStrategyConfigurationBuilder
         extends
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 605b498cd9a..c3863b6508e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -37,7 +37,9 @@ import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
 import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.spi.annotations.JdkService;
 import org.apache.camel.util.IOHelper;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -56,6 +58,7 @@ import org.slf4j.LoggerFactory;
  * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
  * integrations.
  */
+@JdkService("kafka-resume-strategy")
 public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
 
@@ -65,10 +68,14 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
 
     private boolean subscribed;
     private ResumeAdapter adapter;
-    private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
+    private KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
     private final ExecutorService executorService;
     private final ReentrantLock writeLock = new ReentrantLock();
-    private final CountDownLatch initLatch;
+    private CountDownLatch initLatch;
+
+    public SingleNodeKafkaResumeStrategy() {
+        executorService = Executors.newSingleThreadExecutor();
+    }
 
     /**
      * Builds an instance of this class
@@ -78,8 +85,6 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
         executorService = Executors.newSingleThreadExecutor();
-
-        initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
     }
 
     /**
@@ -91,8 +96,6 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
                                          ExecutorService executorService) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
         this.executorService = executorService;
-
-        initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
     }
 
     /**
@@ -170,6 +173,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
             throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
         }
 
+        initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
         executorService.submit(() -> refresh(initLatch));
     }
 
@@ -198,7 +202,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
             subscribe(consumer);
 
             LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic());
-            consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
+            consumer.subscribe(Collections.singletonList(resumeStrategyConfiguration.getTopic()));
 
             poll(consumer, latch);
         } catch (WakeupException e) {
@@ -424,8 +428,20 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
         return producer;
     }
 
-    protected KafkaResumeStrategyConfiguration getResumeStrategyConfiguration() {
-        return resumeStrategyConfiguration;
+    @Override
+    public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
+        if (resumeStrategyConfiguration instanceof KafkaResumeStrategyConfiguration) {
+            this.resumeStrategyConfiguration = (KafkaResumeStrategyConfiguration) resumeStrategyConfiguration;
+        } else {
+            // Resolve the type name on its own: '+' binds tighter than '==' and '?:', so an inline ternary misparses
+            final String typeName = resumeStrategyConfiguration == null
+                    ? "null" : resumeStrategyConfiguration.getClass().getName();
+            throw new RuntimeCamelException("Invalid resume strategy configuration of type " + typeName);
+        }
     }
 
+    @Override
+    public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
+        return resumeStrategyConfiguration;
+    }
 }
diff --git a/components/camel-wal/src/generated/resources/META-INF/services/org/apache/camel/write-ahead-resume-strategy b/components/camel-wal/src/generated/resources/META-INF/services/org/apache/camel/write-ahead-resume-strategy
new file mode 100644
index 00000000000..3f966bb262f
--- /dev/null
+++ b/components/camel-wal/src/generated/resources/META-INF/services/org/apache/camel/write-ahead-resume-strategy
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.wal.WriteAheadResumeStrategy
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
index 83a72e3fa82..2bbf92ca3a1 100644
--- a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
@@ -21,12 +21,15 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.resume.Deserializable;
 import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.spi.annotations.JdkService;
 import org.apache.camel.support.resume.OffsetKeys;
 import org.apache.camel.support.resume.Offsets;
 import org.slf4j.Logger;
@@ -41,6 +44,7 @@ import org.slf4j.LoggerFactory;
  * Among other things, it implements data recovery on startup, so that records cached locally, are automatically
  * recovered
  */
+@JdkService("write-ahead-resume-strategy")
 public class WriteAheadResumeStrategy implements ResumeStrategy {
 
     /**
@@ -64,22 +68,25 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(WriteAheadResumeStrategy.class);
-    private final File logFile;
-    private final LogWriter logWriter;
-    private final ResumeStrategy resumeStrategy;
+    private File logFile;
+    private LogWriter logWriter;
+    private ResumeStrategy resumeStrategy;
+    private WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration;
 
     /**
      * Creates a new write-ahead resume strategy
-     * 
-     * @param  resumeStrategyConfiguration the configuration to use for this strategy instance
-     * @throws IOException
      */
-    public WriteAheadResumeStrategy(WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration) throws IOException {
-        this.logFile = resumeStrategyConfiguration.getLogFile();
-        this.resumeStrategy = resumeStrategyConfiguration.getDelegateResumeStrategy();
+    public WriteAheadResumeStrategy() {
 
-        DefaultLogSupervisor flushPolicy = new DefaultLogSupervisor(resumeStrategyConfiguration.getSupervisorInterval());
-        logWriter = new LogWriter(logFile, flushPolicy);
+    }
+
+    /**
+     * Creates a new write-ahead resume strategy
+     * 
+     * @param resumeStrategyConfiguration the configuration to use for this strategy instance
+     */
+    public WriteAheadResumeStrategy(WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration) {
+        this.resumeStrategyConfiguration = resumeStrategyConfiguration;
     }
 
     @Override
@@ -294,6 +301,16 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
 
     @Override
     public void start() {
+        try {
+            this.logFile = resumeStrategyConfiguration.getLogFile();
+            this.resumeStrategy = resumeStrategyConfiguration.getDelegateResumeStrategy();
+
+            DefaultLogSupervisor flushPolicy = new DefaultLogSupervisor(resumeStrategyConfiguration.getSupervisorInterval());
+            logWriter = new LogWriter(logFile, flushPolicy);
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
+        }
+
         resumeStrategy.start();
     }
 
@@ -307,4 +324,14 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
         logWriter.close();
         LOG.trace("Writer is closed");
     }
+
+    @Override
+    public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
+        this.resumeStrategyConfiguration = (WriteAheadResumeStrategyConfiguration) resumeStrategyConfiguration;
+    }
+
+    @Override
+    public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
+        return resumeStrategyConfiguration;
+    }
 }
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategyConfiguration.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategyConfiguration.java
index 8b90656d9f0..2ec93102d23 100644
--- a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategyConfiguration.java
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategyConfiguration.java
@@ -52,4 +52,9 @@ public class WriteAheadResumeStrategyConfiguration extends ResumeStrategyConfigu
     void setSupervisorInterval(long supervisorInterval) {
         this.supervisorInterval = supervisorInterval;
     }
+
+    @Override
+    public String resumeStrategyService() {
+        return "write-ahead-resume-strategy";
+    }
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index 10aed024a65..a40720e0847 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -106,4 +106,8 @@ public interface ResumeStrategy extends Service {
      * @throws Exception      if unable to update the offset
      */
     void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception;
+
+    void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration);
+
+    ResumeStrategyConfiguration getResumeStrategyConfiguration();
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java
index 441370e25ed..64244bca740 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.camel.resume;
 /**
  * Basic configuration holder for resume strategies
  */
-public class ResumeStrategyConfiguration {
+public abstract class ResumeStrategyConfiguration {
     private Cacheable.FillPolicy cacheFillPolicy;
 
     /**
@@ -40,4 +40,13 @@ public class ResumeStrategyConfiguration {
     public void setCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy) {
         this.cacheFillPolicy = cacheFillPolicy;
     }
+
+    /**
+     * Allows the implementation to provide custom strategy service factories. It binds to the service name provided in
+     * the {@link org.apache.camel.spi.annotations.JdkService} annotation of the strategy. This allows the strategy
+     * to be resolved automatically at runtime while allowing fallback to manually constructed strategies when necessary
+     * 
+     * @return the name of the service used to look up the resume strategy implementation
+     */
+    public abstract String resumeStrategyService();
 }
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index 8328d99931c..19f96143fb3 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -28,7 +28,29 @@ The resume strategies comes in 3 parts:
 
 == The DSL method
 
-The route needs to use the `resumable()` method followed by a `resumableStrategy` to point to an instance of the resume strategy in use.
+The route needs to use the `resumable()` method followed by `configuration` to pass a strategy configuration builder.
+It is also possible to use `resumableStrategy` to point to an instance of the resume strategy in use, although
+this is much more complex. In the vast majority of cases you should use `configuration`, in which case Camel will do
+the heavy lifting for you.
+
+Using the resume API with the configuration should look like this:
+
+[source,java]
+----
+KafkaResumeStrategyConfigurationBuilder kafkaConfigurationBuilder = KafkaResumeStrategyConfigurationBuilder.newBuilder()
+    .withBootstrapServers("kafka-address:9092")
+    .withTopic("offset")
+    .withProducerProperty("max.block.ms", "10000")
+    .withMaxInitializationDuration(Duration.ofSeconds(5));
+
+getCamelContext().getRegistry().bind("resumeCache", new MyChoiceOfResumeCache<>(100));
+
+from("some:component")
+    .resumable().configuration(kafkaConfigurationBuilder)
+    .process(this::process);
+----
+
+=== Configuring via beans
 
 This instance can be bound in the Context registry as follows:
 
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
index 96d07951092..53b88ac605b 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
@@ -23,6 +23,8 @@ import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.resume.ResumeStrategyConfigurationBuilder;
 import org.apache.camel.spi.Metadata;
 
 /**
@@ -48,6 +50,9 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition>
     @Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue = "false")
     private String intermittent;
 
+    @XmlTransient
+    private ResumeStrategyConfiguration resumeStrategyConfiguration;
+
     @Override
     public String getShortName() {
         return "resumable";
@@ -90,6 +95,14 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition>
         this.intermittent = intermitent;
     }
 
+    public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
+        return resumeStrategyConfiguration;
+    }
+
+    public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
+        this.resumeStrategyConfiguration = resumeStrategyConfiguration;
+    }
+
     // Fluent API
     // -------------------------------------------------------------------------
 
@@ -127,6 +140,15 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition>
         return this;
     }
 
+    /**
+     * Uses a configuration builder to auto-instantiate the resume strategy
+     */
+    public ResumableDefinition configuration(
+            ResumeStrategyConfigurationBuilder<? extends ResumeStrategyConfigurationBuilder, ? extends ResumeStrategyConfiguration> builder) {
+        setResumeStrategyConfiguration(builder.build());
+        return this;
+    }
+
     /**
      * Sets whether the offsets will be intermittently present or whether they must be present in every exchange
      */
diff --git a/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/transient-resume-strategy b/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/transient-resume-strategy
new file mode 100644
index 00000000000..7f912ccfabf
--- /dev/null
+++ b/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/transient-resume-strategy
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.processor.resume.TransientResumeStrategy
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 78245e3816e..cbb418421c0 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -17,16 +17,21 @@
 
 package org.apache.camel.processor.resume;
 
+import org.apache.camel.resume.Cacheable;
 import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.resume.ResumeStrategyConfigurationBuilder;
+import org.apache.camel.spi.annotations.JdkService;
 
 /**
  * A resume strategy that keeps all the resume strategy information in memory. This is hardly useful for production
  * level implementations, but can be useful for testing the resume strategies
  */
+@JdkService("transient-resume-strategy")
 public class TransientResumeStrategy implements ResumeStrategy {
     private final ResumeAdapter resumeAdapter;
 
@@ -64,6 +69,16 @@ public class TransientResumeStrategy implements ResumeStrategy {
         // this is NO-OP
     }
 
+    @Override
+    public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
+        // This is NO-OP
+    }
+
+    @Override
+    public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
+        return null;
+    }
+
     @Override
     public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) {
         // this is NO-OP
@@ -78,4 +93,23 @@ public class TransientResumeStrategy implements ResumeStrategy {
     public void stop() {
         // this is NO-OP
     }
+
+    public static ResumeStrategyConfigurationBuilder<ResumeStrategyConfigurationBuilder, ResumeStrategyConfiguration> configurationBuilder() {
+        return new ResumeStrategyConfigurationBuilder<>() {
+            @Override
+            public ResumeStrategyConfigurationBuilder withCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy) {
+                return this;
+            }
+
+            @Override
+            public ResumeStrategyConfiguration build() {
+                return new ResumeStrategyConfiguration() {
+                    @Override
+                    public String resumeStrategyService() {
+                        return "transient-resume-strategy";
+                    }
+                };
+            }
+        };
+    }
 }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index 7c08bde948c..2db23aeee80 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -16,13 +16,19 @@
  */
 package org.apache.camel.reifier;
 
+import java.util.Optional;
+
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.ResumableDefinition;
 import org.apache.camel.processor.resume.ResumableProcessor;
 import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.util.ObjectHelper;
 
 public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
@@ -49,7 +55,27 @@ public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
         ResumeStrategy strategy = definition.getResumeStrategyBean();
         if (strategy == null) {
             String ref = parseString(definition.getResumeStrategy());
-            strategy = mandatoryLookup(ref, ResumeStrategy.class);
+
+            if (ref != null) {
+                strategy = mandatoryLookup(ref, ResumeStrategy.class);
+            } else {
+                final FactoryFinder factoryFinder
+                        = camelContext.adapt(ExtendedCamelContext.class).getFactoryFinder(FactoryFinder.DEFAULT_PATH);
+
+                final ResumeStrategyConfiguration resumeStrategyConfiguration = definition.getResumeStrategyConfiguration();
+                Optional<ResumeStrategy> resumeStrategyOptional = factoryFinder.newInstance(
+                        resumeStrategyConfiguration.resumeStrategyService(), ResumeStrategy.class);
+
+                if (!resumeStrategyOptional.isPresent()) {
+                    throw new RuntimeCamelException("Cannot find a resume strategy class in the classpath or the registry");
+                }
+
+                final ResumeStrategy resumeStrategy = resumeStrategyOptional.get();
+
+                resumeStrategy.setResumeStrategyConfiguration(resumeStrategyConfiguration);
+
+                return resumeStrategy;
+            }
         }
 
         return strategy;