Posted to commits@camel.apache.org by or...@apache.org on 2022/11/03 08:29:18 UTC

[camel-examples] branch main updated: CAMEL-16653: added an example with WAL

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new 00831ca7 CAMEL-16653: added an example with WAL
00831ca7 is described below

commit 00831ca7cee6db9acccda8e9a310ba223dfad5a2
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Oct 26 15:49:28 2022 +0200

    CAMEL-16653: added an example with WAL
---
 examples/resume-api/README.md                      |  1 +
 examples/resume-api/pom.xml                        |  1 +
 .../example/resume/strategies/kafka/KafkaUtil.java |  4 ++
 .../kafka/fileset/LargeDirectoryRouteBuilder.java  | 24 ++++---
 .../resume-api/resume-api-fileset-wal/README.md    | 66 +++++++++++++++++
 .../resume-api-fileset-wal/docker-compose.yaml     | 54 ++++++++++++++
 examples/resume-api/resume-api-fileset-wal/pom.xml | 65 +++++++++++++++++
 .../src/main/docker/Dockerfile                     | 30 ++++++++
 .../example/resume/fileset/wal/main/MainApp.java   | 56 +++++++++++++++
 .../src/main/resources/log4j2.properties           | 74 +++++++++++++++++++
 .../src/main/scripts/noop.sh                       | 19 +++++
 .../resume-api-fileset-wal/src/main/scripts/run.sh | 83 ++++++++++++++++++++++
 12 files changed, 469 insertions(+), 8 deletions(-)

diff --git a/examples/resume-api/README.md b/examples/resume-api/README.md
index a0d00b23..0fba98de 100644
--- a/examples/resume-api/README.md
+++ b/examples/resume-api/README.md
@@ -6,3 +6,4 @@ This module contains examples for the resume API.
 * resume-api-common: common code used for all the resume API examples
 * resume-api-file-offset: an example that shows how to use the Resume API for processing large files 
 * resume-api-fileset: an example that shows how to use the Resume API for processing a large directory
+* resume-api-fileset-wal: an example that shows how to use the Resume API with the Write Ahead Log for processing a large directory
diff --git a/examples/resume-api/pom.xml b/examples/resume-api/pom.xml
index b7faef35..92125f7a 100644
--- a/examples/resume-api/pom.xml
+++ b/examples/resume-api/pom.xml
@@ -44,6 +44,7 @@
         <module>resume-api-fileset-clusterized</module>
         <module>resume-api-aws2-kinesis</module>
         <module>resume-api-cassandraql</module>
+        <module>resume-api-fileset-wal</module>
     </modules>
 
     <dependencyManagement>
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
index 051c704e..6c6eb0a8 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
@@ -38,6 +38,10 @@ public final class KafkaUtil {
                         .withTopic(kafkaTopic)
                         .withProducerProperty("max.block.ms", "10000")
                         .withMaxInitializationDuration(Duration.ofSeconds(5))
+                        .withProducerProperty("delivery.timeout.ms", "30000")
+                        .withProducerProperty("session.timeout.ms", "15000")
+                        .withProducerProperty("request.timeout.ms", "15000")
+                        .withConsumerProperty("session.timeout.ms", "20000")
                         .build();
 
         return new SingleNodeKafkaResumeStrategy(resumeStrategyConfiguration);
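
The extra producer and consumer timeouts above keep the example from blocking for long while the Kafka broker is deliberately taken off the network during the demo. Below is a minimal sketch of building a strategy with these settings; the `KafkaResumeStrategyConfigurationBuilder.newBuilder()` entry point and the `withBootstrapServers` setter are assumptions about the camel-kafka builder API, while the `with*` property calls mirror the values added in this commit:

```java
import java.time.Duration;

import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfiguration;
import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;

public final class KafkaStrategySketch {

    public static SingleNodeKafkaResumeStrategy newStrategy(String bootstrapServers, String kafkaTopic) {
        // Builder entry point and bootstrap setter are assumptions; the topic,
        // timeouts and initialization duration mirror the values in this commit.
        KafkaResumeStrategyConfiguration configuration = KafkaResumeStrategyConfigurationBuilder.newBuilder()
                .withBootstrapServers(bootstrapServers)
                .withTopic(kafkaTopic)
                .withProducerProperty("max.block.ms", "10000")
                .withMaxInitializationDuration(Duration.ofSeconds(5))
                .withProducerProperty("delivery.timeout.ms", "30000")
                .withProducerProperty("session.timeout.ms", "15000")
                .withProducerProperty("request.timeout.ms", "15000")
                .withConsumerProperty("session.timeout.ms", "20000")
                .build();

        return new SingleNodeKafkaResumeStrategy(configuration);
    }
}
```

The shorter timeouts presumably keep write failures brief while Kafka is disconnected, instead of waiting out the much longer Kafka client defaults.
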
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
index 2b537381..2d6a67bd 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
@@ -21,8 +21,6 @@ import java.io.File;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
-import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.support.resume.Resumables;
@@ -31,28 +29,38 @@ import org.slf4j.LoggerFactory;
 
 public class LargeDirectoryRouteBuilder extends RouteBuilder {
     private static final Logger LOG = LoggerFactory.getLogger(LargeDirectoryRouteBuilder.class);
-    private final KafkaResumeStrategy testResumeStrategy;
+    private final ResumeStrategy resumeStrategy;
     private final ResumeCache<File> cache;
+    private final long delay;
 
-    public LargeDirectoryRouteBuilder(KafkaResumeStrategy resumeStrategy, ResumeCache<File> cache) {
-        this.testResumeStrategy = resumeStrategy;
+    public LargeDirectoryRouteBuilder(ResumeStrategy resumeStrategy, ResumeCache<File> cache) {
+        this(resumeStrategy, cache, 0);
+    }
+
+    public LargeDirectoryRouteBuilder(ResumeStrategy resumeStrategy, ResumeCache<File> cache, long delay) {
+        this.resumeStrategy = resumeStrategy;
         this.cache = cache;
+        this.delay = delay;
     }
 
-    private void process(Exchange exchange) {
+    private void process(Exchange exchange) throws InterruptedException {
         File path = exchange.getMessage().getHeader("CamelFilePath", File.class);
         LOG.debug("Processing {}", path.getPath());
         exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(path.getParentFile(), path));
+
+        if (delay > 0) {
+            Thread.sleep(delay);
+        }
     }
 
     /**
      * Let's configure the Camel routing rules using Java code...
      */
     public void configure() {
-        getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, testResumeStrategy);
+        getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, resumeStrategy);
         getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache);
 
-        from("file:{{input.dir}}?noop=true&recursive=true")
+        from("file:{{input.dir}}?noop=true&recursive=true&preSort=true")
                 .resumable(ResumeStrategy.DEFAULT_NAME)
                 .process(this::process)
                 .to("file:{{output.dir}}");
diff --git a/examples/resume-api/resume-api-fileset-wal/README.md b/examples/resume-api/resume-api-fileset-wal/README.md
new file mode 100644
index 00000000..1c9d38e7
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/README.md
@@ -0,0 +1,66 @@
+Resume API Example: File Set with Write Ahead Log
+=========================
+
+This example shows how to use the Resume API for processing a large directory using the write ahead log. It uses the file component to read a large directory and its subdirectories, consisting of about 500 files. The processing reads a batch of 50 files and then stops. When it restarts, it continues from the last file it processed in the previous execution. The offsets are stored in a Kafka topic.
+
+
+*Note*: this demo runs in a container. Although it is possible to run it outside a container, doing so requires additional infrastructure. Therefore, it's not covered by the documentation.
+
+*Note 2*: the code is deliberately slowed down to make the execution easier to follow.
+
+Building the demo
+===
+
+To build the demo and the containers:
+
+```shell
+mvn clean package && docker-compose build
+```
+
+Run
+===
+
+This demo runs manually. First, start the demo using the command:
+
+```shell
+docker-compose up -d && docker exec -it resume-api-fileset-wal-example-1 /bin/bash ; docker-compose down
+```
+
+This will drop you into a shell inside the example container. Leave that shell open for now and open another window or terminal.
+
+In the new terminal, connect to the Kafka topic used for the offsets:
+
+```shell
+docker exec -it resume-api-fileset-wal-kafka-1 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dir-offsets
+```
+
+Go back to the first terminal and run the demo:
+
+```shell
+./run.sh
+```
+
+You'll need to press Enter after every iteration.
+
+Open one more terminal to simulate outages in the Kafka instance used for the offsets. 
+
+For instance, use the following command to disconnect that instance from the network:
+
+```shell
+docker network disconnect --force resume-api-fileset-wal_default resume-api-fileset-wal-kafka-1
+```
+
+Kafka can be reconnected to the network using the following command: 
+
+```shell
+docker network connect --alias kafka resume-api-fileset-wal_default resume-api-fileset-wal-kafka-1
+```
+
+While running the demo, note how the application performs recovery before it starts processing files.
+
+
+*Note*: this guide assumes docker-compose creates the containers using its default name pattern. If that's not the case,
+please adjust the container names in the commands above accordingly.
+
+
+
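
For reference, the recovery mentioned in the README comes from wrapping the Kafka-backed strategy in the write-ahead log: offset updates are recorded in a local log file alongside the Kafka topic, so updates that could not be confirmed (for example while the broker is disconnected) can be replayed when the application starts again. A minimal sketch of that wiring, matching the MainApp class added below; the log file path is an illustrative value (the demo passes it via -Dwal.log.file):

```java
import java.io.File;

import org.apache.camel.component.wal.WriteAheadResumeStrategy;
import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;

public final class WalWiringSketch {

    public static WriteAheadResumeStrategy newStrategy() {
        // Kafka remains the long-term store for the offsets...
        SingleNodeKafkaResumeStrategy kafkaStrategy = KafkaUtil.getDefaultStrategy();

        // ...while the write-ahead log file keeps a local record so that updates
        // which could not be confirmed survive a Kafka outage. The path below is
        // illustrative; the demo supplies it through -Dwal.log.file.
        return new WriteAheadResumeStrategy(new File("/data/resume.data"), kafkaStrategy);
    }
}
```
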
diff --git a/examples/resume-api/resume-api-fileset-wal/docker-compose.yaml b/examples/resume-api/resume-api-fileset-wal/docker-compose.yaml
new file mode 100644
index 00000000..e1ee1bd6
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/docker-compose.yaml
@@ -0,0 +1,54 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+version: '3.4'
+services:
+  zookeeper:
+    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+    logging:
+      driver: "none"
+    command: [
+      "sh", "-c",
+      "bin/zookeeper-server-start.sh config/zookeeper.properties"
+    ]
+    ports:
+      - "2181:2181"
+    environment:
+      LOG_DIR: /tmp/logs
+  kafka:
+    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+    logging:
+      driver: "none"
+    command: [
+      "sh", "-c",
+      "sleep 10s && bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
+    ]
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      LOG_DIR: "/tmp/logs"
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+  example:
+    build:
+      dockerfile: src/main/docker/Dockerfile
+      context: .
+    depends_on:
+      - kafka
diff --git a/examples/resume-api/resume-api-fileset-wal/pom.xml b/examples/resume-api/resume-api-fileset-wal/pom.xml
new file mode 100644
index 00000000..be786777
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>camel-example-resume-api-parent</artifactId>
+        <groupId>org.apache.camel.example</groupId>
+        <version>3.20.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>resume-api-fileset-wal</artifactId>
+    <name>Camel :: Example :: Resume API :: File Set (w/ WAL)</name>
+
+    <properties>
+        <resume.main.class>org.apache.camel.example.resume.fileset.wal.main.MainApp</resume.main.class>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-caffeine</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-wal</artifactId>
+            <version>3.20.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.example</groupId>
+            <artifactId>resume-api-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/docker/Dockerfile b/examples/resume-api/resume-api-fileset-wal/src/main/docker/Dockerfile
new file mode 100644
index 00000000..3f1fce64
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/docker/Dockerfile
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+FROM fedora:35 as resume-api-fileset-wal
+LABEL maintainer="orpiske@apache.org"
+
+RUN dnf install -y java-11-openjdk-headless java-11-openjdk-devel tree pv && dnf clean all
+COPY target/resume-api-*with-dependencies.jar /deployments/example.jar
+COPY src/main/scripts/run.sh /deployments/run.sh
+COPY src/main/scripts/noop.sh /deployments/noop.sh
+
+ENV JAVA_HOME /etc/alternatives/jre
+ENV DATA_DIR /data
+ENV INPUT_DIR ${DATA_DIR}/source
+ENV OUTPUT_DIR ${DATA_DIR}/output
+
+RUN chmod +x /deployments/*.sh
+WORKDIR /deployments/
+CMD [ "sh", "-c", "/deployments/noop.sh" ]
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java b/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java
new file mode 100644
index 00000000..17719a43
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/java/org/apache/camel/example/resume/fileset/wal/main/MainApp.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.fileset.wal.main;
+
+import java.io.File;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.caffeine.resume.CaffeineCache;
+import org.apache.camel.component.wal.WriteAheadResumeStrategy;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
+import org.apache.camel.example.resume.strategies.kafka.check.CheckRoute;
+import org.apache.camel.example.resume.strategies.kafka.fileset.LargeDirectoryRouteBuilder;
+import org.apache.camel.main.Main;
+import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
+
+/**
+ * A Camel Application
+ */
+public class MainApp {
+
+    /**
+     * A main() so we can easily run these routing rules in our IDE
+     */
+    public static void main(String... args) throws Exception {
+        Main main = new Main();
+
+        SingleNodeKafkaResumeStrategy resumeStrategy = KafkaUtil.getDefaultStrategy();
+        final String logFile = System.getProperty("wal.log.file");
+        final long delay = Long.parseLong(System.getProperty("processing.delay", "0"));
+
+        WriteAheadResumeStrategy writeAheadResumeStrategy = new WriteAheadResumeStrategy(new File(logFile), resumeStrategy);
+
+        RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(writeAheadResumeStrategy, new CaffeineCache<>(10000), delay);
+
+        main.configure().addRoutesBuilder(new CheckRoute());
+        main.configure().addRoutesBuilder(routeBuilder);
+
+        main.run(args);
+    }
+}
+
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/resources/log4j2.properties b/examples/resume-api/resume-api-fileset-wal/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..39d5cd18
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/resources/log4j2.properties
@@ -0,0 +1,74 @@
+
+# Single file
+#appender.out.type = File
+#appender.out.name = file
+#appender.out.fileName = logs/test.log
+#appender.out.layout.type = PatternLayout
+#appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+
+appender.rolling-out.type = RollingFile
+appender.rolling-out.name = rolling-out
+appender.rolling-out.fileName = logs/managed-resume-restart.log
+appender.rolling-out.filePattern = logs/managed-resume-restart-%d{yyyyMMdd-HHmmss}.log
+appender.rolling-out.layout.type = PatternLayout
+# This logs the thread name and so on, but it's usually not helpful
+#appender.rolling-out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+appender.rolling-out.layout.pattern = %d{DEFAULT} [%-5p] %m%n
+appender.rolling-out.policies.type = Policies
+appender.rolling-out.policies.size.type = OnStartupTriggeringPolicy
+
+# For console
+appender.console.type = Console
+appender.console.name = console
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d{DEFAULT}}{dim} [%highlight{%-5p}] %m%n
+
+logger.camel.name = org.apache.camel
+logger.camel.level = WARN
+logger.camel.additivity = false
+logger.camel.appenderRef.file.ref = rolling-out
+
+logger.camel-resume.name = org.apache.camel.processor.resume
+logger.camel-resume.level = INFO
+logger.camel-resume.additivity = false
+logger.camel-resume.appenderRef.file.ref = rolling-out
+logger.camel-resume.appenderRef.console.ref = console
+
+logger.tester.name = org.apache.camel.example.resume
+logger.tester.level = DEBUG
+logger.tester.additivity = false
+logger.tester.appenderRef.file.ref = rolling-out
+logger.tester.appenderRef.console.ref = console
+
+logger.camel-file-resume.name = org.apache.camel.component.file.consumer
+logger.camel-file-resume.level = DEBUG
+logger.camel-file-resume.additivity = false
+logger.camel-file-resume.appenderRef.file.ref = rolling-out
+logger.camel-file-resume.appenderRef.console.ref = console
+
+logger.camel-kafka.name = org.apache.camel.component.kafka
+logger.camel-kafka.level = WARN
+logger.camel-kafka.additivity = false
+logger.camel-kafka.appenderRef.file.ref = rolling-out
+logger.camel-kafka.appenderRef.console.ref = console
+
+logger.camel-kafka-resume.name = org.apache.camel.processor.resume.kafka
+logger.camel-kafka-resume.level = DEBUG
+logger.camel-kafka-resume.additivity = false
+logger.camel-kafka-resume.appenderRef.file.ref = rolling-out
+logger.camel-kafka-resume.appenderRef.console.ref = console
+
+logger.camel-wal.name = org.apache.camel.component.wal
+logger.camel-wal.level = TRACE
+logger.camel-wal.additivity = false
+logger.camel-wal.appenderRef.file.ref = rolling-out
+logger.camel-wal.appenderRef.console.ref = console
+
+logger.kafka.name = org.apache.kafka
+logger.kafka.level = INFO
+logger.kafka.additivity = false
+logger.kafka.appenderRef.file.ref = rolling-out
+
+rootLogger.level = DEBUG
+rootLogger.appenderRef.file.ref = rolling-out
+rootLogger.appenderRef.out.ref = console
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/scripts/noop.sh b/examples/resume-api/resume-api-fileset-wal/src/main/scripts/noop.sh
new file mode 100644
index 00000000..c8fc894e
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/scripts/noop.sh
@@ -0,0 +1,19 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+echo "Use the terminal to run the demo"
+while true ; do sleep 60s ; done
diff --git a/examples/resume-api/resume-api-fileset-wal/src/main/scripts/run.sh b/examples/resume-api/resume-api-fileset-wal/src/main/scripts/run.sh
new file mode 100644
index 00000000..7c46a0bc
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset-wal/src/main/scripts/run.sh
@@ -0,0 +1,83 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+function checkResults() {
+  expectedItems=$((ITERATIONS * BATCH_SIZE))
+  processedRecords=$(cat ${OUTPUT_DIR}/summary.txt | wc -l)
+  repeated=$(cat ${OUTPUT_DIR}/summary.txt | sort | uniq --count --repeated | wc -l)
+
+  echo "###**************************************************************************###"
+  echo "Results: repeated items: ${repeated}"
+  echo "Results: processed items: ${processedRecords} (expected at least ${expectedItems})"
+  echo "###**************************************************************************###"
+  echo "Resume simulation completed"
+  echo "###**************************************************************************###"
+
+}
+
+trap checkResults exit SIGINT SIGABRT SIGHUP
+
+echo "The test will process the following directory tree:"
+
+mkdir -p ${INPUT_DIR}
+
+ITERATIONS=${1:-3}
+BATCH_SIZE=${2:-20}
+FILE_COUNT=${3:-40}
+MAX_IDLE=10
+PROCESSING_DELAY=${4:-1000}
+
+for i in $(seq 0 ${ITERATIONS}) ; do
+  mkdir -p ${INPUT_DIR}/${i}
+
+  echo "********************************************************************************"
+  echo "Running the iteration ${i} of ${ITERATIONS} with a batch of ${BATCH_SIZE} files"
+  echo "********************************************************************************"
+
+  echo "Appending ${FILE_COUNT} files to the data directory (files from the previous execution remain there)"
+  for file in $(seq 1 ${FILE_COUNT}) ; do
+    echo ${RANDOM} > ${INPUT_DIR}/${i}/${file}
+  done
+
+  echo "Only the following files from the directory ${INPUT_DIR}/${i} should processed in this execution"
+
+  java -Dinput.dir=${INPUT_DIR} \
+    -Dwal.log.file=${DATA_DIR}/resume.data \
+    -Doutput.dir=${OUTPUT_DIR} \
+    -Dprocessing.delay=${PROCESSING_DELAY} \
+    -Dresume.type=kafka \
+    -Dresume.type.kafka.topic=dir-offsets \
+    -Dbootstrap.address=kafka:9092 \
+    -jar /deployments/example.jar \
+    -di ${MAX_IDLE}
+    echo "********************************************************************************"
+    echo "Finished the iteration ${i} (press Enter to continue)"
+    echo "********************************************************************************"
+    read -r
+
+    if [[ -f ${OUTPUT_DIR}/summary.txt ]] ; then
+      processedRecords=$(cat ${OUTPUT_DIR}/summary.txt | wc -l)
+      echo "Processed ${processedRecords} so far"
+    fi
+
+
+done
+
+
+
+exit 0