You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/03 08:29:18 UTC
[camel-examples] branch main updated: CAMEL-16653: added an example with WAL
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git
The following commit(s) were added to refs/heads/main by this push:
new 00831ca7 CAMEL-16653: added an example with WAL
00831ca7 is described below
commit 00831ca7cee6db9acccda8e9a310ba223dfad5a2
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Oct 26 15:49:28 2022 +0200
CAMEL-16653: added an example with WAL
---
examples/resume-api/README.md | 1 +
examples/resume-api/pom.xml | 1 +
.../example/resume/strategies/kafka/KafkaUtil.java | 4 ++
.../kafka/fileset/LargeDirectoryRouteBuilder.java | 24 ++++---
.../resume-api/resume-api-fileset-wal/README.md | 66 +++++++++++++++++
.../resume-api-fileset-wal/docker-compose.yaml | 54 ++++++++++++++
examples/resume-api/resume-api-fileset-wal/pom.xml | 65 +++++++++++++++++
.../src/main/docker/Dockerfile | 30 ++++++++
.../example/resume/fileset/wal/main/MainApp.java | 56 +++++++++++++++
.../src/main/resources/log4j2.properties | 74 +++++++++++++++++++
.../src/main/scripts/noop.sh | 19 +++++
.../resume-api-fileset-wal/src/main/scripts/run.sh | 83 ++++++++++++++++++++++
12 files changed, 469 insertions(+), 8 deletions(-)
diff --git a/examples/resume-api/README.md b/examples/resume-api/README.md
index a0d00b23..0fba98de 100644
--- a/examples/resume-api/README.md
+++ b/examples/resume-api/README.md
@@ -6,3 +6,4 @@ This module contains examples for the resume API.
* resume-api-common: common code used for all the resume API examples
* resume-api-file-offset: an example that shows how to use the Resume API for processing large files
* resume-api-fileset: an example that shows how to use the Resume API for processing a large directory
+* resume-api-fileset-wal: an example that shows how to use the Resume API with the Write Ahead Log for processing a large directory
diff --git a/examples/resume-api/pom.xml b/examples/resume-api/pom.xml
index b7faef35..92125f7a 100644
--- a/examples/resume-api/pom.xml
+++ b/examples/resume-api/pom.xml
@@ -44,6 +44,7 @@
<module>resume-api-fileset-clusterized</module>
<module>resume-api-aws2-kinesis</module>
<module>resume-api-cassandraql</module>
+ <module>resume-api-fileset-wal</module>
</modules>
<dependencyManagement>
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
index 051c704e..6c6eb0a8 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
@@ -38,6 +38,10 @@ public final class KafkaUtil {
.withTopic(kafkaTopic)
.withProducerProperty("max.block.ms", "10000")
.withMaxInitializationDuration(Duration.ofSeconds(5))
+ .withProducerProperty("delivery.timeout.ms", "30000")
+ .withProducerProperty("session.timeout.ms", "15000")
+ .withProducerProperty("request.timeout.ms", "15000")
+ .withConsumerProperty("session.timeout.ms", "20000")
.build();
return new SingleNodeKafkaResumeStrategy(resumeStrategyConfiguration);
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
index 2b537381..2d6a67bd 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
@@ -21,8 +21,6 @@ import java.io.File;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
-import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.support.resume.Resumables;
@@ -31,28 +29,38 @@ import org.slf4j.LoggerFactory;
public class LargeDirectoryRouteBuilder extends RouteBuilder {
private static final Logger LOG = LoggerFactory.getLogger(LargeDirectoryRouteBuilder.class);
- private final KafkaResumeStrategy testResumeStrategy;
+ private final ResumeStrategy resumeStrategy;
private final ResumeCache<File> cache;
+ private final long delay;
- public LargeDirectoryRouteBuilder(KafkaResumeStrategy resumeStrategy, ResumeCache<File> cache) {
- this.testResumeStrategy = resumeStrategy;
+ public LargeDirectoryRouteBuilder(ResumeStrategy resumeStrategy, ResumeCache<File> cache) {
+ this(resumeStrategy, cache, 0);
+ }
+
+ public LargeDirectoryRouteBuilder(ResumeStrategy resumeStrategy, ResumeCache<File> cache, long delay) {
+ this.resumeStrategy = resumeStrategy;
this.cache = cache;
+ this.delay = delay;
}
- private void process(Exchange exchange) {
+ private void process(Exchange exchange) throws InterruptedException {
File path = exchange.getMessage().getHeader("CamelFilePath", File.class);
LOG.debug("Processing {}", path.getPath());
exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(path.getParentFile(), path));
+
+ if (delay > 0) {
+ Thread.sleep(delay);
+ }
}
/**
* Let's configure the Camel routing rules using Java code...
*/
public void configure() {
- getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, testResumeStrategy);
+ getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, resumeStrategy);
getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache);
- from("file:{{input.dir}}?noop=true&recursive=true")
+ from("file:{{input.dir}}?noop=true&recursive=true&preSort=true")
.resumable(ResumeStrategy.DEFAULT_NAME)
.process(this::process)
.to("file:{{output.dir}}");
diff --git a/examples/resume-api/resume-api-fileset-wal/README.md b/examples/resume-api/resume-api-fileset-wal/README.md
new file mode 100644
index 00000000..1c9d38e7
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/README.md
@@ -0,0 +1,66 @@
+Resume API Example: File Set with Write Ahead Log
+=========================
+
+This example shows how to use the Resume API for processing a large directory using a the write ahead log. It uses the file component to read a large directory and subdirectories, consisting of about 500 files. The processing reads a batch of 50 files and then stop. Then, when it restarts, it continues from the last file it processed in the previous execution. The offsets are stored in a Kafka topic.
+
+
+*Note*: this demo runs in a container. Although it is possible to run it outside a container, doing so requires additional infrastructure. Therefore, it's not covered by the documentation.
+
+*Note 2*: the code is deliberately slowed down for a better display of the execution.
+
+Building the demo
+===
+
+To build the demo and the containers:
+
+```shell
+mvn clean package && docker-compose build
+```
+
+Run
+===
+
+This demo runs manually. First, start the demo using the command:
+
+```shell
+docker-compose up -d && docker exec -it resume-api-fileset-wal-example-1 /bin/bash ; docker-compose down
+```
+
+This will take you to another shell. Open another window or terminal and leave that shell open for now.
+
+In the new terminal, connect to the Kafka topic used for the offsets:
+
+```shell
+docker exec -it resume-api-fileset-wal-kafka-1 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dir-offsets
+```
+
+Go back to the first terminal and run the demo:
+
+```shell
+./run.sh
+```
+
+You'll need to press Enter after every iteration.
+
+Open one more terminal to simulate outages in the Kafka instance used for the offsets.
+
+For instance, use the following command to disconnect that instance from the network:
+
+```shell
+docker network disconnect --force resume-api-fileset-wal_default resume-api-fileset-wal-kafka-1
+```
+
+Kafka can be reconnected to the network using the following command:
+
+```shell
+docker network connect --alias kafka resume-api-fileset-wal_default resume-api-fileset-wal-kafka-1
+```
+
+While running the demo, note how the application performs the recovery prior to starting the execution.
+
+
+*Note*: it assumes docker-compose will create the containers using its defualt name pattern. If that's not the case,
+please adjust the names accordingly.
+
+
+
diff --git a/examples/resume-api/resume-api-fileset-wal/docker-compose.yaml b/examples/resume-api/resume-api-fileset-wal/docker-compose.yaml
new file mode 100644
index 00000000..e1ee1bd6
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/docker-compose.yaml
@@ -0,0 +1,54 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+version: '3.4'
+services:
+ zookeeper:
+ image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+ logging:
+ driver: "none"
+ command: [
+ "sh", "-c",
+ "bin/zookeeper-server-start.sh config/zookeeper.properties"
+ ]
+ ports:
+ - "2181:2181"
+ environment:
+ LOG_DIR: /tmp/logs
+ kafka:
+ image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+ logging:
+ driver: "none"
+ command: [
+ "sh", "-c",
+ "sleep 10s && bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
+ ]
+ depends_on:
+ - zookeeper
+ ports:
+ - "9092:9092"
+ environment:
+ LOG_DIR: "/tmp/logs"
+ KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+ KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+ KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ example:
+ build:
+ dockerfile: src/main/docker/Dockerfile
+ context: .
+ depends_on:
+ - kafka
diff --git a/examples/resume-api/resume-api-fileset-wal/pom.xml b/examples/resume-api/resume-api-fileset-wal/pom.xml
new file mode 100644
index 00000000..be786777
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>camel-example-resume-api-parent</artifactId>
+ <groupId>org.apache.camel.example</groupId>
+ <version>3.20.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>resume-api-fileset-wal</artifactId>
+ <name>Camel :: Example :: Resume API :: File Set (w/ WAL)</name>
+
+ <properties>
+ <resume.main.class>org.apache.camel.example.resume.fileset.wal.main.MainApp</resume.main.class>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-caffeine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-wal</artifactId>
+ <version>3.20.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.example</groupId>
+ <artifactId>resume-api-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/docker/Dockerfile b/examples/resume-api/resume-api-fileset-wal/src/main/docker/Dockerfile
new file mode 100644
index 00000000..3f1fce64
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/docker/Dockerfile
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+FROM fedora:35 as resume-api-fileset-wal
+LABEL maintainer="orpiske@apache.org"
+
+RUN dnf install -y java-11-openjdk-headless java-11-openjdk-devel tree pv && dnf clean all
+COPY target/resume-api-*with-dependencies.jar /deployments/example.jar
+COPY src/main/scripts/run.sh /deployments/run.sh
+COPY src/main/scripts/noop.sh /deployments/noop.sh
+
+ENV JAVA_HOME /etc/alternatives/jre
+ENV DATA_DIR /data
+ENV INPUT_DIR ${DATA_DIR}/source
+ENV OUTPUT_DIR ${DATA_DIR}/output
+
+RUN chmod +x /deployments/*.sh
+WORKDIR /deployments/
+CMD [ "sh", "-c", "/deployments/noop.sh" ]
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java b/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java
new file mode 100644
index 00000000..17719a43
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.fileset.wal.main;
+
+import java.io.File;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.caffeine.resume.CaffeineCache;
+import org.apache.camel.component.wal.WriteAheadResumeStrategy;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
+import org.apache.camel.example.resume.strategies.kafka.check.CheckRoute;
+import org.apache.camel.example.resume.strategies.kafka.fileset.LargeDirectoryRouteBuilder;
+import org.apache.camel.main.Main;
+import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
+
+/**
+ * A Camel Application
+ */
+public class MainApp {
+
+ /**
+ * A main() so we can easily run these routing rules in our IDE
+ */
+ public static void main(String... args) throws Exception {
+ Main main = new Main();
+
+ SingleNodeKafkaResumeStrategy resumeStrategy = KafkaUtil.getDefaultStrategy();
+ final String logFile = System.getProperty("wal.log.file");
+ final long delay = Long.parseLong(System.getProperty("processing.delay", "0"));
+
+ WriteAheadResumeStrategy writeAheadResumeStrategy = new WriteAheadResumeStrategy(new File(logFile), resumeStrategy);
+
+ RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(writeAheadResumeStrategy, new CaffeineCache<>(10000), delay);
+
+ main.configure().addRoutesBuilder(new CheckRoute());
+ main.configure().addRoutesBuilder(routeBuilder);
+
+ main.run(args);
+ }
+}
+
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/resources/log4j2.properties b/examples/resume-api/resume-api-fileset-wal/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..39d5cd18
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/resources/log4j2.properties
@@ -0,0 +1,74 @@
+
+# Single file
+#appender.out.type = File
+#appender.out.name = file
+#appender.out.fileName = logs/test.log
+#appender.out.layout.type = PatternLayout
+#appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+
+appender.rolling-out.type = RollingFile
+appender.rolling-out.name = rolling-out
+appender.rolling-out.fileName = logs/managed-resume-restart.log
+appender.rolling-out.filePattern = logs/managed-resume-restart-%d{yyyyMMdd-HHmmss}.log
+appender.rolling-out.layout.type = PatternLayout
+# This logs the thread name and so on, but it's usually not helpful
+#appender.rolling-out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+appender.rolling-out.layout.pattern = %d{DEFAULT} [%-5p] %m%n
+appender.rolling-out.policies.type = Policies
+appender.rolling-out.policies.size.type = OnStartupTriggeringPolicy
+
+# For console
+appender.console.type = Console
+appender.console.name = console
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d{DEFAULT}}{dim} [%highlight{%-5p}] %m%n
+
+logger.camel.name = org.apache.camel
+logger.camel.level = WARN
+logger.camel.additivity = false
+logger.camel.appenderRef.file.ref = rolling-out
+
+logger.camel-resume.name = org.apache.camel.processor.resume
+logger.camel-resume.level = INFO
+logger.camel-resume.additivity = false
+logger.camel-resume.appenderRef.file.ref = rolling-out
+logger.camel-resume.appenderRef.console.ref = console
+
+logger.tester.name = org.apache.camel.example.resume
+logger.tester.level = DEBUG
+logger.tester.additivity = false
+logger.tester.appenderRef.file.ref = rolling-out
+logger.tester.appenderRef.console.ref = console
+
+logger.camel-file-resume.name = org.apache.camel.component.file.consumer
+logger.camel-file-resume.level = DEBUG
+logger.camel-file-resume.additivity = false
+logger.camel-file-resume.appenderRef.file.ref = rolling-out
+logger.camel-file-resume.appenderRef.console.ref = console
+
+logger.camel-kafka.name = org.apache.camel.component.kafka
+logger.camel-kafka.level = WARN
+logger.camel-kafka.additivity = false
+logger.camel-kafka.appenderRef.file.ref = rolling-out
+logger.camel-kafka.appenderRef.console.ref = console
+
+logger.camel-kafka-resume.name = org.apache.camel.processor.resume.kafka
+logger.camel-kafka-resume.level = DEBUG
+logger.camel-kafka-resume.additivity = false
+logger.camel-kafka-resume.appenderRef.file.ref = rolling-out
+logger.camel-kafka-resume.appenderRef.console.ref = console
+
+logger.camel-wal.name = org.apache.camel.component.wal
+logger.camel-wal.level = TRACE
+logger.camel-wal.additivity = false
+logger.camel-wal.appenderRef.file.ref = rolling-out
+logger.camel-wal.appenderRef.console.ref = console
+
+logger.kafka.name = org.apache.kafka
+logger.kafka.level = INFO
+logger.kafka.additivity = false
+logger.kafka.appenderRef.file.ref = rolling-out
+
+rootLogger.level = DEBUG
+rootLogger.appenderRef.file.ref = rolling-out
+rootLogger.appenderRef.out.ref = console
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/scripts/noop.sh b/examples/resume-api/resume-api-fileset-wal/src/main/scripts/noop.sh
new file mode 100644
index 00000000..c8fc894e
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/scripts/noop.sh
@@ -0,0 +1,19 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+echo "Use the terminal to run the demo"
+while true ; do sleep 60s ; done
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/scripts/run.sh b/examples/resume-api/resume-api-fileset-wal/src/main/scripts/run.sh
new file mode 100644
index 00000000..7c46a0bc
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/scripts/run.sh
@@ -0,0 +1,83 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+function checkResults() {
+ expectedItems=$((ITERATIONS * BATCH_SIZE))
+ processedRecords=$(cat ${OUTPUT_DIR}/summary.txt | wc -l)
+ repeated=$(cat ${OUTPUT_DIR}/summary.txt | sort | uniq --count --repeated | wc -l)
+
+ echo "###**************************************************************************###"
+ echo "Results: repeated items: ${repeated}"
+ echo "Results: processed items: ${processedRecords} (expected at least ${expectedItems})"
+ echo "###**************************************************************************###"
+ echo "Resume simulation completed"
+ echo "###**************************************************************************###"
+
+}
+
+trap checkResults exit SIGINT SIGABRT SIGHUP
+
+echo "The test will process the following directory tree:"
+
+mkdir -p ${INPUT_DIR}
+
+ITERATIONS=${1:-3}
+BATCH_SIZE=${2:-20}
+FILE_COUNT=${3:-40}
+MAX_IDLE=10
+PROCESSING_DELAY=${4:-1000}
+
+for i in $(seq 0 ${ITERATIONS}) ; do
+ mkdir -p ${INPUT_DIR}/${i}
+
+ echo "********************************************************************************"
+ echo "Running the iteration ${i} of ${ITERATIONS} with a batch of ${BATCH_SIZE} files"
+ echo "********************************************************************************"
+
+ echo "Appending ${FILE_COUNT} files to the data directory (files from the previous execution remain there)"
+ for file in $(seq 1 ${FILE_COUNT}) ; do
+ echo ${RANDOM} > ${INPUT_DIR}/${i}/${file}
+ done
+
+ echo "Only the following files from the directory ${INPUT_DIR}/${i} should processed in this execution"
+
+ java -Dinput.dir=${INPUT_DIR} \
+ -Dwal.log.file=${DATA_DIR}/resume.data \
+ -Doutput.dir=${OUTPUT_DIR} \
+ -Dprocessing.delay=${PROCESSING_DELAY} \
+ -Dresume.type=kafka \
+ -Dresume.type.kafka.topic=dir-offsets \
+ -Dbootstrap.address=kafka:9092 \
+ -jar /deployments/example.jar \
+ -di ${MAX_IDLE}
+ echo "********************************************************************************"
+ echo "Finished the iteration ${i} (press Enter to continue)"
+ echo "********************************************************************************"
+ read -r
+
+ if [[ -f ${OUTPUT_DIR}/summary.txt ]] ; then
+ processedRecords=$(cat ${OUTPUT_DIR}/summary.txt | wc -l)
+ echo "Processed ${processedRecords} so far"
+ fi
+
+
+done
+
+
+
+exit 0