You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/03 11:06:03 UTC

[camel-examples] branch main updated: CAMEL-18127: added Resume API v2 example for AWS Kinesis

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new 61ec6fd4 CAMEL-18127: added Resume API v2 example for AWS Kinesis
61ec6fd4 is described below

commit 61ec6fd49373e39b86cbe51b4d99f349e9ae9bc8
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Jun 2 17:00:51 2022 +0200

    CAMEL-18127: added Resume API v2 example for AWS Kinesis
---
 examples/resume-api/pom.xml                        |  1 +
 .../resume-api/resume-api-aws2-kinesis/README.md   | 73 ++++++++++++++++
 .../resume-api-aws2-kinesis/docker-compose.yaml    | 63 ++++++++++++++
 .../resume-api/resume-api-aws2-kinesis/pom.xml     | 80 +++++++++++++++++
 .../src/main/docker/Dockerfile                     | 33 ++++++++
 .../resume/aws/kinesis/main/KinesisRoute.java      | 70 +++++++++++++++
 .../example/resume/aws/kinesis/main/MainApp.java   | 99 ++++++++++++++++++++++
 .../src/main/resources/log4j2.properties           | 69 +++++++++++++++
 .../src/main/scripts/run.sh                        | 59 +++++++++++++
 9 files changed, 547 insertions(+)

diff --git a/examples/resume-api/pom.xml b/examples/resume-api/pom.xml
index 1c5b9428..fd3062db 100644
--- a/examples/resume-api/pom.xml
+++ b/examples/resume-api/pom.xml
@@ -42,6 +42,7 @@
         <module>resume-api-file-offset</module>
         <module>resume-api-fileset</module>
         <module>resume-api-fileset-clusterized</module>
+        <module>resume-api-aws2-kinesis</module>
     </modules>
 
     <dependencyManagement>
diff --git a/examples/resume-api/resume-api-aws2-kinesis/README.md b/examples/resume-api/resume-api-aws2-kinesis/README.md
new file mode 100644
index 00000000..a4e125e9
--- /dev/null
+++ b/examples/resume-api/resume-api-aws2-kinesis/README.md
@@ -0,0 +1,73 @@
+Resume API Example: AWS Kinesis
+=========================
+
+This example shows how to use the Resume API for consuming AWS Kinesis data streams. 
+
+It uses LocalStack to simulate the AWS Kinesis instance. 
+
+First the demo load 500 records into the data stream. Then, it consumes then in batches of 50 messages. It starts each batch at the point of last consumption. The offsets are stored in a Kafka topic.
+
+*Note*: this demo runs in a container. Although it is possible to run it outside a container, doing so requires additional infrastructure. Therefore, it's not extensively covered by the documentation.
+
+Building the demo
+===
+
+To build the demo and the containers:
+
+```shell
+mvn clean package && docker-compose build
+```
+
+Run
+===
+
+To run the demo:
+
+```shell
+docker-compose up -d && docker-compose logs --no-log-prefix -f example ; docker-compose down
+```
+
+Advanced / Manual
+===
+
+Launch LocalStack
+====
+
+```
+docker run --rm -e SERVICES=kinesis -e DEBUG=1 -e LS_LOG=trace -it -p 4566:4566 localstack/localstack:latest
+```
+
+Data Load
+======
+
+To load the data run: 
+
+```shell
+  java -Dresume.type=kafka \
+    -Dresume.type.kafka.topic=aws-kinesis-offsets \
+    -Dresume.action=load \
+    -Daws.kinesis.streamName=sample-stream \
+    -Daws-service.kinesis.instance.type=remote \
+    -Daws.host=kinesis:4566 \
+    -Daws.access.key=accesskey \
+    -Daws.secret.key=secretkey \
+    -Daws.cborEnabled=false \
+    -jar /deployments/example.jar
+```
+
+Run the Example
+======
+
+```shell
+java -Dresume.type=kafka \
+           -Dresume.type.kafka.topic=aws-kinesis-offsets \
+           -Dbootstrap.address=REPLACE-WITH-KAFKA-HOST:9092 \
+           -Daws.kinesis.streamName=sample-stream \
+           -Daws-service.kinesis.instance.type=remote \
+           -Daws.host=REPLACE-WITH-KINESIS-HOST:4566 \
+           -Daws.access.key=accesskey \
+           -Daws.secret.key=secretkey \
+           -Daws.cborEnabled=false \
+           -Dbatch.size=50 \
+           -jar /deployments/example.jar
+```
diff --git a/examples/resume-api/resume-api-aws2-kinesis/docker-compose.yaml b/examples/resume-api/resume-api-aws2-kinesis/docker-compose.yaml
new file mode 100644
index 00000000..524d8a16
--- /dev/null
+++ b/examples/resume-api/resume-api-aws2-kinesis/docker-compose.yaml
@@ -0,0 +1,63 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+version: '3.4'
+services:
+  kinesis:
+    image: localstack/localstack:latest
+    logging:
+      driver: "none"
+    ports:
+      - "4566:4566"
+#      - "4571:4571"
+    environment:
+      SERVICES: kinesis
+  zookeeper:
+    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+    logging:
+      driver: "none"
+    command: [
+      "sh", "-c",
+      "bin/zookeeper-server-start.sh config/zookeeper.properties"
+    ]
+    ports:
+      - "2181:2181"
+    environment:
+      LOG_DIR: /tmp/logs
+  kafka:
+    image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
+    logging:
+      driver: "none"
+    command: [
+      "sh", "-c",
+      "sleep 10s && bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
+    ]
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      LOG_DIR: "/tmp/logs"
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+  example:
+    build:
+      dockerfile: src/main/docker/Dockerfile
+      context: .
+    depends_on:
+      - kafka
diff --git a/examples/resume-api/resume-api-aws2-kinesis/pom.xml b/examples/resume-api/resume-api-aws2-kinesis/pom.xml
new file mode 100644
index 00000000..9c30824b
--- /dev/null
+++ b/examples/resume-api/resume-api-aws2-kinesis/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>camel-example-resume-api-parent</artifactId>
+        <groupId>org.apache.camel.example</groupId>
+        <version>3.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>resume-api-aws2-kinesis</artifactId>
+    <name>Camel :: Example :: Resume API :: AWS Kinesis</name>
+
+    <properties>
+        <resume.main.class>org.apache.camel.example.resume.aws.kinesis.main.MainApp</resume.main.class>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-log</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-caffeine</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-aws2-kinesis</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel.example</groupId>
+            <artifactId>resume-api-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- test infra: we need it for loading the data and creating the (custom) client -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test-infra-aws-v2</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/examples/resume-api/resume-api-aws2-kinesis/src/main/docker/Dockerfile b/examples/resume-api/resume-api-aws2-kinesis/src/main/docker/Dockerfile
new file mode 100644
index 00000000..50067719
--- /dev/null
+++ b/examples/resume-api/resume-api-aws2-kinesis/src/main/docker/Dockerfile
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+FROM fedora:35 as resume-api-file-offset
+LABEL maintainer="orpiske@apache.org"
+
+
+ENV DATA_DIR /data/source
+ENV DATA_FILE data.txt
+ENV DEPLOYMENT_DIR /deployments
+
+RUN dnf install -y java-11-openjdk-headless tree && dnf clean all
+ENV JAVA_HOME /etc/alternatives/jre
+
+COPY target/resume-api-*with-dependencies.jar ${DEPLOYMENT_DIR}/example.jar
+COPY src/main/scripts/run.sh ${DEPLOYMENT_DIR}/run.sh
+
+RUN mkdir -p ${DATA_DIR} && \
+    cd ${DATA_DIR} && \
+    chmod +x ${DEPLOYMENT_DIR}/*.sh
+WORKDIR ${DEPLOYMENT_DIR}
+CMD [ "sh", "-c", "${DEPLOYMENT_DIR}/run.sh" ]
diff --git a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
new file mode 100644
index 00000000..7968f2c2
--- /dev/null
+++ b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/KinesisRoute.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.aws.kinesis.main;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.support.resume.Resumables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+
+public class KinesisRoute extends RouteBuilder {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisRoute.class);
+
+    private final String streamName;
+    private final ResumeStrategy resumeStrategy;
+    private final ResumeCache<String> resumeCache;
+    private final KinesisClient client;
+    private final CountDownLatch latch;
+
+    public KinesisRoute(String streamName, ResumeStrategy resumeStrategy, ResumeCache<String> resumeCache, KinesisClient client, CountDownLatch latch) {
+        this.streamName = streamName;
+        this.resumeStrategy = resumeStrategy;
+        this.resumeCache = resumeCache;
+        this.client = client;
+        this.latch = latch;
+    }
+
+    private void addResumeOffset(Exchange exchange) {
+        String sequenceNumber = exchange.getMessage().getHeader(Kinesis2Constants.SEQUENCE_NUMBER, String.class);
+        exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(streamName, sequenceNumber));
+
+        String body = exchange.getMessage().getBody(String.class);
+        LOG.warn("Processing: {} with sequence number {}", body, sequenceNumber);
+        latch.countDown();
+    }
+
+    @Override
+    public void configure() {
+        bindToRegistry("testResumeStrategy", resumeStrategy);
+        bindToRegistry("resumeCache", resumeCache);
+        bindToRegistry("amazonKinesisClient", client);
+
+        String kinesisEndpointUri = "aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient";
+
+        fromF(kinesisEndpointUri, streamName)
+                .process(this::addResumeOffset)
+                .resumable("testResumeStrategy");
+    }
+}
diff --git a/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java
new file mode 100644
index 00000000..77bd69b5
--- /dev/null
+++ b/examples/resume-api/resume-api-aws2-kinesis/src/main/java/org/apache/camel/example/resume/aws/kinesis/main/MainApp.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.example.resume.aws.kinesis.main;
+
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.caffeine.resume.CaffeineCache;
+import org.apache.camel.main.Main;
+import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.clients.KinesisUtils;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+
+/**
+ * A Camel Application
+ */
+public class MainApp {
+
+    /**
+     * A main() so we can easily run these routing rules in our IDE
+     */
+    public static void main(String... args) throws Exception {
+        Main main = new Main();
+
+        String streamName = System.getProperty("aws.kinesis.streamName", "aws-kinesis-test");
+
+        String action = System.getProperty("resume.action");
+        KinesisClient client = AWSSDKClientUtils.newKinesisClient();
+        if ("load".equalsIgnoreCase(action)) {
+            // do load
+            loadData(client, streamName, 500);
+            return;
+        }
+
+        SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = getUpdatableConsumerResumeStrategyForSet();
+        Integer batchSize = Integer.parseInt(System.getProperty("batch.size", "50"));
+        CountDownLatch latch = new CountDownLatch(batchSize);
+
+        Executors.newSingleThreadExecutor().submit(() -> waitForStop(main, latch));
+
+        RouteBuilder routeBuilder = new KinesisRoute(streamName, resumeStrategy, new CaffeineCache<>(100), client, latch);
+
+        main.configure().addRoutesBuilder(routeBuilder);
+        main.start();
+    }
+
+    private static SingleNodeKafkaResumeStrategy<Resumable> getUpdatableConsumerResumeStrategyForSet() {
+        String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092");
+        String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets");
+
+        final Properties consumerProperties = SingleNodeKafkaResumeStrategy.createConsumer(bootStrapAddress);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        final Properties producerProperties = SingleNodeKafkaResumeStrategy.createProducer(bootStrapAddress);
+        return new SingleNodeKafkaResumeStrategy<>(kafkaTopic, producerProperties, consumerProperties);
+    }
+
+    private static void loadData(KinesisClient client, String streamName, int recordCount) {
+        KinesisUtils.createStream(client, streamName);
+        KinesisUtils.putRecords(client, streamName, recordCount);
+    }
+
+    private static void waitForStop(Main main, CountDownLatch latch) {
+        try {
+            main.stop();
+
+            while (!main.isStopped()) {
+                Thread.sleep(1000);
+            }
+
+            latch.await();
+            System.exit(0);
+        } catch (InterruptedException e) {
+            System.exit(1);
+        }
+    }
+
+}
+
diff --git a/examples/resume-api/resume-api-aws2-kinesis/src/main/resources/log4j2.properties b/examples/resume-api/resume-api-aws2-kinesis/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..153b7831
--- /dev/null
+++ b/examples/resume-api/resume-api-aws2-kinesis/src/main/resources/log4j2.properties
@@ -0,0 +1,69 @@
+
+# Single file
+#appender.out.type = File
+#appender.out.name = file
+#appender.out.fileName = logs/test.log
+#appender.out.layout.type = PatternLayout
+#appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+
+appender.rolling-out.type = RollingFile
+appender.rolling-out.name = rolling-out
+appender.rolling-out.fileName = logs/managed-resume-restart.log
+appender.rolling-out.filePattern = logs/managed-resume-restart-%d{yyyyMMdd-HHmmss}.log
+appender.rolling-out.layout.type = PatternLayout
+# This logs the thread name and so on, but it's usually not helpful
+#appender.rolling-out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+appender.rolling-out.layout.pattern = %d{DEFAULT} [%-5p] %m%n
+appender.rolling-out.policies.type = Policies
+appender.rolling-out.policies.size.type = OnStartupTriggeringPolicy
+
+# For console
+appender.console.type = Console
+appender.console.name = console
+appender.console.layout.type = PatternLayout
+#appender.console.layout.pattern = [%t] %c --- %style{%d{DEFAULT}}{dim} [%highlight{%-5p}] %m%n
+appender.console.layout.pattern = %style{%d{DEFAULT}}{dim} [%highlight{%-5p}] %m%n
+
+logger.camel.name = org.apache.camel
+logger.camel.level = WARN
+logger.camel.additivity = false
+logger.camel.appenderRef.file.ref = rolling-out
+
+logger.camel-resume.name = org.apache.camel.processor.resume
+logger.camel-resume.level = DEBUG
+logger.camel-resume.additivity = false
+logger.camel-resume.appenderRef.file.ref = rolling-out
+logger.camel-resume.appenderRef.console.ref = console
+
+logger.tester.name = org.apache.camel.example.resume
+logger.tester.level = DEBUG
+logger.tester.additivity = false
+logger.tester.appenderRef.file.ref = rolling-out
+logger.tester.appenderRef.console.ref = console
+
+logger.camel-aws-resume.name = org.apache.camel.component.aws2.kinesis
+logger.camel-aws-resume.level = DEBUG
+logger.camel-aws-resume.additivity = false
+logger.camel-aws-resume.appenderRef.file.ref = rolling-out
+logger.camel-aws-resume.appenderRef.console.ref = console
+
+
+logger.camel-aws-test-infra.name = org.apache.camel.test.infra.aws2.clients
+logger.camel-aws-test-infra.level = DEBUG
+logger.camel-aws-test-infra.additivity = false
+logger.camel-aws-test-infra.appenderRef.file.ref = rolling-out
+logger.camel-aws-test-infra.appenderRef.console.ref = console
+
+logger.kafka.name = org.apache.kafka
+logger.kafka.level = INFO
+logger.kafka.additivity = false
+logger.kafka.appenderRef.file.ref = rolling-out
+
+logger.aws.name = software.amazon.awssdk
+logger.aws.level = INFO
+logger.aws.additivity = false
+logger.aws.appenderRef.file.ref = rolling-out
+
+rootLogger.level = DEBUG
+rootLogger.appenderRef.file.ref = rolling-out
+rootLogger.appenderRef.out.ref = console
diff --git a/examples/resume-api/resume-api-aws2-kinesis/src/main/scripts/run.sh b/examples/resume-api/resume-api-aws2-kinesis/src/main/scripts/run.sh
new file mode 100644
index 00000000..f3ad836d
--- /dev/null
+++ b/examples/resume-api/resume-api-aws2-kinesis/src/main/scripts/run.sh
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+ITERATIONS=${1:-5}
+BATCH_SIZE=${2:-50}
+
+echo "Loading data into Kinesis"
+  java -Dresume.type=kafka \
+    -Dresume.type.kafka.topic=aws-kinesis-offsets \
+    -Dbootstrap.address=kafka:9092 \
+    -Dresume.action=load \
+    -Daws.kinesis.streamName=sample-stream \
+    -Daws-service.kinesis.instance.type=remote \
+    -Daws.host=kinesis:4566 \
+    -Daws.access.key=accesskey \
+    -Daws.secret.key=secretkey \
+    -Daws.cborEnabled=false \
+    -jar /deployments/example.jar
+echo "Done loading"
+sleep 10s
+
+for i in $(seq 0 ${ITERATIONS}) ; do
+  echo "********************************************************************************"
+  echo "Running the iteration ${i} of ${ITERATIONS} with a batch of ${BATCH_SIZE} offsets"
+  echo "********************************************************************************"
+  java -Dresume.type=kafka \
+           -Dresume.type.kafka.topic=aws-kinesis-offsets \
+           -Dbootstrap.address=kafka:9092 \
+           -Daws.kinesis.streamName=sample-stream \
+           -Daws-service.kinesis.instance.type=remote \
+           -Daws.host=kinesis:4566 \
+           -Daws.access.key=accesskey \
+           -Daws.secret.key=secretkey \
+           -Daws.cborEnabled=false \
+           -Dbatch.size=${BATCH_SIZE} \
+           -jar /deployments/example.jar
+    echo "********************************************************************************"
+    echo "Finished the iteration ${i}"
+    echo "********************************************************************************"
+    sleep 2s
+done
+
+echo "###**************************************************************************###"
+echo "Resume simulation completed"
+echo "###**************************************************************************###"
+exit 0