You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/16 15:28:17 UTC

[camel-examples] branch main updated: CAMEL-17762: add an example for the Resume API v2

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new 875c3dc  CAMEL-17762: add an example for the Resume API v2
875c3dc is described below

commit 875c3dc7263e78e981f15460d02a87594cc39020
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Mar 11 15:56:08 2022 +0100

    CAMEL-17762: add an example for the Resume API v2
---
 examples/README.adoc                               |   4 +-
 examples/pom.xml                                   |   3 +-
 examples/resume-api/README.md                      |   8 ++
 examples/resume-api/pom.xml                        | 129 +++++++++++++++++++++
 examples/resume-api/resume-api-common/pom.xml      |  40 +++++++
 .../clients/kafka/ConsumerPropertyFactory.java     |  35 ++++++
 .../kafka/DefaultConsumerPropertyFactory.java      |  73 ++++++++++++
 .../kafka/DefaultProducerPropertyFactory.java      |  60 ++++++++++
 .../resume/clients/kafka/FileDeserializer.java     |  35 ++++++
 .../resume/clients/kafka/FileSerializer.java       |  34 ++++++
 .../clients/kafka/ProducerPropertyFactory.java     |  35 ++++++
 .../resume-api/resume-api-file-offset/README.md    |  89 ++++++++++++++
 .../resume-api-file-offset/docker-compose.yaml     |  54 +++++++++
 examples/resume-api/resume-api-file-offset/pom.xml |  56 +++++++++
 .../src/main/docker/Dockerfile                     |  33 ++++++
 .../example/resume/file/offset/main/MainApp.java   |  76 ++++++++++++
 .../strategies/KafkaFileOffsetResumeStrategy.java  |  79 +++++++++++++
 .../offset/strategies/LargeFileRouteBuilder.java   | 101 ++++++++++++++++
 .../file/offset/strategies/SingleItemCache.java    |  62 ++++++++++
 .../src/main/resources/log4j2.properties           |  56 +++++++++
 .../resume-api-file-offset/src/main/scripts/run.sh |  48 ++++++++
 examples/resume-api/resume-api-fileset/README.md   |  27 +++++
 .../resume-api-fileset/docker-compose.yaml         |  54 +++++++++
 examples/resume-api/resume-api-fileset/pom.xml     |  56 +++++++++
 .../resume-api-fileset/src/main/docker/Dockerfile  |  30 +++++
 .../camel/example/resume/fileset/main/MainApp.java |  71 ++++++++++++
 .../strategies/KafkaFileSetResumeStrategy.java     |  80 +++++++++++++
 .../strategies/LargeDirectoryRouteBuilder.java     |  59 ++++++++++
 .../resume/fileset/strategies/ListBasedCache.java  |  29 +++++
 .../resume/fileset/strategies/MultiItemCache.java  |  81 +++++++++++++
 .../src/main/resources/log4j2.properties           |  56 +++++++++
 .../resume-api-fileset/src/main/scripts/run.sh     |  46 ++++++++
 32 files changed, 1697 insertions(+), 2 deletions(-)

diff --git a/examples/README.adoc b/examples/README.adoc
index a35c815..3b241e9 100644
--- a/examples/README.adoc
+++ b/examples/README.adoc
@@ -11,7 +11,7 @@ View the individual example READMEs for details.
 == Examples
 
 // examples: START
-Number of Examples: 85 (0 deprecated)
+Number of Examples: 86 (0 deprecated)
 
 [width="100%",cols="4,2,4",options="header"]
 |===
@@ -132,6 +132,8 @@ Number of Examples: 85 (0 deprecated)
 
 | link:loan-broker-jms/README.adoc[Loan Broker JMS] (loan-broker-jms) | EIP | An example that shows the EIP's loan broker demo using JMS
 
+| link:resume-api/README.md[Resume Api] (resume-api) | EIP | Multiple Resume API examples
+
 | link:route-throttling/README.adoc[Route Throttling] (route-throttling) | EIP | A client-server example using JMS transport where we on the server side can throttle the Camel
         route dynamically based on the flow of messages
     
diff --git a/examples/pom.xml b/examples/pom.xml
index 0e050e9..34cedf9 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -137,6 +137,7 @@
         <module>oaipmh</module>
         <module>pojo-messaging</module>
         <module>reactive-executor-vertx</module>
+        <module>resume-api</module>
         <module>route-throttling</module>
         <module>routeloader</module>
         <module>routetemplate</module>
@@ -156,7 +157,7 @@
         <module>vertx-kafka</module>
         <module>widget-gadget-java</module>
         <module>widget-gadget-xml</module>
-  </modules>
+    </modules>
 
     <properties>
         <!-- unify the encoding for all the modules -->
diff --git a/examples/resume-api/README.md b/examples/resume-api/README.md
new file mode 100644
index 0000000..a0d00b2
--- /dev/null
+++ b/examples/resume-api/README.md
@@ -0,0 +1,8 @@
+Resume API Examples
+=========================
+
+This module contains examples for the resume API. 
+
+* resume-api-common: common code used for all the resume API examples
+* resume-api-file-offset: an example that shows how to use the Resume API for processing large files 
+* resume-api-fileset: an example that shows how to use the Resume API for processing a large directory
diff --git a/examples/resume-api/pom.xml b/examples/resume-api/pom.xml
new file mode 100644
index 0000000..b0e6923
--- /dev/null
+++ b/examples/resume-api/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel.example</groupId>
+        <artifactId>examples</artifactId>
+        <version>3.16.0-SNAPSHOT</version>
+    </parent>
+
+
+    <artifactId>camel-example-resume-api-parent</artifactId>
+    <name>Camel :: Example :: Resume API :: Parent</name>
+    <description>Multiple Resume API examples</description>
+    <packaging>pom</packaging>
+
+    <properties>
+        <category>EIP</category>
+    </properties>
+
+    <modules>
+        <module>resume-api-common</module>
+        <module>resume-api-fileset</module>
+        <module>resume-api-file-offset</module>
+    </modules>
+
+    <dependencyManagement>
+        <dependencies>
+            <!-- Add Camel BOM -->
+            <dependency>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-bom</artifactId>
+                <version>${camel.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-main</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-file</artifactId>
+        </dependency>
+
+
+        <!-- caching -->
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>${caffeine-version}</version>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>runtime</scope>
+            <version>${log4j2-version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.camel</groupId>
+                    <artifactId>camel-maven-plugin</artifactId>
+                    <version>${camel.version}</version>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>prepare-fatjar</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-assembly-plugin</artifactId>
+                    <configuration>
+                        <descriptorRefs>
+                            <descriptorRef>jar-with-dependencies</descriptorRef>
+                        </descriptorRefs>
+                        <archive>
+                            <manifest>
+                                <mainClass>${resume.main.class}</mainClass>
+                            </manifest>
+                        </archive>
+                    </configuration>
+                    <executions>
+                        <execution>
+                            <id>make-assembly</id>
+                            <phase>package</phase>
+                            <goals>
+                                <goal>single</goal>
+                            </goals>
+                        </execution>
+                    </executions>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
+
+</project>
diff --git a/examples/resume-api/resume-api-common/pom.xml b/examples/resume-api/resume-api-common/pom.xml
new file mode 100644
index 0000000..5f48ac6
--- /dev/null
+++ b/examples/resume-api/resume-api-common/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>camel-example-resume-api-parent</artifactId>
+        <groupId>org.apache.camel.example</groupId>
+        <version>3.16.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>resume-api-common</artifactId>
+    <name>Camel :: Example :: Resume API :: Common</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-kafka</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ConsumerPropertyFactory.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ConsumerPropertyFactory.java
new file mode 100644
index 0000000..7998b82
--- /dev/null
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ConsumerPropertyFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.clients.kafka;
+
+import java.util.Properties;
+
+/**
+ * An interface to produce properties that can be used to configure a Kafka consumer. The
+ * CLI runtime equivalent for this file is the consumer.properties file from the Kafka
+ * provided along with the Kafka deliverable
+ *
+ */
+public interface ConsumerPropertyFactory {
+
+    /**
+     * Gets the properties used to configure the consumer
+     * @return a Properties object containing the set of properties for the consumer
+     */
+    Properties getProperties();
+}
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultConsumerPropertyFactory.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultConsumerPropertyFactory.java
new file mode 100644
index 0000000..4ccae11
--- /dev/null
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultConsumerPropertyFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.clients.kafka;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+
+/**
+ * A property producer that can be used to create a Kafka consumer with a minimum
+ * set of configurations that can consume from a Kafka topic.
+ * <p>
+ * The consumer behavior from using this set of properties causes the consumer to
+ * consumes all published messages "from-beginning".
+ */
+public class DefaultConsumerPropertyFactory implements ConsumerPropertyFactory {
+    private final String bootstrapServer;
+    private String valueDeserializer = StringDeserializer.class.getName();
+    private String keyDeserializer = StringDeserializer.class.getName();
+    private String offsetReset = "earliest";
+
+    /**
+     * Constructs the properties using the given bootstrap server
+     *
+     * @param bootstrapServer the address of the server in the format
+     *                        PLAINTEXT://${address}:${port}
+     */
+    public DefaultConsumerPropertyFactory(String bootstrapServer) {
+        this.bootstrapServer = bootstrapServer;
+    }
+
+    public void setValueDeserializer(String valueDeserializer) {
+        this.valueDeserializer = valueDeserializer;
+    }
+
+    public void setKeyDeserializer(String keyDeserializer) {
+        this.keyDeserializer = keyDeserializer;
+    }
+
+    public void setOffsetReset(String offsetReset) {
+        this.offsetReset = offsetReset;
+    }
+
+    @Override
+    public Properties getProperties() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
+        return props;
+    }
+}
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultProducerPropertyFactory.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultProducerPropertyFactory.java
new file mode 100644
index 0000000..6aa3ad3
--- /dev/null
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/DefaultProducerPropertyFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.clients.kafka;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+public class DefaultProducerPropertyFactory implements ProducerPropertyFactory {
+    private final String bootstrapServer;
+    private String valueSerializer = StringDeserializer.class.getName();
+    private String keySerializer = StringDeserializer.class.getName();
+
+    /**
+     * Constructs the properties using the given bootstrap server
+     * @param bootstrapServer the address of the server in the format
+     *                       PLAINTEXT://${address}:${port}
+     */
+    public DefaultProducerPropertyFactory(String bootstrapServer) {
+        this.bootstrapServer = bootstrapServer;
+    }
+
+    public void setValueSerializer(String valueSerializer) {
+        this.valueSerializer = valueSerializer;
+    }
+
+    public void setKeySerializer(String keySerializer) {
+        this.keySerializer = keySerializer;
+    }
+
+    @Override
+    public Properties getProperties() {
+        Properties props = new Properties();
+
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+
+        return props;
+    }
+}
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileDeserializer.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileDeserializer.java
new file mode 100644
index 0000000..f7caa69
--- /dev/null
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileDeserializer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.clients.kafka;
+
+import java.io.File;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileDeserializer implements Deserializer<File> {
+    private static final Logger LOG = LoggerFactory.getLogger(FileDeserializer.class);
+
+    @Override
+    public File deserialize(String s, byte[] bytes) {
+        String name = new String(bytes);
+        LOG.trace("Deserializing {} from topic {}", name, s);
+        return new File(name);
+    }
+}
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileSerializer.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileSerializer.java
new file mode 100644
index 0000000..a991ff1
--- /dev/null
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/FileSerializer.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.clients.kafka;
+
+import java.io.File;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileSerializer implements Serializer<File> {
+    private static final Logger LOG = LoggerFactory.getLogger(FileSerializer.class);
+
+    @Override
+    public byte[] serialize(String s, File file) {
+        LOG.trace("Serializing file: {}", file.getPath());
+        return file.getPath().getBytes();
+    }
+}
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ProducerPropertyFactory.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ProducerPropertyFactory.java
new file mode 100644
index 0000000..45ffda1
--- /dev/null
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/clients/kafka/ProducerPropertyFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.clients.kafka;
+
+import java.util.Properties;
+
+/**
+ * An interface to produce properties that can be used to configure a Kafka consumer. The
+ * CLI runtime equivalent for this file is the consumer.properties file from the Kafka
+ * provided along with the Kafka deliverable
+ *
+ */
+public interface ProducerPropertyFactory {
+
+    /**
+     * Gets the properties used to configure the consumer
+     * @return a Properties object containing the set of properties for the consumer
+     */
+    Properties getProperties();
+}
diff --git a/examples/resume-api/resume-api-file-offset/README.md b/examples/resume-api/resume-api-file-offset/README.md
new file mode 100644
index 0000000..1c996e0
--- /dev/null
+++ b/examples/resume-api/resume-api-file-offset/README.md
@@ -0,0 +1,89 @@
+Resume API Example: File Offsets
+=========================
+
+This example shows how to use the Resume API for processing a large file. It uses the file component to read a large text file. The processing reads 30 lines of the file and then stop. Then, when it restarts, it continues from the last line it processed in the previous execution. The offsets are stored in a Kafka topic.
+
+
+*Note*: this demo runs in a container. Although it is possible to run it outside a container, doing so requires additional infrastructure. Therefore, it's not covered by the documentation.
+
+*Note 2*: the code is deliberately slowed down for a better display of the execution.
+
+Building the demo
+===
+
+To build the demo and the containers:
+
+```shell
+mvn clean package && docker-compose build && 
+```
+
+
+
+Run
+===
+
+To run the demo:
+
+```shell
+docker-compose up -d && docker-compose logs --no-log-prefix -f example ; docker-compose down
+```
+
+Advanced / Manual
+===
+
+Prepare the data sets
+====
+
+To create a large file for processing, run:
+
+Large file
+======
+```shell
+cat /dev/null > data.txt ; for i in $(seq -f '%010g' 1 10000000) ; do echo $i >> data.txt ; done
+```
+
+Verify command
+```
+cat summary.txt | sort | uniq --count --repeated | wc -l
+```
+
+
+Large directories
+==== 
+
+To create a large directory tree for processing, run:
+
+**Very Small (500 files)**
+
+```shell
+mkdir very-small && cd very-small
+for dir in $(seq 1 5) ; do mkdir $dir && (cd $dir && (for file in $(seq 1 100) ; do echo $RANDOM > $file ; done) ; cd ..) ; done
+```
+
+**Small (25000 files)**
+
+```shell
+mkdir small && cd small
+for dir in $(seq 1 5) ; do mkdir $dir && (cd $dir && (for file in $(seq 1 5000) ; do echo $RANDOM > $file ; done) ; cd ..) ; done
+```
+
+**Medium (50000 files)**
+
+```shell
+mkdir medium && cd medium
+for dir in $(seq 1 5) ; do mkdir $dir && (cd $dir && (for file in $(seq 1 10000) ; do echo $RANDOM > $file ; done) ; cd ..) ; done
+```
+
+**Large (100000 files)**
+
+```shell
+mkdir large && cd large
+for dir in $(seq 1 10) ; do mkdir $dir && (cd $dir && (for file in $(seq 1 10000) ; do echo $RANDOM > $file ; done) ; cd ..) ; done
+```
+
+**Very Large (2000000 files)**
+
+```shell
+mkdir very-large && cd very-large
+for dir in $(seq 1 2) ; do mkdir $dir && (cd $dir && (for file in $(seq 1 1000000) ; do echo $RANDOM > $file ; done) ; cd ..) ; done
+```
diff --git a/examples/resume-api/resume-api-file-offset/docker-compose.yaml b/examples/resume-api/resume-api-file-offset/docker-compose.yaml
new file mode 100644
index 0000000..e1ee1bd
--- /dev/null
+++ b/examples/resume-api/resume-api-file-offset/docker-compose.yaml
@@ -0,0 +1,54 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+version: '3.4'
+services:
+  zookeeper:
+    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+    logging:
+      driver: "none"
+    command: [
+      "sh", "-c",
+      "bin/zookeeper-server-start.sh config/zookeeper.properties"
+    ]
+    ports:
+      - "2181:2181"
+    environment:
+      LOG_DIR: /tmp/logs
+  kafka:
+    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+    logging:
+      driver: "none"
+    command: [
+      "sh", "-c",
+      "sleep 10s && bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
+    ]
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      LOG_DIR: "/tmp/logs"
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+  example:
+    build:
+      dockerfile: src/main/docker/Dockerfile
+      context: .
+    depends_on:
+      - kafka
diff --git a/examples/resume-api/resume-api-file-offset/pom.xml b/examples/resume-api/resume-api-file-offset/pom.xml
new file mode 100644
index 0000000..b75a2eb
--- /dev/null
+++ b/examples/resume-api/resume-api-file-offset/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>camel-example-resume-api-parent</artifactId>
+        <groupId>org.apache.camel.example</groupId>
+        <version>3.16.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>resume-api-file-offset</artifactId>
+    <name>Camel :: Example :: Resume API :: File Offset</name>
+
+    <properties>
+        <resume.main.class>org.apache.camel.example.resume.file.offset.main.MainApp</resume.main.class>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.example</groupId>
+            <artifactId>resume-api-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/examples/resume-api/resume-api-file-offset/src/main/docker/Dockerfile b/examples/resume-api/resume-api-file-offset/src/main/docker/Dockerfile
new file mode 100644
index 0000000..5006771
--- /dev/null
+++ b/examples/resume-api/resume-api-file-offset/src/main/docker/Dockerfile
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+FROM fedora:35 as resume-api-file-offset
+LABEL maintainer="orpiske@apache.org"
+
+
+ENV DATA_DIR /data/source
+ENV DATA_FILE data.txt
+ENV DEPLOYMENT_DIR /deployments
+
+RUN dnf install -y java-11-openjdk-headless tree && dnf clean all
+ENV JAVA_HOME /etc/alternatives/jre
+
+COPY target/resume-api-*with-dependencies.jar ${DEPLOYMENT_DIR}/example.jar
+COPY src/main/scripts/run.sh ${DEPLOYMENT_DIR}/run.sh
+
+RUN mkdir -p ${DATA_DIR} && \
+    cd ${DATA_DIR} && \
+    chmod +x ${DEPLOYMENT_DIR}/*.sh
+WORKDIR ${DEPLOYMENT_DIR}
+CMD [ "sh", "-c", "${DEPLOYMENT_DIR}/run.sh" ]
diff --git a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
new file mode 100644
index 0000000..60145ba
--- /dev/null
+++ b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.file.offset.main;
+
+import java.io.File;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.example.resume.clients.kafka.DefaultConsumerPropertyFactory;
+import org.apache.camel.example.resume.clients.kafka.DefaultProducerPropertyFactory;
+import org.apache.camel.example.resume.clients.kafka.FileDeserializer;
+import org.apache.camel.example.resume.clients.kafka.FileSerializer;
+import org.apache.camel.example.resume.file.offset.strategies.KafkaFileOffsetResumeStrategy;
+import org.apache.camel.example.resume.file.offset.strategies.LargeFileRouteBuilder;
+import org.apache.camel.example.resume.file.offset.strategies.SingleItemCache;
+import org.apache.camel.main.Main;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+
+/**
+ * A Camel Application
+ */
+public class MainApp {
+
+    /**
+     * A main() so we can easily run these routing rules in our IDE
+     */
+    public static void main(String... args) throws Exception {
+        Main main = new Main();
+
+        KafkaFileOffsetResumeStrategy<File> resumeStrategy = getUpdatableConsumerResumeStrategy();
+
+        RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy);
+        main.configure().addRoutesBuilder(routeBuilder);
+        main.run(args);
+    }
+
+    private static KafkaFileOffsetResumeStrategy<File> getUpdatableConsumerResumeStrategy() {
+        String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092");
+        String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets");
+
+        final DefaultConsumerPropertyFactory consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootStrapAddress);
+
+        consumerPropertyFactory.setKeyDeserializer(FileDeserializer.class.getName());
+        consumerPropertyFactory.setValueDeserializer(LongDeserializer.class.getName());
+
+        consumerPropertyFactory.setOffsetReset("earliest");
+
+        final DefaultProducerPropertyFactory producerPropertyFactory = new DefaultProducerPropertyFactory(bootStrapAddress);
+
+        producerPropertyFactory.setKeySerializer(FileSerializer.class.getName());
+        producerPropertyFactory.setValueSerializer(LongSerializer.class.getName());
+
+        SingleItemCache<String> cache = new SingleItemCache<>();
+
+        return new KafkaFileOffsetResumeStrategy(kafkaTopic, cache, producerPropertyFactory, consumerPropertyFactory);
+    }
+
+
+
+}
+
diff --git a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/strategies/KafkaFileOffsetResumeStrategy.java b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/strategies/KafkaFileOffsetResumeStrategy.java
new file mode 100644
index 0000000..b63b248
--- /dev/null
+++ b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/strategies/KafkaFileOffsetResumeStrategy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.file.offset.strategies;
+
+import java.io.File;
+import java.util.Optional;
+
+import org.apache.camel.ResumeCache;
+import org.apache.camel.component.file.consumer.GenericFileResumable;
+import org.apache.camel.component.file.consumer.GenericFileResumeStrategy;
+import org.apache.camel.example.resume.clients.kafka.DefaultConsumerPropertyFactory;
+import org.apache.camel.example.resume.clients.kafka.DefaultProducerPropertyFactory;
+import org.apache.camel.processor.resume.kafka.AbstractKafkaResumeStrategy;
+
+public class KafkaFileOffsetResumeStrategy<K> extends AbstractKafkaResumeStrategy<K, Long> implements GenericFileResumeStrategy<File> {
+    public static final int CACHE_SIZE = 100;
+
+    private final String topic;
+    private final ResumeCache<K, Long> cache;
+
+    // NOTE: To have data type flexibility, we need to allow callers to set up the consumer and producer property
+    // factories. See MainApp.
+    public KafkaFileOffsetResumeStrategy(String topic,
+                                         ResumeCache<K, Long> cache,
+                                         DefaultProducerPropertyFactory producerPropertyFactory,
+                                         DefaultConsumerPropertyFactory consumerPropertyFactory)
+    {
+        super(topic, cache, producerPropertyFactory.getProperties(), consumerPropertyFactory.getProperties());
+        this.topic = topic;
+        this.cache = cache;
+    }
+
+
+    private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) {
+        final File addressable = resumable.getAddressable();
+        return getLastOffset((K) addressable);
+    }
+
+    public Optional<Long> getLastOffset(K addressable) {
+        return cache.get(addressable);
+    }
+
+    @Override
+    public void subscribe() {
+        checkAndSubscribe(topic, 1);
+    }
+
+    @Override
+    public void resume(GenericFileResumable<File> resumable) {
+        final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
+
+        if (!lastOffsetOpt.isPresent()) {
+            return;
+        }
+
+        final long lastOffset = lastOffsetOpt.get();
+        resumable.updateLastOffset(lastOffset);
+    }
+
+    @Override
+    public void resume() {
+        throw new UnsupportedOperationException("Cannot perform blind resume");
+    }
+}
diff --git a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/strategies/LargeFileRouteBuilder.java b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/strategies/LargeFileRouteBuilder.java
new file mode 100644
index 0000000..3d8d026
--- /dev/null
+++ b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/strategies/LargeFileRouteBuilder.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.file.offset.strategies;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.Reader;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Resumable;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.resume.Resumables;
+import org.apache.camel.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Camel Java DSL Router
+ */
+public class LargeFileRouteBuilder extends RouteBuilder {
+    private static final Logger LOG = LoggerFactory.getLogger(LargeFileRouteBuilder.class);
+
+    private KafkaFileOffsetResumeStrategy<File> testResumeStrategy;
+    private long lastOffset;
+    private int batchSize;
+
+    public LargeFileRouteBuilder(KafkaFileOffsetResumeStrategy resumeStrategy) {
+        this.testResumeStrategy = resumeStrategy;
+        String tmp = System.getProperty("resume.batch.size", "30");
+        this.batchSize = Integer.valueOf(tmp);
+    }
+
+    private void process(Exchange exchange) throws Exception {
+        Reader reader = exchange.getIn().getBody(Reader.class);
+        BufferedReader br = IOHelper.buffered(reader);
+
+        File path = exchange.getMessage().getHeader("CamelFilePath", File.class);
+        LOG.debug("Path: {} ", path);
+
+        lastOffset = testResumeStrategy.getLastOffset(path).orElse(0L);
+        LOG.debug("Starting to read at offset {}", lastOffset);
+
+        String line = br.readLine();
+        int count = 0;
+        while (count < batchSize) {
+            if (line == null || line.isEmpty()) {
+                LOG.debug("End of file");
+                // EOF, therefore reset the offset
+                final Resumable<File, Long> resumable = Resumables.of(path, 0L);
+                exchange.getMessage().setHeader(Exchange.OFFSET, resumable);
+
+                break;
+            }
+
+            LOG.debug("Read line at offset {} from the file: {}", lastOffset, line);
+            testResumeStrategy.updateLastOffset(Resumables.of(path, lastOffset));
+
+            // It sums w/ 1 in order to account for the newline that is removed by readLine
+            lastOffset += line.length() + 1;
+            // Simulate slow processing
+            Thread.sleep(50);
+            count++;
+
+            line = br.readLine();
+        }
+
+        if (count == batchSize) {
+            getCamelContext().stop();
+            System.exit(0);
+        }
+    }
+
+    /**
+     * Let's configure the Camel routing rules using Java code...
+     */
+    public void configure() {
+        getCamelContext().getRegistry().bind("testResumeStrategy", testResumeStrategy);
+
+        from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
+                .resumable("testResumeStrategy")
+                .convertBodyTo(Reader.class)
+                .process(this::process);
+
+    }
+
+}
diff --git a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/strategies/SingleItemCache.java b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/strategies/SingleItemCache.java
new file mode 100644
index 0000000..fdabae5
--- /dev/null
+++ b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/strategies/SingleItemCache.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.file.offset.strategies;
+
+import java.util.Optional;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.camel.ResumeCache;
+
+/**
+ * This is a simple cache implementation that uses Caffeine to store
+ * the 100 offsets.
+ *
+ * @param <K> The type of the data to cache
+ */
+public class SingleItemCache<K> implements ResumeCache<K, Long> {
+    public static final int CACHE_SIZE = 100;
+    private final Cache<K, Long> cache = Caffeine.newBuilder()
+            .maximumSize(CACHE_SIZE)
+            .build();
+
+    @Override
+    public void add(K key, Long offsetValue) {
+        cache.put(key, offsetValue);
+    }
+
+    @Override
+    public Optional<Long> get(K key) {
+        Long entry = cache.getIfPresent(key);
+
+        if (entry == null) {
+            return Optional.empty();
+        }
+
+        return Optional.of(entry.longValue());
+    }
+
+    @Override
+    public boolean isFull() {
+        if (cache.estimatedSize() >= CACHE_SIZE) {
+            return true;
+        }
+
+        return false;
+    }
+}
diff --git a/examples/resume-api/resume-api-file-offset/src/main/resources/log4j2.properties b/examples/resume-api/resume-api-file-offset/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..fb11b71
--- /dev/null
+++ b/examples/resume-api/resume-api-file-offset/src/main/resources/log4j2.properties
@@ -0,0 +1,56 @@
+
+# Single file
+#appender.out.type = File
+#appender.out.name = file
+#appender.out.fileName = logs/test.log
+#appender.out.layout.type = PatternLayout
+#appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+
+appender.rolling-out.type = RollingFile
+appender.rolling-out.name = rolling-out
+appender.rolling-out.fileName = logs/managed-resume-restart.log
+appender.rolling-out.filePattern = logs/managed-resume-restart-%d{yyyyMMdd-HHmmss}.log
+appender.rolling-out.layout.type = PatternLayout
+# This logs the thread name and so on, but it's usually not helpful
+#appender.rolling-out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+appender.rolling-out.layout.pattern = %d{DEFAULT} [%-5p] %m%n
+appender.rolling-out.policies.type = Policies
+appender.rolling-out.policies.size.type = OnStartupTriggeringPolicy
+
+# For console
+appender.console.type = Console
+appender.console.name = console
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d{DEFAULT}}{dim} [%highlight{%-5p}] %m%n
+
+logger.camel.name = org.apache.camel
+logger.camel.level = WARN
+logger.camel.additivity = false
+logger.camel.appenderRef.file.ref = rolling-out
+
+logger.camel-resume.name = org.apache.camel.processor.resume
+logger.camel-resume.level = DEBUG
+logger.camel-resume.additivity = false
+logger.camel-resume.appenderRef.file.ref = rolling-out
+logger.camel-resume.appenderRef.console.ref = console
+
+logger.tester.name = org.apache.camel.example.resume
+logger.tester.level = DEBUG
+logger.tester.additivity = false
+logger.tester.appenderRef.file.ref = rolling-out
+logger.tester.appenderRef.console.ref = console
+
+logger.camel-file-resume.name = org.apache.camel.component.file.consumer
+logger.camel-file-resume.level = DEBUG
+logger.camel-file-resume.additivity = false
+logger.camel-file-resume.appenderRef.file.ref = rolling-out
+logger.camel-file-resume.appenderRef.console.ref = console
+
+logger.kafka.name = org.apache.kafka
+logger.kafka.level = INFO
+logger.kafka.additivity = false
+logger.kafka.appenderRef.file.ref = rolling-out
+
+rootLogger.level = DEBUG
+rootLogger.appenderRef.file.ref = rolling-out
+rootLogger.appenderRef.out.ref = console
diff --git a/examples/resume-api/resume-api-file-offset/src/main/scripts/run.sh b/examples/resume-api/resume-api-file-offset/src/main/scripts/run.sh
new file mode 100644
index 0000000..4fe58a4
--- /dev/null
+++ b/examples/resume-api/resume-api-file-offset/src/main/scripts/run.sh
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+ITERATIONS=${1:-5}
+BATCH_SIZE=${2:-50}
+
+echo "Creating data file"
+for i in $(seq -f '%010g' 1 100000) ; do
+  echo $i >> ${DATA_DIR}/data.txt ;
+done
+echo "Done"
+sleep 10s
+
+for i in $(seq 0 ${ITERATIONS}) ; do
+  echo "********************************************************************************"
+  echo "Running the iteration ${i} of ${ITERATIONS} with a batch of ${BATCH_SIZE} files"
+  echo "********************************************************************************"
+  java -Dinput.dir=${DATA_DIR} \
+    -Doutput.dir=/tmp/out \
+    -Dinput.file=${DATA_FILE} \
+    -Dresume.type=kafka \
+    -Dresume.type.kafka.topic=file-offsets \
+    -Dbootstrap.address=kafka:9092 \
+    -Dresume.batch.size=${BATCH_SIZE} \
+    -jar /deployments/example.jar
+    echo "********************************************************************************"
+    echo "Finished the iteration ${i}"
+    echo "********************************************************************************"
+    sleep 2s
+done
+
+echo "###**************************************************************************###"
+echo "Resume simulation completed"
+echo "###**************************************************************************###"
+exit 0
diff --git a/examples/resume-api/resume-api-fileset/README.md b/examples/resume-api/resume-api-fileset/README.md
new file mode 100644
index 0000000..b949a85
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/README.md
@@ -0,0 +1,27 @@
+Resume API Example: File Set
+=========================
+
+This example shows how to use the Resume API for processing a large directory. It uses the file component to read a large directory and subdirectories, consisting of about 500 files. The processing reads a batch of 50 files and then stop. Then, when it restarts, it continues from the last file it processed in the previous execution. The offsets are stored in a Kafka topic. 
+
+
+*Note*: this demo runs in a container. Although it is possible to run it outside a container, doing so requires additional infrastructure. Therefore, it's not covered by the documentation.
+
+*Note 2*: the code is deliberately slowed down for a better display of the execution.
+
+Building the demo
+===
+
+To build the demo and the containers:
+
+```shell
+mvn clean package && docker-compose build && 
+```
+
+Run
+===
+
+To run the demo:
+
+```shell
+docker-compose up -d && docker-compose logs --no-log-prefix -f example ; docker-compose down
+```
diff --git a/examples/resume-api/resume-api-fileset/docker-compose.yaml b/examples/resume-api/resume-api-fileset/docker-compose.yaml
new file mode 100644
index 0000000..e1ee1bd
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/docker-compose.yaml
@@ -0,0 +1,54 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+version: '3.4'
+services:
+  zookeeper:
+    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+    logging:
+      driver: "none"
+    command: [
+      "sh", "-c",
+      "bin/zookeeper-server-start.sh config/zookeeper.properties"
+    ]
+    ports:
+      - "2181:2181"
+    environment:
+      LOG_DIR: /tmp/logs
+  kafka:
+    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+    logging:
+      driver: "none"
+    command: [
+      "sh", "-c",
+      "sleep 10s && bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
+    ]
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      LOG_DIR: "/tmp/logs"
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+  example:
+    build:
+      dockerfile: src/main/docker/Dockerfile
+      context: .
+    depends_on:
+      - kafka
diff --git a/examples/resume-api/resume-api-fileset/pom.xml b/examples/resume-api/resume-api-fileset/pom.xml
new file mode 100644
index 0000000..9aeb512
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>camel-example-resume-api-parent</artifactId>
+        <groupId>org.apache.camel.example</groupId>
+        <version>3.16.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>resume-api-fileset</artifactId>
+    <name>Camel :: Example :: Resume API :: File Set</name>
+
+    <properties>
+        <resume.main.class>org.apache.camel.example.resume.fileset.main.MainApp</resume.main.class>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.example</groupId>
+            <artifactId>resume-api-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile b/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
new file mode 100644
index 0000000..72c319c
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+FROM fedora:35 as resume-api-fileset
+LABEL maintainer="orpiske@apache.org"
+
+RUN dnf install -y java-11-openjdk-headless tree pv && dnf clean all
+COPY target/resume-api-*with-dependencies.jar /deployments/example.jar
+COPY src/main/scripts/run.sh /deployments/run.sh
+
+ENV JAVA_HOME /etc/alternatives/jre
+ENV DATA_DIR /data/source
+
+RUN mkdir -p ${DATA_DIR} && \
+    cd ${DATA_DIR} && \
+    for dir in $(seq 1 5) ; do mkdir $dir && (cd $dir && (for file in $(seq 1 100) ; do echo $RANDOM > $file ; done) ; cd ..) ; done && \
+    chmod +x /deployments/*.sh
+WORKDIR /deployments/
+CMD [ "sh", "-c", "/deployments/run.sh" ]
diff --git a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
new file mode 100644
index 0000000..076d92a
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.fileset.main;
+
+import java.io.File;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.example.resume.clients.kafka.DefaultConsumerPropertyFactory;
+import org.apache.camel.example.resume.clients.kafka.DefaultProducerPropertyFactory;
+import org.apache.camel.example.resume.clients.kafka.FileDeserializer;
+import org.apache.camel.example.resume.clients.kafka.FileSerializer;
+import org.apache.camel.example.resume.fileset.strategies.KafkaFileSetResumeStrategy;
+import org.apache.camel.example.resume.fileset.strategies.LargeDirectoryRouteBuilder;
+import org.apache.camel.example.resume.fileset.strategies.MultiItemCache;
+import org.apache.camel.main.Main;
+
+/**
+ * A Camel Application
+ */
+public class MainApp {
+
+    /**
+     * A main() so we can easily run these routing rules in our IDE
+     */
+    public static void main(String... args) throws Exception {
+        Main main = new Main();
+
+        KafkaFileSetResumeStrategy resumeStrategy = getUpdatableConsumerResumeStrategyForSet();
+        RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(resumeStrategy);
+
+        main.configure().addRoutesBuilder(routeBuilder);
+        main.run(args);
+    }
+
+    private static KafkaFileSetResumeStrategy getUpdatableConsumerResumeStrategyForSet() {
+        String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092");
+        String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets");
+
+        final DefaultConsumerPropertyFactory consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootStrapAddress);
+
+        consumerPropertyFactory.setKeyDeserializer(FileDeserializer.class.getName());
+        consumerPropertyFactory.setValueDeserializer(FileDeserializer.class.getName());
+        consumerPropertyFactory.setOffsetReset("earliest");
+
+        final DefaultProducerPropertyFactory producerPropertyFactory = new DefaultProducerPropertyFactory(bootStrapAddress);
+
+        producerPropertyFactory.setKeySerializer(FileSerializer.class.getName());
+        producerPropertyFactory.setValueSerializer(FileSerializer.class.getName());
+
+        MultiItemCache<File, File> cache = new MultiItemCache<>();
+
+        return new KafkaFileSetResumeStrategy(kafkaTopic, cache, producerPropertyFactory, consumerPropertyFactory);
+    }
+
+}
+
diff --git a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/KafkaFileSetResumeStrategy.java b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/KafkaFileSetResumeStrategy.java
new file mode 100644
index 0000000..f6a4def
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/KafkaFileSetResumeStrategy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.fileset.strategies;
+
+import java.io.File;
+
+import org.apache.camel.component.file.consumer.FileResumeSet;
+import org.apache.camel.component.file.consumer.FileSetResumeStrategy;
+import org.apache.camel.example.resume.clients.kafka.DefaultConsumerPropertyFactory;
+import org.apache.camel.example.resume.clients.kafka.DefaultProducerPropertyFactory;
+import org.apache.camel.processor.resume.kafka.AbstractKafkaResumeStrategy;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaFileSetResumeStrategy<K, V> extends AbstractKafkaResumeStrategy<K, V> implements FileSetResumeStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaFileSetResumeStrategy.class);
+
+    private final MultiItemCache<K, V> cache;
+    private final String topic;
+
+    public KafkaFileSetResumeStrategy(String topic, MultiItemCache<K, V> cache,
+                                      DefaultProducerPropertyFactory producerPropertyFactory,
+                                      DefaultConsumerPropertyFactory consumerPropertyFactory)
+    {
+        super(topic, cache, producerPropertyFactory.getProperties(), consumerPropertyFactory.getProperties());
+        this.topic = topic;
+        this.cache = cache;
+    }
+
+    @Override
+    public ConsumerRecords<K, V> consume() {
+        return consume(10);
+    }
+
+    private boolean notProcessed(File file) {
+        File key = file.getParentFile();
+
+        // if the file is in the cache, then it's already processed
+        boolean ret = !cache.contains((K) key, (V) file);
+        return ret;
+    }
+
+    @Override
+    public void subscribe() {
+        checkAndSubscribe(topic);
+    }
+
+    @Override
+    public void resume(FileResumeSet resumable) {
+        if (resumable != null) {
+            resumable.resumeEach(this::notProcessed);
+            if (resumable.hasResumables()) {
+                LOG.debug("There's {} files to still to be processed", resumable.resumed().length);
+            }
+        } else {
+            LOG.trace("Nothing to resume");
+        }
+    }
+
+    @Override
+    public void resume() {
+        throw new UnsupportedOperationException("Cannot perform blind resume");
+    }
+}
diff --git a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/LargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/LargeDirectoryRouteBuilder.java
new file mode 100644
index 0000000..d2ea53a
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/LargeDirectoryRouteBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.fileset.strategies;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Resumable;
+import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.resume.Resumables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LargeDirectoryRouteBuilder extends RouteBuilder {
+    private static final Logger LOG = LoggerFactory.getLogger(LargeDirectoryRouteBuilder.class);
+    private UpdatableConsumerResumeStrategy<File, File, Resumable<File, File>> testResumeStrategy;
+
+    public LargeDirectoryRouteBuilder(UpdatableConsumerResumeStrategy resumeStrategy) {
+        this.testResumeStrategy = resumeStrategy;
+    }
+
+    private void process(Exchange exchange) throws Exception {
+        File path = exchange.getMessage().getHeader("CamelFilePath", File.class);
+        LOG.debug("Processing {}", path.getPath());
+        exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(path.getParentFile(), path));
+
+        // Put a dealy to simulate slow processing
+        Thread.sleep(50);
+    }
+
+    /**
+     * Let's configure the Camel routing rules using Java code...
+     */
+    public void configure() {
+        getCamelContext().getRegistry().bind("testResumeStrategy", testResumeStrategy);
+
+        from("file:{{input.dir}}?noop=true&recursive=true")
+                .resumable("testResumeStrategy")
+                .process(this::process)
+                .to("file:{{output.dir}}");
+    }
+
+}
diff --git a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/ListBasedCache.java b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/ListBasedCache.java
new file mode 100644
index 0000000..64fdc2d
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/ListBasedCache.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.fileset.strategies;
+
+import org.apache.camel.ResumeCache;
+
+/**
+ * A cache for list based items
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public interface ListBasedCache<K,V> extends ResumeCache<K, V> {
+    boolean contains(K key, V entry);
+}
diff --git a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/MultiItemCache.java b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/MultiItemCache.java
new file mode 100644
index 0000000..4966255
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/strategies/MultiItemCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.fileset.strategies;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A cache that can store multiple key-based resumables
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class MultiItemCache<K, V> implements ListBasedCache<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(MultiItemCache.class);
+    public static final int CACHE_SIZE = 10000;
+
+    private final Cache<K, List<V>> cache = Caffeine.newBuilder()
+            .maximumSize(CACHE_SIZE)
+            .build();
+
+    @Override
+    public synchronized void add(K key, V offsetValue) {
+        LOG.trace("Adding entry to the cache (k/v): {}/{}", key, offsetValue);
+        LOG.trace("Adding entry to the cache (k/v) with types: {}/{}", key.getClass(), offsetValue.getClass());
+        List<V> entries = cache.get(key, k -> new ArrayList<>());
+
+        entries.add(offsetValue);
+    }
+
+    @Override
+    public Optional<V> get(K key) {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+
+    @Override
+    public synchronized boolean contains(K key, V entry) {
+        final List<V> entries = cache.getIfPresent(key);
+
+        if (entries == null) {
+            return false;
+        }
+
+
+        boolean ret = entries.contains(entry);
+        LOG.trace("Checking if cache contains key {} with value {} ({})", key, entry, ret);
+
+        return ret;
+    }
+
+
+    @Override
+    public boolean isFull() {
+        if (cache.estimatedSize() >= CACHE_SIZE) {
+            return true;
+        }
+
+        return false;
+    }
+}
diff --git a/examples/resume-api/resume-api-fileset/src/main/resources/log4j2.properties b/examples/resume-api/resume-api-fileset/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..fb11b71
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/src/main/resources/log4j2.properties
@@ -0,0 +1,56 @@
+
+# Single file
+#appender.out.type = File
+#appender.out.name = file
+#appender.out.fileName = logs/test.log
+#appender.out.layout.type = PatternLayout
+#appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+
+appender.rolling-out.type = RollingFile
+appender.rolling-out.name = rolling-out
+appender.rolling-out.fileName = logs/managed-resume-restart.log
+appender.rolling-out.filePattern = logs/managed-resume-restart-%d{yyyyMMdd-HHmmss}.log
+appender.rolling-out.layout.type = PatternLayout
+# This logs the thread name and so on, but it's usually not helpful
+#appender.rolling-out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+appender.rolling-out.layout.pattern = %d{DEFAULT} [%-5p] %m%n
+appender.rolling-out.policies.type = Policies
+appender.rolling-out.policies.size.type = OnStartupTriggeringPolicy
+
+# For console
+appender.console.type = Console
+appender.console.name = console
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d{DEFAULT}}{dim} [%highlight{%-5p}] %m%n
+
+logger.camel.name = org.apache.camel
+logger.camel.level = WARN
+logger.camel.additivity = false
+logger.camel.appenderRef.file.ref = rolling-out
+
+logger.camel-resume.name = org.apache.camel.processor.resume
+logger.camel-resume.level = DEBUG
+logger.camel-resume.additivity = false
+logger.camel-resume.appenderRef.file.ref = rolling-out
+logger.camel-resume.appenderRef.console.ref = console
+
+logger.tester.name = org.apache.camel.example.resume
+logger.tester.level = DEBUG
+logger.tester.additivity = false
+logger.tester.appenderRef.file.ref = rolling-out
+logger.tester.appenderRef.console.ref = console
+
+logger.camel-file-resume.name = org.apache.camel.component.file.consumer
+logger.camel-file-resume.level = DEBUG
+logger.camel-file-resume.additivity = false
+logger.camel-file-resume.appenderRef.file.ref = rolling-out
+logger.camel-file-resume.appenderRef.console.ref = console
+
+logger.kafka.name = org.apache.kafka
+logger.kafka.level = INFO
+logger.kafka.additivity = false
+logger.kafka.appenderRef.file.ref = rolling-out
+
+rootLogger.level = DEBUG
+rootLogger.appenderRef.file.ref = rolling-out
+rootLogger.appenderRef.out.ref = console
diff --git a/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh b/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
new file mode 100644
index 0000000..f060b48
--- /dev/null
+++ b/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+echo "The test will process the following directory tree:"
+
+sleep 2s
+tree ${DATA_DIR} | pv -q -L 512
+sleep 8s
+
+ITERATIONS=${1:-5}
+BATCH_SIZE=${2:-50}
+
+for i in $(seq 0 ${ITERATIONS}) ; do
+  echo "********************************************************************************"
+  echo "Running the iteration ${i} of ${ITERATIONS} with a batch of ${BATCH_SIZE} files"
+  echo "********************************************************************************"
+  java -Dinput.dir=${DATA_DIR} \
+    -Doutput.dir=/tmp/out \
+    -Dresume.type=kafka \
+    -Dresume.type.kafka.topic=dir-offsets \
+    -Dbootstrap.address=kafka:9092 \
+    -jar /deployments/example.jar \
+    -dm ${BATCH_SIZE}
+    echo "********************************************************************************"
+    echo "Finished the iteration ${i}"
+    echo "********************************************************************************"
+    sleep 2s
+done
+
+echo "###**************************************************************************###"
+echo "Resume simulation completed"
+echo "###**************************************************************************###"
+exit 0