You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/13 09:04:45 UTC

[camel-examples] branch main updated: CAMEL-18127: added adapter auto-configuration for Cassandra

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new f606dadc CAMEL-18127: added adapter auto-configuration for Cassandra
f606dadc is described below

commit f606dadc1e9cfbbf8325b81479a3574e97f1f745
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Jun 6 18:42:38 2022 +0200

    CAMEL-18127: added adapter auto-configuration for Cassandra
---
 examples/resume-api/pom.xml                        |   1 +
 .../resume-api/resume-api-cassandraql/README.md    |  64 +++++++++++
 .../resume-api-cassandraql/docker-compose.yaml     |  61 ++++++++++
 examples/resume-api/resume-api-cassandraql/pom.xml |  67 +++++++++++
 .../src/main/docker/Dockerfile                     |  33 ++++++
 .../resume/cassandra/main/CassandraClient.java     |  76 +++++++++++++
 .../resume/cassandra/main/CassandraRoute.java      |  94 ++++++++++++++++
 .../example/resume/cassandra/main/ExampleDao.java  | 123 +++++++++++++++++++++
 .../resume/cassandra/main/ExampleEntry.java        |  77 +++++++++++++
 .../main/ExampleResultSetConversionStrategy.java   |  56 ++++++++++
 .../example/resume/cassandra/main/MainApp.java     | 108 ++++++++++++++++++
 .../src/main/resources/log4j2.properties           |  57 ++++++++++
 .../resume-api-cassandraql/src/main/scripts/run.sh |  51 +++++++++
 13 files changed, 868 insertions(+)

diff --git a/examples/resume-api/pom.xml b/examples/resume-api/pom.xml
index fd3062db..b36e9823 100644
--- a/examples/resume-api/pom.xml
+++ b/examples/resume-api/pom.xml
@@ -43,6 +43,7 @@
         <module>resume-api-fileset</module>
         <module>resume-api-fileset-clusterized</module>
         <module>resume-api-aws2-kinesis</module>
+        <module>resume-api-cassandraql</module>
     </modules>
 
     <dependencyManagement>
diff --git a/examples/resume-api/resume-api-cassandraql/README.md b/examples/resume-api/resume-api-cassandraql/README.md
new file mode 100644
index 00000000..36176186
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/README.md
@@ -0,0 +1,64 @@
+Resume API Example: Cassandra
+=========================
+
+This example shows how to use the Resume API for consuming data from Apache Cassandra.
+
+First, the demo loads 500 records into the database. Then, it consumes them in batches of 50 messages. It starts each batch at the point of last consumption. The offsets are stored in a Kafka topic.
+
+*Note*: this demo runs in a container. Although it is possible to run it outside a container, doing so requires additional infrastructure. Therefore, it's not extensively covered by the documentation.
+
+Building the demo
+===
+
+To build the demo and the containers:
+
+```shell
+mvn clean package && docker-compose build
+```
+
+Run
+===
+
+To run the demo:
+
+```shell
+docker-compose up -d && docker-compose logs --no-log-prefix -f example ; docker-compose down
+```
+
+Advanced / Manual
+===
+
+Launch Cassandra
+====
+
+```
+docker run -p 9042:9042 -it cassandra:4
+```
+
+Data Load
+======
+
+To load the data run: 
+
+```shell
+  java -Dcassandra.host=cassandra-host.com \
+       -Dcassandra.cql3.port=9042 \
+       -Dresume.action=load \
+       -jar /deployments/example.jar
+```
+
+Note: make sure you have copied the jar file generated during the build to `/deployments/example.jar`.
+
+Run the Example
+======
+
+```shell
+java -Dresume.type=kafka \
+           -Dresume.type.kafka.topic=cassandra-offsets \
+           -Dcassandra.host=cassandra-host.com \
+           -Dcassandra.cql3.port=9042 \
+           -Dbootstrap.address=kafka-host:9092 \
+           -Dbatch.size=50 \
+           -jar /deployments/example.jar
+```
diff --git a/examples/resume-api/resume-api-cassandraql/docker-compose.yaml b/examples/resume-api/resume-api-cassandraql/docker-compose.yaml
new file mode 100644
index 00000000..2de66f5c
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/docker-compose.yaml
@@ -0,0 +1,61 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+version: '3.4'
+services:
+  # Cassandra 4 node, published on port 9042
+  cassandra:
+    image: cassandra:4
+    logging:
+      driver: 'none'
+    ports:
+      - '9042:9042'
+  # ZooKeeper, launched from the Strimzi Kafka image
+  zookeeper:
+    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+    logging:
+      driver: 'none'
+    command:
+      - 'sh'
+      - '-c'
+      - 'bin/zookeeper-server-start.sh config/zookeeper.properties'
+    ports:
+      - '2181:2181'
+    environment:
+      LOG_DIR: /tmp/logs
+  # Kafka broker, launched from the same image
+  kafka:
+    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+    logging:
+      driver: 'none'
+    # The folded scalar below is a single command line. "$$" is the Compose
+    # escape for a literal "$", so the variables are expanded by the shell
+    # inside the container from the environment section of this service.
+    # NOTE(review): the initial sleep presumably gives ZooKeeper time to start,
+    # since depends_on only orders container start-up - confirm.
+    command:
+      - 'sh'
+      - '-c'
+      - >-
+        sleep 10s && bin/kafka-server-start.sh config/server.properties
+        --override listeners=$${KAFKA_LISTENERS}
+        --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS}
+        --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}
+    depends_on:
+      - zookeeper
+    ports:
+      - '9092:9092'
+    environment:
+      LOG_DIR: /tmp/logs
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+  # The example application, built from the Dockerfile of this module
+  example:
+    build:
+      context: .
+      dockerfile: src/main/docker/Dockerfile
+    depends_on:
+      - kafka
+      - cassandra
diff --git a/examples/resume-api/resume-api-cassandraql/pom.xml b/examples/resume-api/resume-api-cassandraql/pom.xml
new file mode 100644
index 00000000..a3612a42
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>camel-example-resume-api-parent</artifactId>
+        <groupId>org.apache.camel.example</groupId>
+        <version>3.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>resume-api-cassandraql</artifactId>
+    <name>Camel :: Example :: Resume API :: Cassandra</name>
+
+    <properties>
+        <resume.main.class>org.apache.camel.example.resume.cassandra.main.MainApp</resume.main.class>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-caffeine</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cassandraql</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.example</groupId>
+            <artifactId>resume-api-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/examples/resume-api/resume-api-cassandraql/src/main/docker/Dockerfile b/examples/resume-api/resume-api-cassandraql/src/main/docker/Dockerfile
new file mode 100644
index 00000000..50067719
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/src/main/docker/Dockerfile
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+# Image that runs the example jar through the run.sh launcher script.
+# NOTE(review): the stage name mentions "file-offset" although this module is the
+# Cassandra example - presumably copied from another example; confirm before renaming.
+FROM fedora:35 as resume-api-file-offset
+LABEL maintainer="orpiske@apache.org"
+
+
+# NOTE(review): DATA_DIR is only created below and DATA_FILE is not referenced in this
+# file; presumably they are read by run.sh or are leftovers - confirm.
+ENV DATA_DIR /data/source
+ENV DATA_FILE data.txt
+# Directory that receives the application jar and the launcher script
+ENV DEPLOYMENT_DIR /deployments
+
+# Installs the headless Java 11 runtime and the tree utility, then drops the dnf cache in the same layer
+RUN dnf install -y java-11-openjdk-headless tree && dnf clean all
+ENV JAVA_HOME /etc/alternatives/jre
+
+# The wildcard is expected to match the jar with dependencies produced by the Maven build
+COPY target/resume-api-*with-dependencies.jar ${DEPLOYMENT_DIR}/example.jar
+COPY src/main/scripts/run.sh ${DEPLOYMENT_DIR}/run.sh
+
+# Creates the data directory and makes the launcher script executable
+RUN mkdir -p ${DATA_DIR} && \
+    cd ${DATA_DIR} && \
+    chmod +x ${DEPLOYMENT_DIR}/*.sh
+WORKDIR ${DEPLOYMENT_DIR}
+# Runs through "sh -c" so that ${DEPLOYMENT_DIR} is expanded when the container starts
+CMD [ "sh", "-c", "${DEPLOYMENT_DIR}/run.sh" ]
diff --git a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraClient.java b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraClient.java
new file mode 100644
index 00000000..783bb922
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.cassandra.main;
+
+import java.net.InetSocketAddress;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple client for Cassandra for testing purposes
+ */
+class CassandraClient implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class);
+
+    /** Number of connection attempts made before giving up. */
+    private static final int MAX_ATTEMPTS = 12;
+    /** Pause between two connection attempts, in milliseconds. */
+    private static final long RETRY_DELAY_MILLIS = 10000;
+
+    private CqlSession session;
+
+    /**
+     * Opens a session to the given node, retrying while the node is not reachable.
+     *
+     * @param  host                  the Cassandra host name or address
+     * @param  port                  the CQL port of the node
+     * @throws IllegalStateException if the thread is interrupted while waiting for the next attempt
+     */
+    public CassandraClient(String host, int port) {
+        tryConnect(host, port);
+    }
+
+    /**
+     * Tries to open the session up to {@link #MAX_ATTEMPTS} times, sleeping {@link #RETRY_DELAY_MILLIS} between
+     * attempts. The failure of the last attempt is rethrown to the caller.
+     */
+    private void tryConnect(String host, int port) {
+        for (int attempt = 1;; attempt++) {
+            try {
+                LOG.info("Trying to connect to: {}:{}", host, port);
+                // Built on every attempt so that a host name that could not be resolved yet is looked up again
+                InetSocketAddress socketAddress = new InetSocketAddress(host, port);
+
+                session = CqlSession.builder()
+                        .addContactPoint(socketAddress)
+                        .withLocalDatacenter("datacenter1")
+                        .build();
+                return;
+            } catch (Exception e) {
+                LOG.error("Failed to connect: {}", e.getMessage());
+
+                if (attempt == MAX_ATTEMPTS) {
+                    throw e;
+                }
+            }
+
+            try {
+                Thread.sleep(RETRY_DELAY_MILLIS);
+            } catch (InterruptedException ex) {
+                // Keep the interrupt visible and fail instead of handing out a client that has no session
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("Interrupted while waiting to connect to Cassandra", ex);
+            }
+        }
+    }
+
+
+    /**
+     * Creates a DAO that runs its statements on the session of this client.
+     */
+    public ExampleDao newExampleDao() {
+        return new ExampleDao(this.session);
+    }
+
+    /**
+     * Closes the session, if one was opened.
+     */
+    @Override
+    public void close() {
+        if (session != null) {
+            session.close();
+        }
+    }
+}
diff --git a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java
new file mode 100644
index 00000000..5eadeecb
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/CassandraRoute.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.cassandra.main;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.cassandra.CassandraConstants;
+import org.apache.camel.resume.ResumeAction;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.support.resume.Resumables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CassandraRoute extends RouteBuilder {
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraRoute.class);
+    private final CountDownLatch latch;
+    private final int batchSize;
+    private final ResumeStrategy resumeStrategy;
+    private final ResumeCache<?> resumeCache;
+    private final CassandraClient client;
+
+    public CassandraRoute(CountDownLatch latch, int batchSize, ResumeStrategy resumeStrategy, ResumeCache<?> resumeCache, CassandraClient client) {
+        this.latch = latch;
+        this.batchSize = batchSize;
+        this.resumeStrategy = resumeStrategy;
+        this.resumeCache = resumeCache;
+        this.client = client;
+    }
+
+    private class CustomResumeAction implements ResumeAction {
+        private final ExampleDao dao;
+
+        public CustomResumeAction(ExampleDao dao) {
+            this.dao = dao;
+            dao.useKeySpace();
+        }
+
+        @Override
+        public boolean evalEntry(Object key, Object value) {
+            LOG.trace("Evaluating entry {} with value {}", key, value);
+            dao.delete(UUID.fromString((String) value));
+
+            return false;
+        }
+    }
+
+    private void addResumeInfo(Exchange exchange) {
+        ExampleEntry bodyEntry = exchange.getMessage().getBody(ExampleEntry.class);
+
+        LOG.info("Received record number: {}", bodyEntry.getNumber());
+        exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(bodyEntry.getId().toString(), bodyEntry.getId().toString()));
+        if (latch.getCount() == 1) {
+            exchange.setRouteStop(true);
+        }
+
+        latch.countDown();
+    }
+
+
+    @Override
+    public void configure() {
+        bindToRegistry("testResumeStrategy", resumeStrategy);
+        bindToRegistry("resumeCache", resumeCache);
+        bindToRegistry(CassandraConstants.CASSANDRA_RESUME_ACTION, new CustomResumeAction(client.newExampleDao()));
+
+        fromF("cql:{{cassandra.host}}:{{cassandra.cql3.port}}/camel_ks?cql=%s&resultSetConversionStrategy=#class:%s",
+                ExampleDao.getSelectStatement(batchSize), ExampleResultSetConversionStrategy.class.getName())
+                .split(body()) // We receive a list of records so, for each
+                .resumable()
+                    .resumeStrategy("testResumeStrategy")
+                    .intermittent(true) // Set to ignore empty data sets that will generate exchanges w/ no offset information
+                .process(this::addResumeInfo);
+
+    }
+}
diff --git a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/ExampleDao.java b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/ExampleDao.java
new file mode 100644
index 00000000..f19de0c6
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/ExampleDao.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.cassandra.main;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.SimpleStatement;
+import com.datastax.oss.driver.api.core.type.DataTypes;
+import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ExampleDao {
+    public static final String KEY_SPACE = "camel_ks";
+    public static final String TABLE_NAME = "example";
+
+    private static final Logger LOG = LoggerFactory.getLogger(ExampleDao.class);
+
+    private final CqlSession session;
+
+    public ExampleDao(CqlSession session) {
+        this.session = session;
+    }
+
+    public void createKeySpace() {
+        Map<String, Object> replication = new HashMap<>();
+
+        replication.put("class", "SimpleStrategy");
+        replication.put("replication_factor", 3);
+
+        String statement = SchemaBuilder.createKeyspace(KEY_SPACE)
+                .ifNotExists()
+                .withReplicationOptions(replication)
+                .asCql();
+
+        LOG.info("Executing {}", statement);
+
+        ResultSet rs = session.execute(statement);
+
+        if (!rs.wasApplied()) {
+            LOG.warn("The create key space statement did not execute");
+        }
+    }
+
+    public void useKeySpace() {
+        // Use String.format because "Bind variables cannot be used for keyspace names"
+        String statement = String.format("USE %s", KEY_SPACE);
+
+        session.execute(statement);
+    }
+
+    public void createTable() {
+        SimpleStatement statement = SchemaBuilder.createTable(TABLE_NAME)
+                .ifNotExists()
+                .withPartitionKey("id", DataTypes.TIMEUUID)
+                .withClusteringColumn("number", DataTypes.BIGINT)
+                .withColumn("text", DataTypes.TEXT)
+                .builder()
+                .setTimeout(Duration.ofSeconds(10)).build();
+
+        LOG.info("Executing create table {}", statement);
+
+        ResultSet rs = session.execute(statement);
+        if (!rs.wasApplied()) {
+            LOG.warn("The create table statement did not execute");
+        }
+    }
+
+    public static String getInsertStatement() {
+        return "insert into example(id, number, text) values (now(), ?, ?)";
+    }
+
+    public static String getSelectStatement(int limitSize) {
+        return String.format("select id, dateOf(id) as insertion_date,number,text from example limit %d", limitSize);
+    }
+
+    public void delete(UUID id) {
+        session.execute("delete from camel_ks.example where id=?", id);
+    }
+
+    public void getData(Consumer<String> consumer) {
+        ResultSet rs = session.execute("select * from example");
+
+        if (rs != null) {
+            Iterator<Row> iterator = rs.iterator();
+            while (iterator.hasNext()) {
+                Row row = iterator.next();
+                String data = row.getString("text");
+                LOG.trace("Retrieved data: {}", data);
+                consumer.accept(data);
+            }
+        } else {
+            LOG.warn("No records were returned");
+        }
+    }
+
+    public void insert(long number, String text) {
+        session.execute(getInsertStatement(), number, text);
+    }
+}
diff --git a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/ExampleEntry.java b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/ExampleEntry.java
new file mode 100644
index 00000000..88b95ba5
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/ExampleEntry.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.cassandra.main;
+
+
+import java.time.Instant;
+import java.util.UUID;
+
+/**
+ * A record of the example table: its id, the insertion date, a sequence number and a text.
+ */
+public class ExampleEntry {
+    private UUID id;
+    private Instant insertionDate;
+    private long number;
+    private String text;
+
+    /**
+     * Creates an empty entry whose fields are meant to be filled through the setters.
+     */
+    public ExampleEntry() {
+    }
+
+    /**
+     * Creates a fully populated entry.
+     */
+    public ExampleEntry(UUID id, Instant insertionDate, long number, String text) {
+        this.id = id;
+        this.insertionDate = insertionDate;
+        this.number = number;
+        this.text = text;
+    }
+
+    public Instant getInsertionDate() {
+        return insertionDate;
+    }
+
+    public void setInsertionDate(Instant insertionDate) {
+        this.insertionDate = insertionDate;
+    }
+
+    public long getNumber() {
+        return number;
+    }
+
+    public void setNumber(long number) {
+        this.number = number;
+    }
+
+    public String getText() {
+        return text;
+    }
+
+    public void setText(String text) {
+        this.text = text;
+    }
+
+    public UUID getId() {
+        return id;
+    }
+
+    /**
+     * Sets the id, so that an entry created with the no-argument constructor can be completed like the other fields.
+     */
+    public void setId(UUID id) {
+        this.id = id;
+    }
+
+    @Override
+    public String toString() {
+        return "ExampleEntry{" +
+                "id=" + id +
+                ", insertionDate=" + insertionDate +
+                ", number=" + number +
+                ", text='" + text + '\'' +
+                '}';
+    }
+}
diff --git a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/ExampleResultSetConversionStrategy.java b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/ExampleResultSetConversionStrategy.java
new file mode 100644
index 00000000..e0c9a99b
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/ExampleResultSetConversionStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.cassandra.main;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import org.apache.camel.component.cassandra.ResultSetConversionStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ExampleResultSetConversionStrategy implements ResultSetConversionStrategy {
+    private static final Logger LOG = LoggerFactory.getLogger(ExampleResultSetConversionStrategy.class);
+
+    /**
+     * Converts every row of the result set into an {@link ExampleEntry}.
+     *
+     * @param  resultSet rows that expose the id, insertion_date, number and text columns
+     * @return           a list with one entry per row, in iteration order; empty when there are no rows
+     */
+    @Override
+    public Object getBody(ResultSet resultSet) {
+        List<ExampleEntry> entries = new ArrayList<>();
+
+        for (Row row : resultSet) {
+            UUID id = row.getUuid("id");
+            Instant insertionDate = row.getInstant("insertion_date");
+            long number = row.getLong("number");
+            String text = row.getString("text");
+
+            entries.add(new ExampleEntry(id, insertionDate, number, text));
+            LOG.trace("Retrieved number {}, insertion date: {}. text: {}", number, insertionDate, text);
+        }
+
+        return entries;
+    }
+}
diff --git a/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java
new file mode 100644
index 00000000..8eef1c9f
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/src/main/java/org/apache/camel/example/resume/cassandra/main/MainApp.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.cassandra.main;
+
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.component.caffeine.resume.CaffeineCache;
+import org.apache.camel.main.Main;
+import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+public class MainApp {
+    /**
+     * Entry point of the example. With -Dresume.action=load it loads the test data and exits. Otherwise it starts
+     * the route that consumes from Cassandra and exits once a batch (-Dbatch.size, default 50) was processed.
+     */
+    public static void main(String[] args) {
+        String host = System.getProperty("cassandra.host", "localhost");
+        int port = Integer.parseInt(System.getProperty("cassandra.cql3.port", "9042"));
+
+        // Runs the load action
+        String action = System.getProperty("resume.action");
+        if ("load".equalsIgnoreCase(action)) {
+            try {
+                loadData(host, port);
+                System.exit(0);
+            } catch (Exception e) {
+                System.err.println("Unable to load data: " + e.getMessage());
+                e.printStackTrace();
+                System.exit(1);
+            }
+        }
+
+        // Normal code path for consuming from Cassandra
+        SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = getUpdatableConsumerResumeStrategyForSet();
+
+        Main main = new Main();
+
+        int batchSize = Integer.parseInt(System.getProperty("batch.size", "50"));
+        CountDownLatch latch = new CountDownLatch(batchSize);
+        Executors.newSingleThreadExecutor().submit(() -> waitForStop(main, latch));
+
+
+        try (CassandraClient client = new CassandraClient(host, port)) {
+            main.configure().addRoutesBuilder(new CassandraRoute(latch, batchSize, resumeStrategy, new CaffeineCache<>(10240), client));
+            main.start();
+        } catch (Exception e) {
+            // The waiter thread submitted above is not a daemon thread: without an explicit exit, a failure to
+            // connect or to start would leave the JVM waiting on the latch forever
+            System.err.println("Unable to start the example: " + e.getMessage());
+            e.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Creates the key space and the table if needed and inserts the test records. The number of records is read
+     * from the data.size system property (default 500); each record gets its index and a random UUID as text.
+     *
+     * @param host the Cassandra host name or address
+     * @param port the CQL port of the node
+     */
+    public static void loadData(String host, int port) {
+        try (CassandraClient cassandraClient = new CassandraClient(host, port)) {
+            ExampleDao exampleDao = cassandraClient.newExampleDao();
+
+            exampleDao.createKeySpace();
+            exampleDao.useKeySpace();
+            exampleDao.createTable();
+
+            int dataSize = Integer.parseInt(System.getProperty("data.size", "500"));
+
+            for (long i = 0; i < dataSize; i++) {
+                exampleDao.insert(i, UUID.randomUUID().toString());
+            }
+        }
+    }
+
+    /**
+     * Builds the Kafka resume strategy from the bootstrap.address (default localhost:9092) and
+     * resume.type.kafka.topic (default offsets) system properties. The consumer offset reset is set to earliest.
+     */
+    private static SingleNodeKafkaResumeStrategy<Resumable> getUpdatableConsumerResumeStrategyForSet() {
+        String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092");
+        String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets");
+
+        final Properties consumerProperties = SingleNodeKafkaResumeStrategy.createConsumer(bootStrapAddress);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        final Properties producerProperties = SingleNodeKafkaResumeStrategy.createProducer(bootStrapAddress);
+        return new SingleNodeKafkaResumeStrategy<>(kafkaTopic, producerProperties, consumerProperties);
+    }
+
+    /**
+     * Waits until the latch reaches zero, stops Camel, waits for it to report that it stopped and exits with
+     * status 0. Exits with status 1 if the wait is interrupted.
+     */
+    private static void waitForStop(Main main, CountDownLatch latch) {
+        try {
+            latch.await();
+
+            main.stop();
+            while (!main.isStopped()) {
+                Thread.sleep(1000);
+            }
+
+            System.exit(0);
+        } catch (InterruptedException e) {
+            System.exit(1);
+        }
+    }
+}
diff --git a/examples/resume-api/resume-api-cassandraql/src/main/resources/log4j2.properties b/examples/resume-api/resume-api-cassandraql/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..01c6addc
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/src/main/resources/log4j2.properties
@@ -0,0 +1,57 @@
+
+# Single (non-rolling) file appender; disabled in favour of the rolling appender below
+#appender.out.type = File
+#appender.out.name = file
+#appender.out.fileName = logs/test.log
+#appender.out.layout.type = PatternLayout
+#appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+
+appender.rolling-out.type = RollingFile
+appender.rolling-out.name = rolling-out
+appender.rolling-out.fileName = logs/managed-resume-restart.log
+appender.rolling-out.filePattern = logs/managed-resume-restart-%d{yyyyMMdd-HHmmss}.log
+appender.rolling-out.layout.type = PatternLayout
+# Alternative pattern: prints the thread and logger names instead of the timestamp; usually not helpful
+#appender.rolling-out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+appender.rolling-out.layout.pattern = %d{DEFAULT} [%-5p] %m%n
+appender.rolling-out.policies.type = Policies
+appender.rolling-out.policies.size.type = OnStartupTriggeringPolicy
+
+# For console
+appender.console.type = Console
+appender.console.name = console
+appender.console.layout.type = PatternLayout
+#appender.console.layout.pattern = [%t] %c --- %style{%d{DEFAULT}}{dim} [%highlight{%-5p}] %m%n
+appender.console.layout.pattern = %style{%d{DEFAULT}}{dim} [%highlight{%-5p}] %m%n
+
+logger.camel.name = org.apache.camel
+logger.camel.level = WARN
+logger.camel.additivity = false
+logger.camel.appenderRef.file.ref = rolling-out
+
+logger.camel-resume.name = org.apache.camel.processor.resume
+logger.camel-resume.level = INFO
+logger.camel-resume.additivity = false
+logger.camel-resume.appenderRef.file.ref = rolling-out
+logger.camel-resume.appenderRef.console.ref = console
+
+logger.tester.name = org.apache.camel.example.resume
+logger.tester.level = INFO
+logger.tester.additivity = false
+logger.tester.appenderRef.file.ref = rolling-out
+logger.tester.appenderRef.console.ref = console
+
+logger.camel-cassandra.name = org.apache.camel.component.cassandra
+logger.camel-cassandra.level = INFO
+logger.camel-cassandra.additivity = false
+logger.camel-cassandra.appenderRef.file.ref = rolling-out
+logger.camel-cassandra.appenderRef.console.ref = console
+
+logger.kafka.name = org.apache.kafka
+logger.kafka.level = INFO
+logger.kafka.additivity = false
+logger.kafka.appenderRef.file.ref = rolling-out
+
+rootLogger.level = INFO
+rootLogger.appenderRef.file.ref = rolling-out
+rootLogger.appenderRef.out.ref = console
diff --git a/examples/resume-api/resume-api-cassandraql/src/main/scripts/run.sh b/examples/resume-api/resume-api-cassandraql/src/main/scripts/run.sh
new file mode 100644
index 00000000..c02ed011
--- /dev/null
+++ b/examples/resume-api/resume-api-cassandraql/src/main/scripts/run.sh
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+ITERATIONS=${1:-5}
+BATCH_SIZE=${2:-50}
+DATA_SIZE=${2:-500}
+
+sleep 30s
+echo "Loading data into Kinesis"
+java -Dcassandra.host=cassandra \
+    -Dcassandra.cql3.port=9042 \
+    -Dresume.action=load \
+    -Ddata.size=${DATA_SIZE} \
+    -jar /deployments/example.jar
+echo "Done loading"
+
+for i in $(seq 0 ${ITERATIONS}) ; do
+  echo "********************************************************************************"
+  echo "Running the iteration ${i} of ${ITERATIONS} with a batch of ${BATCH_SIZE} offsets"
+  echo "********************************************************************************"
+  java -Dresume.type=kafka \
+           -Dresume.type.kafka.topic=cassandra-offsets \
+           -Dbootstrap.address=kafka:9092 \
+           -Dbatch.size=${BATCH_SIZE} \
+           -Dcassandra.host=cassandra \
+           -Dcassandra.cql3.port=9042 \
+           -Dbatch.size=${BATCH_SIZE} \
+           -jar /deployments/example.jar
+    echo "********************************************************************************"
+    echo "Finished the iteration ${i}"
+    echo "********************************************************************************"
+    sleep 2s
+done
+
+echo "###**************************************************************************###"
+echo "Resume simulation completed"
+echo "###**************************************************************************###"
+exit 0