You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/03 15:48:55 UTC
[camel] branch main updated: CAMEL-18675: support auto-instantiation for the resume strategy
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new c9b7e694b3a CAMEL-18675: support auto-instantiation for the resume strategy
c9b7e694b3a is described below
commit c9b7e694b3ae8acdc46e0e1bf9e2d2465086b75a
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Nov 2 19:13:14 2022 +0100
CAMEL-18675: support auto-instantiation for the resume strategy
---
.../org/apache/camel/kafka-resume-strategy | 2 +
.../kafka/KafkaResumeStrategyConfiguration.java | 5 +++
.../KafkaResumeStrategyConfigurationBuilder.java | 3 +-
.../kafka/SingleNodeKafkaResumeStrategy.java | 34 +++++++++++----
.../org/apache/camel/write-ahead-resume-strategy | 2 +
.../component/wal/WriteAheadResumeStrategy.java | 49 +++++++++++++++++-----
.../wal/WriteAheadResumeStrategyConfiguration.java | 5 +++
.../org/apache/camel/resume/ResumeStrategy.java | 4 ++
.../camel/resume/ResumeStrategyConfiguration.java | 11 ++++-
.../docs/modules/eips/pages/resume-strategies.adoc | 24 ++++++++++-
.../apache/camel/model/ResumableDefinition.java | 22 ++++++++++
.../org/apache/camel/transient-resume-strategy | 2 +
.../processor/resume/TransientResumeStrategy.java | 34 +++++++++++++++
.../org/apache/camel/reifier/ResumableReifier.java | 28 ++++++++++++-
14 files changed, 200 insertions(+), 25 deletions(-)
diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-resume-strategy b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-resume-strategy
new file mode 100644
index 00000000000..723ccd25e8c
--- /dev/null
+++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-resume-strategy
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
index 94aede6fadd..3ce847cee0e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
@@ -81,4 +81,9 @@ public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguratio
this.maxInitializationRetries = maxInitializationRetries;
}
+
+ @Override
+ public String resumeStrategyService() {
+ return "kafka-resume-strategy";
+ }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
index 3c4e6a21ca5..e38ae808d37 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
@@ -33,8 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A configuration builder appropriate for building configurations for the {@link SingleNodeKafkaResumeStrategy},
- * {@link MultiNodeKafkaResumeStrategy} and any of their subclasses
+ * A configuration builder appropriate for building configurations for the {@link SingleNodeKafkaResumeStrategy}
*/
public class KafkaResumeStrategyConfigurationBuilder
extends
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 605b498cd9a..c3863b6508e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -37,7 +37,9 @@ import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.spi.annotations.JdkService;
import org.apache.camel.util.IOHelper;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -56,6 +58,7 @@ import org.slf4j.LoggerFactory;
* A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
* integrations.
*/
+@JdkService("kafka-resume-strategy")
public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
@@ -65,10 +68,14 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
private boolean subscribed;
private ResumeAdapter adapter;
- private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
+ private KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
private final ExecutorService executorService;
private final ReentrantLock writeLock = new ReentrantLock();
- private final CountDownLatch initLatch;
+ private CountDownLatch initLatch;
+
+ public SingleNodeKafkaResumeStrategy() {
+ executorService = Executors.newSingleThreadExecutor();
+ }
/**
* Builds an instance of this class
@@ -78,8 +85,6 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
this.resumeStrategyConfiguration = resumeStrategyConfiguration;
executorService = Executors.newSingleThreadExecutor();
-
- initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
}
/**
@@ -91,8 +96,6 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
ExecutorService executorService) {
this.resumeStrategyConfiguration = resumeStrategyConfiguration;
this.executorService = executorService;
-
- initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
}
/**
@@ -170,6 +173,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
}
+ initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
executorService.submit(() -> refresh(initLatch));
}
@@ -198,7 +202,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
subscribe(consumer);
LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic());
- consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
+ consumer.subscribe(Collections.singletonList(resumeStrategyConfiguration.getTopic()));
poll(consumer, latch);
} catch (WakeupException e) {
@@ -424,8 +428,20 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
return producer;
}
- protected KafkaResumeStrategyConfiguration getResumeStrategyConfiguration() {
- return resumeStrategyConfiguration;
+ @Override
+ public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
+ if (resumeStrategyConfiguration instanceof KafkaResumeStrategyConfiguration) {
+ this.resumeStrategyConfiguration = (KafkaResumeStrategyConfiguration) resumeStrategyConfiguration;
+ } else {
+ // Only Kafka configurations are accepted; the type name is resolved on its own because '+' binds tighter than '==' and '?:'
+ final String type = resumeStrategyConfiguration == null
+ ? "null" : resumeStrategyConfiguration.getClass().getName();
+ throw new RuntimeCamelException("Invalid resume strategy configuration of type " + type);
+ }
}
+ @Override
+ public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
+ return resumeStrategyConfiguration;
+ }
}
diff --git a/components/camel-wal/src/generated/resources/META-INF/services/org/apache/camel/write-ahead-resume-strategy b/components/camel-wal/src/generated/resources/META-INF/services/org/apache/camel/write-ahead-resume-strategy
new file mode 100644
index 00000000000..3f966bb262f
--- /dev/null
+++ b/components/camel-wal/src/generated/resources/META-INF/services/org/apache/camel/write-ahead-resume-strategy
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.wal.WriteAheadResumeStrategy
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
index 83a72e3fa82..2bbf92ca3a1 100644
--- a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
@@ -21,12 +21,15 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.resume.Deserializable;
import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.spi.annotations.JdkService;
import org.apache.camel.support.resume.OffsetKeys;
import org.apache.camel.support.resume.Offsets;
import org.slf4j.Logger;
@@ -41,6 +44,7 @@ import org.slf4j.LoggerFactory;
* Among other things, it implements data recovery on startup, so that records cached locally, are automatically
* recovered
*/
+@JdkService("write-ahead-resume-strategy")
public class WriteAheadResumeStrategy implements ResumeStrategy {
/**
@@ -64,22 +68,25 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
}
private static final Logger LOG = LoggerFactory.getLogger(WriteAheadResumeStrategy.class);
- private final File logFile;
- private final LogWriter logWriter;
- private final ResumeStrategy resumeStrategy;
+ private File logFile;
+ private LogWriter logWriter;
+ private ResumeStrategy resumeStrategy;
+ private WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration;
/**
* Creates a new write-ahead resume strategy
- *
- * @param resumeStrategyConfiguration the configuration to use for this strategy instance
- * @throws IOException
*/
- public WriteAheadResumeStrategy(WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration) throws IOException {
- this.logFile = resumeStrategyConfiguration.getLogFile();
- this.resumeStrategy = resumeStrategyConfiguration.getDelegateResumeStrategy();
+ public WriteAheadResumeStrategy() {
- DefaultLogSupervisor flushPolicy = new DefaultLogSupervisor(resumeStrategyConfiguration.getSupervisorInterval());
- logWriter = new LogWriter(logFile, flushPolicy);
+ }
+
+ /**
+ * Creates a new write-ahead resume strategy
+ *
+ * @param resumeStrategyConfiguration the configuration to use for this strategy instance
+ */
+ public WriteAheadResumeStrategy(WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration) {
+ this.resumeStrategyConfiguration = resumeStrategyConfiguration;
}
@Override
@@ -294,6 +301,16 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
@Override
public void start() {
+ try {
+ this.logFile = resumeStrategyConfiguration.getLogFile();
+ this.resumeStrategy = resumeStrategyConfiguration.getDelegateResumeStrategy();
+
+ DefaultLogSupervisor flushPolicy = new DefaultLogSupervisor(resumeStrategyConfiguration.getSupervisorInterval());
+ logWriter = new LogWriter(logFile, flushPolicy);
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+
resumeStrategy.start();
}
@@ -307,4 +324,14 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
logWriter.close();
LOG.trace("Writer is closed");
}
+
+ @Override
+ public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
+ this.resumeStrategyConfiguration = (WriteAheadResumeStrategyConfiguration) resumeStrategyConfiguration;
+ }
+
+ @Override
+ public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
+ return resumeStrategyConfiguration;
+ }
}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategyConfiguration.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategyConfiguration.java
index 8b90656d9f0..2ec93102d23 100644
--- a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategyConfiguration.java
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategyConfiguration.java
@@ -52,4 +52,9 @@ public class WriteAheadResumeStrategyConfiguration extends ResumeStrategyConfigu
void setSupervisorInterval(long supervisorInterval) {
this.supervisorInterval = supervisorInterval;
}
+
+ @Override
+ public String resumeStrategyService() {
+ return "write-ahead-resume-strategy";
+ }
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index 10aed024a65..a40720e0847 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -106,4 +106,8 @@ public interface ResumeStrategy extends Service {
* @throws Exception if unable to update the offset
*/
void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception;
+
+ void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration);
+
+ ResumeStrategyConfiguration getResumeStrategyConfiguration();
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java
index 441370e25ed..64244bca740 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.camel.resume;
/**
* Basic configuration holder for resume strategies
*/
-public class ResumeStrategyConfiguration {
+public abstract class ResumeStrategyConfiguration {
private Cacheable.FillPolicy cacheFillPolicy;
/**
@@ -40,4 +40,13 @@ public class ResumeStrategyConfiguration {
public void setCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy) {
this.cacheFillPolicy = cacheFillPolicy;
}
+
+ /**
+ * Allows the implementation to provide custom strategy service factories. It binds to the service name provided in the
+ * {@link org.apache.camel.spi.annotations.JdkService} annotation of the strategy. This allows the strategy to be resolved
+ * automatically at runtime while also allowing fallback to manually constructed strategies when necessary
+ *
+ * @return the service name used to look up the resume strategy implementation
+ */
+ public abstract String resumeStrategyService();
}
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index 8328d99931c..19f96143fb3 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -28,7 +28,29 @@ The resume strategies comes in 3 parts:
== The DSL method
-The route needs to use the `resumable()` method followed by a `resumableStrategy` to point to an instance of the resume strategy in use.
+The route needs to use the `resumable()` method followed by a strategy configuration passed using `configuration`.
+It is also possible to use `resumableStrategy` to point to an instance of the resume strategy in use, although
+this is much more complex. The vast majority of cases should use a `configuration`, in which case Camel will do
+the heavy lifting for you.
+
+Using the resume API with the configuration should look like this:
+
+[source,java]
+----
+KafkaResumeStrategyConfigurationBuilder kafkaConfigurationBuilder = KafkaResumeStrategyConfigurationBuilder.newBuilder()
+ .withBootstrapServers("kafka-address:9092")
+ .withTopic("offset")
+ .withProducerProperty("max.block.ms", "10000")
+ .withMaxInitializationDuration(Duration.ofSeconds(5));
+
+getCamelContext().getRegistry().bind("resumeCache", new MyChoiceOfResumeCache<>(100));
+
+from("some:component")
+ .resumable().configuration(kafkaConfigurationBuilder)
+ .process(this::process);
+----
+
+=== Configuring via beans
This instance can be bound in the Context registry as follows:
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
index 96d07951092..53b88ac605b 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
@@ -23,6 +23,8 @@ import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.resume.ResumeStrategyConfigurationBuilder;
import org.apache.camel.spi.Metadata;
/**
@@ -48,6 +50,9 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition>
@Metadata(label = "advanced", javaType = "java.lang.Boolean", defaultValue = "false")
private String intermittent;
+ @XmlTransient
+ private ResumeStrategyConfiguration resumeStrategyConfiguration;
+
@Override
public String getShortName() {
return "resumable";
@@ -90,6 +95,14 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition>
this.intermittent = intermitent;
}
+ public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
+ return resumeStrategyConfiguration;
+ }
+
+ public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
+ this.resumeStrategyConfiguration = resumeStrategyConfiguration;
+ }
+
// Fluent API
// -------------------------------------------------------------------------
@@ -127,6 +140,15 @@ public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition>
return this;
}
+ /**
+ * Builds the configuration from the given builder and stores it so that the resume strategy can be auto-instantiated
+ */
+ public ResumableDefinition configuration(
+ ResumeStrategyConfigurationBuilder<? extends ResumeStrategyConfigurationBuilder, ? extends ResumeStrategyConfiguration> builder) {
+ setResumeStrategyConfiguration(builder.build());
+ return this;
+ }
+
/**
* Sets whether the offsets will be intermittently present or whether they must be present in every exchange
*/
diff --git a/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/transient-resume-strategy b/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/transient-resume-strategy
new file mode 100644
index 00000000000..7f912ccfabf
--- /dev/null
+++ b/core/camel-core-processor/src/generated/resources/META-INF/services/org/apache/camel/transient-resume-strategy
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.processor.resume.TransientResumeStrategy
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 78245e3816e..cbb418421c0 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -17,16 +17,21 @@
package org.apache.camel.processor.resume;
+import org.apache.camel.resume.Cacheable;
import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.resume.ResumeStrategyConfigurationBuilder;
+import org.apache.camel.spi.annotations.JdkService;
/**
* A resume strategy that keeps all the resume strategy information in memory. This is hardly useful for production
* level implementations, but can be useful for testing the resume strategies
*/
+@JdkService("transient-resume-strategy")
public class TransientResumeStrategy implements ResumeStrategy {
private final ResumeAdapter resumeAdapter;
@@ -64,6 +69,16 @@ public class TransientResumeStrategy implements ResumeStrategy {
// this is NO-OP
}
+ @Override
+ public void setResumeStrategyConfiguration(ResumeStrategyConfiguration resumeStrategyConfiguration) {
+ // This is NO-OP
+ }
+
+ @Override
+ public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
+ return null;
+ }
+
@Override
public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset) {
// this is NO-OP
@@ -78,4 +93,23 @@ public class TransientResumeStrategy implements ResumeStrategy {
public void stop() {
// this is NO-OP
}
+
+ public static ResumeStrategyConfigurationBuilder<ResumeStrategyConfigurationBuilder, ResumeStrategyConfiguration> configurationBuilder() {
+ return new ResumeStrategyConfigurationBuilder<>() {
+ @Override
+ public ResumeStrategyConfigurationBuilder withCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy) {
+ return this;
+ }
+
+ @Override
+ public ResumeStrategyConfiguration build() {
+ return new ResumeStrategyConfiguration() {
+ @Override
+ public String resumeStrategyService() {
+ return "transient-resume-strategy";
+ }
+ };
+ }
+ };
+ }
}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index 7c08bde948c..2db23aeee80 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -16,13 +16,19 @@
*/
package org.apache.camel.reifier;
+import java.util.Optional;
+
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.Route;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ResumableDefinition;
import org.apache.camel.processor.resume.ResumableProcessor;
import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.spi.FactoryFinder;
import org.apache.camel.util.ObjectHelper;
public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
@@ -49,7 +55,27 @@ public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
ResumeStrategy strategy = definition.getResumeStrategyBean();
if (strategy == null) {
String ref = parseString(definition.getResumeStrategy());
- strategy = mandatoryLookup(ref, ResumeStrategy.class);
+
+ if (ref != null) {
+ strategy = mandatoryLookup(ref, ResumeStrategy.class);
+ } else {
+ final FactoryFinder factoryFinder
+ = camelContext.adapt(ExtendedCamelContext.class).getFactoryFinder(FactoryFinder.DEFAULT_PATH);
+
+ final ResumeStrategyConfiguration resumeStrategyConfiguration = definition.getResumeStrategyConfiguration();
+ Optional<ResumeStrategy> resumeStrategyOptional = factoryFinder.newInstance(
+ resumeStrategyConfiguration.resumeStrategyService(), ResumeStrategy.class);
+
+ if (!resumeStrategyOptional.isPresent()) {
+ throw new RuntimeCamelException("Cannot find a resume strategy class in the classpath or the registry");
+ }
+
+ final ResumeStrategy resumeStrategy = resumeStrategyOptional.get();
+
+ resumeStrategy.setResumeStrategyConfiguration(resumeStrategyConfiguration);
+
+ return resumeStrategy;
+ }
}
return strategy;