You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/03 15:49:07 UTC

[camel-examples] 02/02: CAMEL-18148: added support for configuration builders in camel-wal

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit c62831bdd6b6a469cec39aebb5adf51c9b6d1121
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Nov 3 15:29:41 2022 +0100

    CAMEL-18148: added support for configuration builders in camel-wal
---
 .../resume/aws/kinesis/main/KinesisRoute.java      | 11 +++-----
 .../example/resume/aws/kinesis/main/MainApp.java   | 21 +-------------
 .../example/resume/strategies/kafka/KafkaUtil.java | 33 +++++++++++++---------
 .../kafka/file/LargeFileRouteBuilder.java          | 13 ++++-----
 .../kafka/fileset/LargeDirectoryRouteBuilder.java  | 23 +++++++++------
 .../example/resume/file/offset/main/MainApp.java   |  3 +-
 .../clusterized/main/ClusterizedListener.java      |  5 ----
 .../ClusterizedLargeDirectoryRouteBuilder.java     |  8 ++++--
 .../example/resume/fileset/wal/main/MainApp.java   | 11 ++------
 .../camel/example/resume/fileset/main/MainApp.java |  5 +---
 10 files changed, 56 insertions(+), 77 deletions(-)

diff --git a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
index 7b7b4664..cd097042 100644
--- a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
+++ b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.support.resume.Resumables;
 import org.slf4j.Logger;
@@ -33,14 +33,12 @@ public class KinesisRoute extends RouteBuilder {
     private static final Logger LOG = LoggerFactory.getLogger(KinesisRoute.class);
 
     private final String streamName;
-    private final ResumeStrategy resumeStrategy;
     private final ResumeCache<String> resumeCache;
     private final KinesisClient client;
     private final CountDownLatch latch;
 
-    public KinesisRoute(String streamName, ResumeStrategy resumeStrategy, ResumeCache<String> resumeCache, KinesisClient client, CountDownLatch latch) {
+    public KinesisRoute(String streamName, ResumeCache<String> resumeCache, KinesisClient client, CountDownLatch latch) {
         this.streamName = streamName;
-        this.resumeStrategy = resumeStrategy;
         this.resumeCache = resumeCache;
         this.client = client;
         this.latch = latch;
@@ -57,14 +55,13 @@ public class KinesisRoute extends RouteBuilder {
 
     @Override
     public void configure() {
-        bindToRegistry(ResumeStrategy.DEFAULT_NAME, resumeStrategy);
         bindToRegistry(ResumeCache.DEFAULT_NAME, resumeCache);
         bindToRegistry("amazonKinesisClient", client);
 
         String kinesisEndpointUri = "aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient";
 
         fromF(kinesisEndpointUri, streamName)
-                .process(this::addResumeOffset)
-                .resumable(ResumeStrategy.DEFAULT_NAME);
+                .resumable().configuration(KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder())
+                .process(this::addResumeOffset);
     }
 }
diff --git a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java
index 9cfdd59b..d1b3e0fa 100644
--- a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java
+++ b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java
@@ -23,11 +23,6 @@ import java.util.concurrent.Executors;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.caffeine.resume.CaffeineCache;
 import org.apache.camel.main.Main;
-import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfiguration;
-import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
-import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
-import org.apache.camel.resume.Cacheable;
-import org.apache.camel.resume.Resumable;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
 import org.apache.camel.test.infra.aws2.clients.KinesisUtils;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
@@ -53,31 +48,17 @@ public class MainApp {
             return;
         }
 
-        SingleNodeKafkaResumeStrategy resumeStrategy = getUpdatableConsumerResumeStrategyForSet();
         Integer batchSize = Integer.parseInt(System.getProperty("batch.size", "50"));
         CountDownLatch latch = new CountDownLatch(batchSize);
 
         Executors.newSingleThreadExecutor().submit(() -> waitForStop(main, latch));
 
-        RouteBuilder routeBuilder = new KinesisRoute(streamName, resumeStrategy, new CaffeineCache<>(100), client, latch);
+        RouteBuilder routeBuilder = new KinesisRoute(streamName, new CaffeineCache<>(100), client, latch);
 
         main.configure().addRoutesBuilder(routeBuilder);
         main.start();
     }
 
-    private static SingleNodeKafkaResumeStrategy getUpdatableConsumerResumeStrategyForSet() {
-        String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092");
-        String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets");
-
-        KafkaResumeStrategyConfiguration resumeStrategyConfiguration =
-                KafkaResumeStrategyConfigurationBuilder.newBuilder()
-                        .withCacheFillPolicy(Cacheable.FillPolicy.MAXIMIZING)
-                        .withBootstrapServers(bootStrapAddress)
-                        .withTopic(kafkaTopic)
-                        .build();
-
-        return new SingleNodeKafkaResumeStrategy(resumeStrategyConfiguration);
-    }
 
     private static void loadData(KinesisClient client, String streamName, int recordCount) {
         KinesisUtils.createStream(client, streamName);
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
index 6c6eb0a8..65aba596 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
@@ -29,21 +29,28 @@ public final class KafkaUtil {
     }
 
     public static SingleNodeKafkaResumeStrategy getDefaultStrategy() {
+        KafkaResumeStrategyConfiguration resumeStrategyConfiguration = getDefaultKafkaResumeStrategyConfiguration();
+
+        return new SingleNodeKafkaResumeStrategy(resumeStrategyConfiguration);
+    }
+
+    public static KafkaResumeStrategyConfiguration getDefaultKafkaResumeStrategyConfiguration() {
+        return getDefaultKafkaResumeStrategyConfigurationBuilder().build();
+    }
+
+    public static KafkaResumeStrategyConfigurationBuilder getDefaultKafkaResumeStrategyConfigurationBuilder() {
         String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092");
         String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets");
 
-        KafkaResumeStrategyConfiguration resumeStrategyConfiguration =
-                KafkaResumeStrategyConfigurationBuilder.newBuilder()
-                        .withBootstrapServers(bootStrapAddress)
-                        .withTopic(kafkaTopic)
-                        .withProducerProperty("max.block.ms", "10000")
-                        .withMaxInitializationDuration(Duration.ofSeconds(5))
-                        .withProducerProperty("delivery.timeout.ms", "30000")
-                        .withProducerProperty("session.timeout.ms", "15000")
-                        .withProducerProperty("request.timeout.ms", "15000")
-                        .withConsumerProperty("session.timeout.ms", "20000")
-                        .build();
-
-        return new SingleNodeKafkaResumeStrategy(resumeStrategyConfiguration);
+        return KafkaResumeStrategyConfigurationBuilder.newBuilder()
+                .withBootstrapServers(bootStrapAddress)
+                .withTopic(kafkaTopic)
+                .withProducerProperty("max.block.ms", "10000")
+                .withMaxInitializationDuration(Duration.ofSeconds(5))
+                .withProducerProperty("delivery.timeout.ms", "30000")
+                .withProducerProperty("session.timeout.ms", "15000")
+                .withProducerProperty("request.timeout.ms", "15000")
+                .withConsumerProperty("session.timeout.ms", "20000");
     }
+
 }
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
index 33451452..0ad7e275 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
@@ -24,9 +24,9 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.FileConstants;
-import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
+import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
 import org.apache.camel.resume.Resumable;
-import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.support.resume.Resumables;
 import org.slf4j.Logger;
@@ -39,7 +39,6 @@ public class LargeFileRouteBuilder extends RouteBuilder {
     private static final Logger LOG = LoggerFactory.getLogger(LargeFileRouteBuilder.class);
 
     private ProducerTemplate producerTemplate;
-    private KafkaResumeStrategy testResumeStrategy;
     private final ResumeCache<File> cache;
 
     private long lastOffset;
@@ -47,8 +46,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
 
     private final CountDownLatch latch;
 
-    public LargeFileRouteBuilder(KafkaResumeStrategy resumeStrategy, ResumeCache<File> cache, CountDownLatch latch) {
-        this.testResumeStrategy = resumeStrategy;
+    public LargeFileRouteBuilder(ResumeCache<File> cache, CountDownLatch latch) {
         this.cache = cache;
         this.latch = latch;
     }
@@ -87,9 +85,10 @@ public class LargeFileRouteBuilder extends RouteBuilder {
     public void configure() {
         producerTemplate = getContext().createProducerTemplate();
 
-        getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, testResumeStrategy);
         getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache);
 
+        final KafkaResumeStrategyConfigurationBuilder defaultKafkaResumeStrategyConfigurationBuilder = KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder();
+
         from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
                 .routeId("largeFileRoute")
                 .convertBodyTo(String.class)
@@ -97,7 +96,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
                     .streaming()
                     .stopOnException()
                 .resumable()
-                    .resumeStrategy(ResumeStrategy.DEFAULT_NAME)
+                    .configuration(defaultKafkaResumeStrategyConfigurationBuilder)
                     .intermittent(true)
                     .process(this::process);
 
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
index 2d6a67bd..c85b2ca3 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
@@ -21,7 +21,9 @@ import java.io.File;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.resume.ResumeStrategyConfigurationBuilder;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.support.resume.Resumables;
 import org.slf4j.Logger;
@@ -29,16 +31,20 @@ import org.slf4j.LoggerFactory;
 
 public class LargeDirectoryRouteBuilder extends RouteBuilder {
     private static final Logger LOG = LoggerFactory.getLogger(LargeDirectoryRouteBuilder.class);
-    private final ResumeStrategy resumeStrategy;
     private final ResumeCache<File> cache;
+    private final ResumeStrategyConfigurationBuilder<? extends ResumeStrategyConfigurationBuilder, ? extends ResumeStrategyConfiguration> resumeStrategyConfigurationBuilder;
     private final long delay;
 
-    public LargeDirectoryRouteBuilder(ResumeStrategy resumeStrategy, ResumeCache<File> cache) {
-        this(resumeStrategy, cache, 0);
+    public LargeDirectoryRouteBuilder(ResumeCache<File> cache) {
+        this(KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder(), cache);
     }
 
-    public LargeDirectoryRouteBuilder(ResumeStrategy resumeStrategy, ResumeCache<File> cache, long delay) {
-        this.resumeStrategy = resumeStrategy;
+    public LargeDirectoryRouteBuilder(ResumeStrategyConfigurationBuilder<?, ?> resumeStrategyConfigurationBuilder, ResumeCache<File> cache) {
+        this(resumeStrategyConfigurationBuilder, cache, 0);
+    }
+
+    public LargeDirectoryRouteBuilder(ResumeStrategyConfigurationBuilder<? extends ResumeStrategyConfigurationBuilder, ? extends ResumeStrategyConfiguration> resumeStrategyConfigurationBuilder, ResumeCache<File> cache, long delay) {
+        this.resumeStrategyConfigurationBuilder = resumeStrategyConfigurationBuilder;
         this.cache = cache;
         this.delay = delay;
     }
@@ -57,11 +63,10 @@ public class LargeDirectoryRouteBuilder extends RouteBuilder {
      * Let's configure the Camel routing rules using Java code...
      */
     public void configure() {
-        getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, resumeStrategy);
         getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache);
 
-        from("file:{{input.dir}}?noop=true&recursive=true&preSort=true")
-                .resumable(ResumeStrategy.DEFAULT_NAME)
+        from("file:{{input.dir}}?noop=true&recursive=true")
+                .resumable().configuration(resumeStrategyConfigurationBuilder)
                 .process(this::process)
                 .to("file:{{output.dir}}");
     }
diff --git a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
index 26e11fdd..f0ac3c2a 100644
--- a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
+++ b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
@@ -42,9 +42,8 @@ public class MainApp {
         int batchSize = Integer.valueOf(tmp);
 
         CountDownLatch latch = new CountDownLatch(batchSize);
-        SingleNodeKafkaResumeStrategy resumeStrategy = KafkaUtil.getDefaultStrategy();
 
-        RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy, new CaffeineCache<>(1), latch);
+        RouteBuilder routeBuilder = new LargeFileRouteBuilder(new CaffeineCache<>(1), latch);
         main.configure().addRoutesBuilder(routeBuilder);
 
         Executors.newSingleThreadExecutor().submit(() -> waitForStop(main, latch));
diff --git a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
index 90f4dc1c..677024c3 100644
--- a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
+++ b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
@@ -20,11 +20,8 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService;
 import org.apache.camel.example.resume.fileset.clusterized.strategies.ClusterizedLargeDirectoryRouteBuilder;
-import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
 import org.apache.camel.main.BaseMainSupport;
 import org.apache.camel.main.MainListener;
-import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
-import org.apache.camel.resume.ResumeStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,8 +50,6 @@ class ClusterizedListener implements MainListener {
             main.getCamelContext().addService(clusterService);
 
             LOG.trace("Creating the strategy");
-            SingleNodeKafkaResumeStrategy resumeStrategy = KafkaUtil.getDefaultStrategy();
-            main.getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, resumeStrategy);
 
             LOG.trace("Creating the route");
             RouteBuilder routeBuilder = new ClusterizedLargeDirectoryRouteBuilder();
diff --git a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
index d7340907..407c9105 100644
--- a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
+++ b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
@@ -22,7 +22,8 @@ import java.io.File;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.caffeine.resume.CaffeineCache;
-import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
+import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.support.resume.Resumables;
 import org.slf4j.Logger;
@@ -50,12 +51,15 @@ public class ClusterizedLargeDirectoryRouteBuilder extends RouteBuilder {
     public void configure() {
         getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, new CaffeineCache<>(10000));
 
+        final KafkaResumeStrategyConfigurationBuilder defaultKafkaResumeStrategyConfigurationBuilder = KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder();
+
         from("timer:heartbeat?period=10000")
                 .routeId("heartbeat")
                 .log("HeartBeat route (timer) ...");
 
         from("master:resume-ns:file:{{input.dir}}?noop=true&recursive=true&repeatCount=1")
-                .resumable(ResumeStrategy.DEFAULT_NAME)
+                .resumable()
+                .configuration(defaultKafkaResumeStrategyConfigurationBuilder)
                 .routeId("clustered")
                 .process(this::process)
                 .to("file:{{output.dir}}");
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java b/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java
index 6945a4ad..f1a78fe2 100644
--- a/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java
@@ -21,8 +21,6 @@ import java.io.File;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.caffeine.resume.CaffeineCache;
-import org.apache.camel.component.wal.WriteAheadResumeStrategy;
-import org.apache.camel.component.wal.WriteAheadResumeStrategyConfiguration;
 import org.apache.camel.component.wal.WriteAheadResumeStrategyConfigurationBuilder;
 import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
 import org.apache.camel.example.resume.strategies.kafka.check.CheckRoute;
@@ -45,15 +43,12 @@ public class MainApp {
         final String logFile = System.getProperty("wal.log.file");
         final long delay = Long.parseLong(System.getProperty("processing.delay", "0"));
 
-        WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration = WriteAheadResumeStrategyConfigurationBuilder
+        final WriteAheadResumeStrategyConfigurationBuilder configurationBuilder = WriteAheadResumeStrategyConfigurationBuilder
                 .newBuilder()
                 .withDelegateResumeStrategy(resumeStrategy)
-                .withLogFile(new File(logFile))
-                .build();
+                .withLogFile(new File(logFile));
 
-        WriteAheadResumeStrategy writeAheadResumeStrategy = new WriteAheadResumeStrategy(resumeStrategyConfiguration);
-
-        RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(writeAheadResumeStrategy, new CaffeineCache<>(10000), delay);
+        RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(configurationBuilder, new CaffeineCache<>(10000), delay);
 
         main.configure().addRoutesBuilder(new CheckRoute());
         main.configure().addRoutesBuilder(routeBuilder);
diff --git a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
index 78837e69..830b78cf 100644
--- a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
+++ b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
@@ -19,11 +19,9 @@ package org.apache.camel.example.resume.fileset.main;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.caffeine.resume.CaffeineCache;
-import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
 import org.apache.camel.example.resume.strategies.kafka.check.CheckRoute;
 import org.apache.camel.example.resume.strategies.kafka.fileset.LargeDirectoryRouteBuilder;
 import org.apache.camel.main.Main;
-import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
 
 /**
  * A Camel Application
@@ -36,8 +34,7 @@ public class MainApp {
     public static void main(String... args) throws Exception {
         Main main = new Main();
 
-        SingleNodeKafkaResumeStrategy resumeStrategy = KafkaUtil.getDefaultStrategy();
-        RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(resumeStrategy, new CaffeineCache<>(10000));
+        RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(new CaffeineCache<>(10000));
 
         main.configure().addRoutesBuilder(new CheckRoute());
         main.configure().addRoutesBuilder(routeBuilder);