You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/03 15:49:07 UTC
[camel-examples] 02/02: CAMEL-18148: added support for configuration builders in camel-wal
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git
commit c62831bdd6b6a469cec39aebb5adf51c9b6d1121
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Nov 3 15:29:41 2022 +0100
CAMEL-18148: added support for configuration builders in camel-wal
---
.../resume/aws/kinesis/main/KinesisRoute.java | 11 +++-----
.../example/resume/aws/kinesis/main/MainApp.java | 21 +-------------
.../example/resume/strategies/kafka/KafkaUtil.java | 33 +++++++++++++---------
.../kafka/file/LargeFileRouteBuilder.java | 13 ++++-----
.../kafka/fileset/LargeDirectoryRouteBuilder.java | 23 +++++++++------
.../example/resume/file/offset/main/MainApp.java | 3 +-
.../clusterized/main/ClusterizedListener.java | 5 ----
.../ClusterizedLargeDirectoryRouteBuilder.java | 8 ++++--
.../example/resume/fileset/wal/main/MainApp.java | 11 ++------
.../camel/example/resume/fileset/main/MainApp.java | 5 +---
10 files changed, 56 insertions(+), 77 deletions(-)
diff --git a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
index 7b7b4664..cd097042 100644
--- a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
+++ b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
@@ -22,7 +22,7 @@ import java.util.concurrent.CountDownLatch;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.support.resume.Resumables;
import org.slf4j.Logger;
@@ -33,14 +33,12 @@ public class KinesisRoute extends RouteBuilder {
private static final Logger LOG = LoggerFactory.getLogger(KinesisRoute.class);
private final String streamName;
- private final ResumeStrategy resumeStrategy;
private final ResumeCache<String> resumeCache;
private final KinesisClient client;
private final CountDownLatch latch;
- public KinesisRoute(String streamName, ResumeStrategy resumeStrategy, ResumeCache<String> resumeCache, KinesisClient client, CountDownLatch latch) {
+ public KinesisRoute(String streamName, ResumeCache<String> resumeCache, KinesisClient client, CountDownLatch latch) {
this.streamName = streamName;
- this.resumeStrategy = resumeStrategy;
this.resumeCache = resumeCache;
this.client = client;
this.latch = latch;
@@ -57,14 +55,13 @@ public class KinesisRoute extends RouteBuilder {
@Override
public void configure() {
- bindToRegistry(ResumeStrategy.DEFAULT_NAME, resumeStrategy);
bindToRegistry(ResumeCache.DEFAULT_NAME, resumeCache);
bindToRegistry("amazonKinesisClient", client);
String kinesisEndpointUri = "aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient";
fromF(kinesisEndpointUri, streamName)
- .process(this::addResumeOffset)
- .resumable(ResumeStrategy.DEFAULT_NAME);
+ .resumable().configuration(KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder())
+ .process(this::addResumeOffset);
}
}
diff --git a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java
index 9cfdd59b..d1b3e0fa 100644
--- a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java
+++ b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java
@@ -23,11 +23,6 @@ import java.util.concurrent.Executors;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.caffeine.resume.CaffeineCache;
import org.apache.camel.main.Main;
-import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfiguration;
-import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
-import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
-import org.apache.camel.resume.Cacheable;
-import org.apache.camel.resume.Resumable;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import org.apache.camel.test.infra.aws2.clients.KinesisUtils;
import software.amazon.awssdk.services.kinesis.KinesisClient;
@@ -53,31 +48,17 @@ public class MainApp {
return;
}
- SingleNodeKafkaResumeStrategy resumeStrategy = getUpdatableConsumerResumeStrategyForSet();
Integer batchSize = Integer.parseInt(System.getProperty("batch.size", "50"));
CountDownLatch latch = new CountDownLatch(batchSize);
Executors.newSingleThreadExecutor().submit(() -> waitForStop(main, latch));
- RouteBuilder routeBuilder = new KinesisRoute(streamName, resumeStrategy, new CaffeineCache<>(100), client, latch);
+ RouteBuilder routeBuilder = new KinesisRoute(streamName, new CaffeineCache<>(100), client, latch);
main.configure().addRoutesBuilder(routeBuilder);
main.start();
}
- private static SingleNodeKafkaResumeStrategy getUpdatableConsumerResumeStrategyForSet() {
- String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092");
- String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets");
-
- KafkaResumeStrategyConfiguration resumeStrategyConfiguration =
- KafkaResumeStrategyConfigurationBuilder.newBuilder()
- .withCacheFillPolicy(Cacheable.FillPolicy.MAXIMIZING)
- .withBootstrapServers(bootStrapAddress)
- .withTopic(kafkaTopic)
- .build();
-
- return new SingleNodeKafkaResumeStrategy(resumeStrategyConfiguration);
- }
private static void loadData(KinesisClient client, String streamName, int recordCount) {
KinesisUtils.createStream(client, streamName);
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
index 6c6eb0a8..65aba596 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
@@ -29,21 +29,28 @@ public final class KafkaUtil {
}
public static SingleNodeKafkaResumeStrategy getDefaultStrategy() {
+ KafkaResumeStrategyConfiguration resumeStrategyConfiguration = getDefaultKafkaResumeStrategyConfiguration();
+
+ return new SingleNodeKafkaResumeStrategy(resumeStrategyConfiguration);
+ }
+
+ public static KafkaResumeStrategyConfiguration getDefaultKafkaResumeStrategyConfiguration() {
+ return getDefaultKafkaResumeStrategyConfigurationBuilder().build();
+ }
+
+ public static KafkaResumeStrategyConfigurationBuilder getDefaultKafkaResumeStrategyConfigurationBuilder() {
String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092");
String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets");
- KafkaResumeStrategyConfiguration resumeStrategyConfiguration =
- KafkaResumeStrategyConfigurationBuilder.newBuilder()
- .withBootstrapServers(bootStrapAddress)
- .withTopic(kafkaTopic)
- .withProducerProperty("max.block.ms", "10000")
- .withMaxInitializationDuration(Duration.ofSeconds(5))
- .withProducerProperty("delivery.timeout.ms", "30000")
- .withProducerProperty("session.timeout.ms", "15000")
- .withProducerProperty("request.timeout.ms", "15000")
- .withConsumerProperty("session.timeout.ms", "20000")
- .build();
-
- return new SingleNodeKafkaResumeStrategy(resumeStrategyConfiguration);
+ return KafkaResumeStrategyConfigurationBuilder.newBuilder()
+ .withBootstrapServers(bootStrapAddress)
+ .withTopic(kafkaTopic)
+ .withProducerProperty("max.block.ms", "10000")
+ .withMaxInitializationDuration(Duration.ofSeconds(5))
+ .withProducerProperty("delivery.timeout.ms", "30000")
+ .withProducerProperty("session.timeout.ms", "15000")
+ .withProducerProperty("request.timeout.ms", "15000")
+ .withConsumerProperty("session.timeout.ms", "20000");
}
+
}
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
index 33451452..0ad7e275 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
@@ -24,9 +24,9 @@ import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.file.FileConstants;
-import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
+import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
import org.apache.camel.resume.Resumable;
-import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.support.resume.Resumables;
import org.slf4j.Logger;
@@ -39,7 +39,6 @@ public class LargeFileRouteBuilder extends RouteBuilder {
private static final Logger LOG = LoggerFactory.getLogger(LargeFileRouteBuilder.class);
private ProducerTemplate producerTemplate;
- private KafkaResumeStrategy testResumeStrategy;
private final ResumeCache<File> cache;
private long lastOffset;
@@ -47,8 +46,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
private final CountDownLatch latch;
- public LargeFileRouteBuilder(KafkaResumeStrategy resumeStrategy, ResumeCache<File> cache, CountDownLatch latch) {
- this.testResumeStrategy = resumeStrategy;
+ public LargeFileRouteBuilder(ResumeCache<File> cache, CountDownLatch latch) {
this.cache = cache;
this.latch = latch;
}
@@ -87,9 +85,10 @@ public class LargeFileRouteBuilder extends RouteBuilder {
public void configure() {
producerTemplate = getContext().createProducerTemplate();
- getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, testResumeStrategy);
getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache);
+ final KafkaResumeStrategyConfigurationBuilder defaultKafkaResumeStrategyConfigurationBuilder = KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder();
+
from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
.routeId("largeFileRoute")
.convertBodyTo(String.class)
@@ -97,7 +96,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
.streaming()
.stopOnException()
.resumable()
- .resumeStrategy(ResumeStrategy.DEFAULT_NAME)
+ .configuration(defaultKafkaResumeStrategyConfigurationBuilder)
.intermittent(true)
.process(this::process);
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
index 2d6a67bd..c85b2ca3 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
@@ -21,7 +21,9 @@ import java.io.File;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.resume.ResumeStrategyConfigurationBuilder;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.support.resume.Resumables;
import org.slf4j.Logger;
@@ -29,16 +31,20 @@ import org.slf4j.LoggerFactory;
public class LargeDirectoryRouteBuilder extends RouteBuilder {
private static final Logger LOG = LoggerFactory.getLogger(LargeDirectoryRouteBuilder.class);
- private final ResumeStrategy resumeStrategy;
private final ResumeCache<File> cache;
+ private final ResumeStrategyConfigurationBuilder<? extends ResumeStrategyConfigurationBuilder, ? extends ResumeStrategyConfiguration> resumeStrategyConfigurationBuilder;
private final long delay;
- public LargeDirectoryRouteBuilder(ResumeStrategy resumeStrategy, ResumeCache<File> cache) {
- this(resumeStrategy, cache, 0);
+ public LargeDirectoryRouteBuilder(ResumeCache<File> cache) {
+ this(KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder(), cache);
}
- public LargeDirectoryRouteBuilder(ResumeStrategy resumeStrategy, ResumeCache<File> cache, long delay) {
- this.resumeStrategy = resumeStrategy;
+ public LargeDirectoryRouteBuilder(ResumeStrategyConfigurationBuilder<?, ?> resumeStrategyConfigurationBuilder, ResumeCache<File> cache) {
+ this(resumeStrategyConfigurationBuilder, cache, 0);
+ }
+
+ public LargeDirectoryRouteBuilder(ResumeStrategyConfigurationBuilder<? extends ResumeStrategyConfigurationBuilder, ? extends ResumeStrategyConfiguration> resumeStrategyConfigurationBuilder, ResumeCache<File> cache, long delay) {
+ this.resumeStrategyConfigurationBuilder = resumeStrategyConfigurationBuilder;
this.cache = cache;
this.delay = delay;
}
@@ -57,11 +63,10 @@ public class LargeDirectoryRouteBuilder extends RouteBuilder {
* Let's configure the Camel routing rules using Java code...
*/
public void configure() {
- getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, resumeStrategy);
getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache);
- from("file:{{input.dir}}?noop=true&recursive=true&preSort=true")
- .resumable(ResumeStrategy.DEFAULT_NAME)
+ from("file:{{input.dir}}?noop=true&recursive=true")
+ .resumable().configuration(resumeStrategyConfigurationBuilder)
.process(this::process)
.to("file:{{output.dir}}");
}
diff --git a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
index 26e11fdd..f0ac3c2a 100644
--- a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
+++ b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
@@ -42,9 +42,8 @@ public class MainApp {
int batchSize = Integer.valueOf(tmp);
CountDownLatch latch = new CountDownLatch(batchSize);
- SingleNodeKafkaResumeStrategy resumeStrategy = KafkaUtil.getDefaultStrategy();
- RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy, new CaffeineCache<>(1), latch);
+ RouteBuilder routeBuilder = new LargeFileRouteBuilder(new CaffeineCache<>(1), latch);
main.configure().addRoutesBuilder(routeBuilder);
Executors.newSingleThreadExecutor().submit(() -> waitForStop(main, latch));
diff --git a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
index 90f4dc1c..677024c3 100644
--- a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
+++ b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
@@ -20,11 +20,8 @@ import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService;
import org.apache.camel.example.resume.fileset.clusterized.strategies.ClusterizedLargeDirectoryRouteBuilder;
-import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.MainListener;
-import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
-import org.apache.camel.resume.ResumeStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,8 +50,6 @@ class ClusterizedListener implements MainListener {
main.getCamelContext().addService(clusterService);
LOG.trace("Creating the strategy");
- SingleNodeKafkaResumeStrategy resumeStrategy = KafkaUtil.getDefaultStrategy();
- main.getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, resumeStrategy);
LOG.trace("Creating the route");
RouteBuilder routeBuilder = new ClusterizedLargeDirectoryRouteBuilder();
diff --git a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
index d7340907..407c9105 100644
--- a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
+++ b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
@@ -22,7 +22,8 @@ import java.io.File;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.caffeine.resume.CaffeineCache;
-import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
+import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.support.resume.Resumables;
import org.slf4j.Logger;
@@ -50,12 +51,15 @@ public class ClusterizedLargeDirectoryRouteBuilder extends RouteBuilder {
public void configure() {
getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, new CaffeineCache<>(10000));
+ final KafkaResumeStrategyConfigurationBuilder defaultKafkaResumeStrategyConfigurationBuilder = KafkaUtil.getDefaultKafkaResumeStrategyConfigurationBuilder();
+
from("timer:heartbeat?period=10000")
.routeId("heartbeat")
.log("HeartBeat route (timer) ...");
from("master:resume-ns:file:{{input.dir}}?noop=true&recursive=true&repeatCount=1")
- .resumable(ResumeStrategy.DEFAULT_NAME)
+ .resumable()
+ .configuration(defaultKafkaResumeStrategyConfigurationBuilder)
.routeId("clustered")
.process(this::process)
.to("file:{{output.dir}}");
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java b/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java
index 6945a4ad..f1a78fe2 100644
--- a/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java
@@ -21,8 +21,6 @@ import java.io.File;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.caffeine.resume.CaffeineCache;
-import org.apache.camel.component.wal.WriteAheadResumeStrategy;
-import org.apache.camel.component.wal.WriteAheadResumeStrategyConfiguration;
import org.apache.camel.component.wal.WriteAheadResumeStrategyConfigurationBuilder;
import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
import org.apache.camel.example.resume.strategies.kafka.check.CheckRoute;
@@ -45,15 +43,12 @@ public class MainApp {
final String logFile = System.getProperty("wal.log.file");
final long delay = Long.parseLong(System.getProperty("processing.delay", "0"));
- WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration = WriteAheadResumeStrategyConfigurationBuilder
+ final WriteAheadResumeStrategyConfigurationBuilder configurationBuilder = WriteAheadResumeStrategyConfigurationBuilder
.newBuilder()
.withDelegateResumeStrategy(resumeStrategy)
- .withLogFile(new File(logFile))
- .build();
+ .withLogFile(new File(logFile));
- WriteAheadResumeStrategy writeAheadResumeStrategy = new WriteAheadResumeStrategy(resumeStrategyConfiguration);
-
- RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(writeAheadResumeStrategy, new CaffeineCache<>(10000), delay);
+ RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(configurationBuilder, new CaffeineCache<>(10000), delay);
main.configure().addRoutesBuilder(new CheckRoute());
main.configure().addRoutesBuilder(routeBuilder);
diff --git a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
index 78837e69..830b78cf 100644
--- a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
+++ b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
@@ -19,11 +19,9 @@ package org.apache.camel.example.resume.fileset.main;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.caffeine.resume.CaffeineCache;
-import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
import org.apache.camel.example.resume.strategies.kafka.check.CheckRoute;
import org.apache.camel.example.resume.strategies.kafka.fileset.LargeDirectoryRouteBuilder;
import org.apache.camel.main.Main;
-import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
/**
* A Camel Application
@@ -36,8 +34,7 @@ public class MainApp {
public static void main(String... args) throws Exception {
Main main = new Main();
- SingleNodeKafkaResumeStrategy resumeStrategy = KafkaUtil.getDefaultStrategy();
- RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(resumeStrategy, new CaffeineCache<>(10000));
+ RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(new CaffeineCache<>(10000));
main.configure().addRoutesBuilder(new CheckRoute());
main.configure().addRoutesBuilder(routeBuilder);