You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/28 13:55:06 UTC

[camel] branch main updated: kinesis connection retry mechanism added into the current consumer (#10870)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new c08104ee967 kinesis connection retry mechanism added into the current consumer (#10870)
c08104ee967 is described below

commit c08104ee967dbe112af83b7813084a61d0bbd89f
Author: Hamed Hatami <ja...@gmail.com>
AuthorDate: Fri Jul 28 15:55:00 2023 +0200

    kinesis connection retry mechanism added into the current consumer (#10870)
    
    * kinesis connection retry mechanism added into the current consumer
    
    * kinesis connection retry mechanism added into the current consumer
    
    * kinesis connection retry mechanism added into the current consumer
    
    * kinesis connection retry mechanism added into the current consumer
    
    * kinesis connection retry mechanism added into the current consumer
    
    * kinesis connection retry mechanism added into the current consumer
    
    * kinesis connection retry mechanism added into the current consumer
    
    ---------
    
    Co-authored-by: Hamed Hatami <ha...@postnord.com>
---
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 60 +++++++++++--------
 .../component/aws2/kinesis/Kinesis2Endpoint.java   | 25 +++++---
 .../aws2/kinesis/consumer/KinesisConnection.java   | 70 ++++++++++++++++++++++
 .../aws2/kinesis/consumer/KinesisHealthCheck.java  | 63 +++++++++++++++++++
 .../KinesisConsumerClosedShardWithFailTest.java    | 29 ++++++++-
 .../KinesisConsumerClosedShardWithSilentTest.java  | 48 +++++++++++----
 .../kinesis/integration/KinesisConsumerIT.java     | 23 ++++---
 .../integration/KinesisConsumerResumeIT.java       |  7 ++-
 .../kinesis/integration/KinesisProducerIT.java     |  3 +
 9 files changed, 266 insertions(+), 62 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 6aef226e5ed..70481d598dc 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
@@ -33,8 +34,6 @@ import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
@@ -51,7 +50,8 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     private boolean isShardClosed;
     private ResumeStrategy resumeStrategy;
 
-    public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor processor) {
+    public Kinesis2Consumer(Kinesis2Endpoint endpoint,
+                            Processor processor) {
         super(endpoint, processor);
     }
 
@@ -59,6 +59,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     protected int poll() throws Exception {
 
         var processedExchangeCount = new AtomicInteger(0);
+        var kinesisConnection = KinesisConnection.getInstance();
 
         if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
             var request = DescribeStreamRequest
@@ -68,14 +69,17 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
             DescribeStreamResponse response = null;
             if (getEndpoint().getConfiguration().isAsyncClient()) {
                 try {
-                    response = getAsyncClient()
+                    response = kinesisConnection
+                            .getAsyncClient(getEndpoint())
                             .describeStream(request)
                             .get();
                 } catch (ExecutionException | InterruptedException e) {
                     throw new RuntimeException(e);
                 }
             } else {
-                response = getClient().describeStream(request);
+                response = kinesisConnection
+                        .getClient(getEndpoint())
+                        .describeStream(request);
             }
 
             var shard = response
@@ -90,13 +94,13 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                     .findFirst()
                     .orElseThrow(() -> new IllegalStateException("The shard can't be found"));
 
-            fetchAndPrepareRecordsForCamel(shard, processedExchangeCount);
+            fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount);
 
         } else {
-            getShardList()
+            getShardList(kinesisConnection)
                     .parallelStream()
                     .forEach(shard -> {
-                        fetchAndPrepareRecordsForCamel(shard, processedExchangeCount);
+                        fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount);
                     });
         }
 
@@ -105,10 +109,11 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
 
     private void fetchAndPrepareRecordsForCamel(
             final Shard shard,
+            final KinesisConnection kinesisConnection,
             AtomicInteger processedExchangeCount) {
         String shardIterator = null;
         try {
-            shardIterator = getShardIterator(shard);
+            shardIterator = getShardIterator(shard, kinesisConnection);
         } catch (ExecutionException | InterruptedException e) {
             throw new RuntimeException(e);
         }
@@ -129,14 +134,17 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         GetRecordsResponse result = null;
         if (getEndpoint().getConfiguration().isAsyncClient()) {
             try {
-                result = getAsyncClient()
+                result = kinesisConnection
+                        .getAsyncClient(getEndpoint())
                         .getRecords(req)
                         .get();
             } catch (ExecutionException | InterruptedException e) {
                 throw new RuntimeException(e);
             }
         } else {
-            result = getClient().getRecords(req);
+            result = kinesisConnection
+                    .getClient(getEndpoint())
+                    .getRecords(req);
         }
 
         try {
@@ -186,20 +194,15 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         return processedExchanges;
     }
 
-    private KinesisClient getClient() {
-        return getEndpoint().getClient();
-    }
-
-    private KinesisAsyncClient getAsyncClient() {
-        return getEndpoint().getAsyncClient();
-    }
-
     @Override
     public Kinesis2Endpoint getEndpoint() {
         return (Kinesis2Endpoint) super.getEndpoint();
     }
 
-    private String getShardIterator(final Shard shard) throws ExecutionException, InterruptedException {
+    private String getShardIterator(
+            final Shard shard,
+            final KinesisConnection kinesisConnection)
+            throws ExecutionException, InterruptedException {
 
         var shardId = shard.shardId();
         isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
@@ -218,14 +221,17 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         GetShardIteratorResponse result = null;
         if (getEndpoint().getConfiguration().isAsyncClient()) {
             try {
-                result = getAsyncClient()
+                result = kinesisConnection
+                        .getAsyncClient(getEndpoint())
                         .getShardIterator(request.build())
                         .get();
             } catch (ExecutionException | InterruptedException e) {
                 throw new RuntimeException(e);
             }
         } else {
-            result = getClient().getShardIterator(request.build());
+            result = kinesisConnection
+                    .getClient(getEndpoint())
+                    .getShardIterator(request.build());
         }
 
         return result.shardIterator();
@@ -298,7 +304,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         return getEndpoint().getConfiguration();
     }
 
-    private List<Shard> getShardList() {
+    private List<Shard> getShardList(final KinesisConnection kinesisConnection) {
 
         var request = ListShardsRequest
                 .builder()
@@ -308,7 +314,8 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         List<Shard> shardList = null;
         if (getEndpoint().getConfiguration().isAsyncClient()) {
             try {
-                shardList = getAsyncClient()
+                shardList = kinesisConnection
+                        .getAsyncClient(getEndpoint())
                         .listShards(request)
                         .get()
                         .shards();
@@ -316,7 +323,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                 throw new RuntimeException(e);
             }
         } else {
-            shardList = getClient().listShards(request).shards();
+            shardList = kinesisConnection
+                    .getClient(getEndpoint())
+                    .listShards(request)
+                    .shards();
         }
 
         return shardList;
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index e4d4d122c96..b97222fadd7 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -17,12 +17,14 @@
 package org.apache.camel.component.aws2.kinesis;
 
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisHealthCheck;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.ScheduledPollEndpoint;
@@ -45,6 +47,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
 
     private KinesisClient kinesisClient;
     private KinesisAsyncClient kinesisAsyncClient;
+    private static final String CONNECTION_CHECKER_EXECUTOR_NAME = "Kinesis_Streaming_Connection_Checker";
 
     public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, Kinesis2Component component) {
         super(uri, component);
@@ -54,19 +57,18 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
+
+        var kinesisConnection = KinesisConnection.getInstance();
+
         if (!configuration.isCborEnabled()) {
             System.setProperty(CBOR_ENABLED.property(), "false");
         }
 
         if (configuration.isAsyncClient() &&
                 Objects.isNull(configuration.getAmazonKinesisClient())) {
-            kinesisAsyncClient = KinesisClientFactory
-                    .getKinesisAsyncClient(configuration)
-                    .getKinesisAsyncClient();
+            kinesisAsyncClient = kinesisConnection.getAsyncClient(this);
         } else {
-            kinesisClient = configuration.getAmazonKinesisClient() != null
-                    ? configuration.getAmazonKinesisClient()
-                    : KinesisClientFactory.getKinesisClient(configuration).getKinesisClient();
+            kinesisClient = kinesisConnection.getClient(this);
         }
 
         if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
@@ -101,10 +103,19 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
     public Consumer createConsumer(Processor processor) throws Exception {
         final Kinesis2Consumer consumer = new Kinesis2Consumer(this, processor);
         consumer.setSchedulerProperties(getSchedulerProperties());
+        startHealthChecks();
         configureConsumer(consumer);
         return consumer;
     }
 
+    private void startHealthChecks() {
+        var timeoutCheckerExecutorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
+                CONNECTION_CHECKER_EXECUTOR_NAME);
+        timeoutCheckerExecutorService.scheduleAtFixedRate(new KinesisHealthCheck(this),
+                0, 5 * 1000,
+                TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public Kinesis2Component getComponent() {
         return (Kinesis2Component) super.getComponent();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java
new file mode 100644
index 00000000000..4ac3c3046b5
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.consumer;
+
+import java.util.Objects;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
+import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+
+public class KinesisConnection {
+    private static volatile KinesisConnection instance;
+    private KinesisClient kinesisClient = null;
+    private KinesisAsyncClient kinesisAsyncClient = null;
+
+    private KinesisConnection() {
+    }
+
+    public static synchronized KinesisConnection getInstance() {
+        if (instance == null) {
+            synchronized (KinesisConnection.class) {
+                if (instance == null) {
+                    instance = new KinesisConnection();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public KinesisClient getClient(final Kinesis2Endpoint endpoint) {
+        if (Objects.isNull(kinesisClient)) {
+            kinesisClient = endpoint.getConfiguration().getAmazonKinesisClient() != null
+                    ? endpoint.getConfiguration().getAmazonKinesisClient()
+                    : KinesisClientFactory.getKinesisClient(endpoint.getConfiguration()).getKinesisClient();
+        }
+        return kinesisClient;
+    }
+
+    public KinesisAsyncClient getAsyncClient(final Kinesis2Endpoint endpoint) {
+        if (Objects.isNull(kinesisAsyncClient)) {
+            kinesisAsyncClient = KinesisClientFactory
+                    .getKinesisAsyncClient(endpoint.getConfiguration())
+                    .getKinesisAsyncClient();
+        }
+        return kinesisAsyncClient;
+    }
+
+    public void setKinesisClient(final KinesisClient kinesisClient) {
+        this.kinesisClient = kinesisClient;
+    }
+
+    public void setKinesisAsyncClient(final KinesisAsyncClient kinesisAsyncClient) {
+        this.kinesisAsyncClient = kinesisAsyncClient;
+    }
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java
new file mode 100644
index 00000000000..5bf59880dc2
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.consumer;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
+import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
+
+public class KinesisHealthCheck implements Runnable {
+    private Kinesis2Endpoint endpoint;
+
+    public KinesisHealthCheck(Kinesis2Endpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public void run() {
+        var kinesisConnection = KinesisConnection.getInstance();
+        if (this.endpoint.getConfiguration().isAsyncClient()) {
+            try {
+                if (Objects.isNull(kinesisConnection.getAsyncClient(this.endpoint)) ||
+                        kinesisConnection.getAsyncClient(this.endpoint)
+                                .listStreams(ListStreamsRequest
+                                        .builder()
+                                        .build())
+                                .get()
+                                .streamNames()
+                                .isEmpty()) {
+                    kinesisConnection.setKinesisAsyncClient(endpoint.getAsyncClient());
+                }
+            } catch (ExecutionException | InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            if (Objects.isNull(kinesisConnection.getClient(this.endpoint)) ||
+                    kinesisConnection.getClient(this.endpoint)
+                            .listStreams(ListStreamsRequest
+                                    .builder()
+                                    .build())
+                            .streamNames()
+                            .isEmpty()) {
+                kinesisConnection.setKinesisClient(endpoint.getClient());
+            }
+        }
+    }
+
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 48e739e423f..2d7690a78fd 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -52,6 +54,8 @@ public class KinesisConsumerClosedShardWithFailTest {
     private KinesisClient kinesisClient;
     @Mock
     private AsyncProcessor processor;
+    @Mock
+    private KinesisConnection kinesisConnection;
 
     private final CamelContext context = new DefaultCamelContext();
     private final Kinesis2Component component = new Kinesis2Component(context);
@@ -65,6 +69,9 @@ public class KinesisConsumerClosedShardWithFailTest {
         configuration.setIteratorType(ShardIteratorType.LATEST);
         configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.fail);
         configuration.setStreamName("streamName");
+
+        setMock(kinesisConnection);
+
         Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
         endpoint.start();
         underTest = new Kinesis2Consumer(endpoint, processor);
@@ -74,14 +81,30 @@ public class KinesisConsumerClosedShardWithFailTest {
         ArrayList<Shard> shardList = new ArrayList<>();
         shardList.add(shard);
 
-        when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
+        when(kinesisConnection
+                .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient);
+
+        when(kinesisClient
+                .getRecords(any(GetRecordsRequest.class)))
                 .thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build());
-        when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
+        when(kinesisClient
+                .getShardIterator(any(GetShardIteratorRequest.class)))
                 .thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
-        when(kinesisClient.listShards(any(ListShardsRequest.class)))
+        when(kinesisClient
+                .listShards(any(ListShardsRequest.class)))
                 .thenReturn(ListShardsResponse.builder().shards(shardList).build());
     }
 
+    private void setMock(KinesisConnection mock) {
+        try {
+            Field instance = KinesisConnection.class.getDeclaredField("instance");
+            instance.setAccessible(true);
+            instance.set(instance, mock);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Test
     public void itObtainsAShardIteratorOnFirstPoll() {
         assertThrows(IllegalStateException.class, () -> {
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 2b34ca315ed..4605cd5b60f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -16,11 +16,13 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
+import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -60,7 +62,8 @@ public class KinesisConsumerClosedShardWithSilentTest {
     private KinesisClient kinesisClient;
     @Mock
     private AsyncProcessor processor;
-
+    @Mock
+    private KinesisConnection kinesisConnection;
     private final CamelContext context = new DefaultCamelContext();
     private final Kinesis2Component component = new Kinesis2Component(context);
 
@@ -73,6 +76,9 @@ public class KinesisConsumerClosedShardWithSilentTest {
         configuration.setIteratorType(ShardIteratorType.LATEST);
         configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.silent);
         configuration.setStreamName("streamName");
+
+        setMock(kinesisConnection);
+
         Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component);
         endpoint.start();
         underTest = new Kinesis2Consumer(endpoint, processor);
@@ -82,21 +88,41 @@ public class KinesisConsumerClosedShardWithSilentTest {
         ArrayList<Shard> shardList = new ArrayList<>();
         shardList.add(shard);
 
-        when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
-                .nextShardIterator("shardIterator")
-                .records(
-                        Record.builder().sequenceNumber("1").data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
-                                .build(),
-                        Record.builder().sequenceNumber("2").data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
-                                .build())
-                .build());
-        when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
+        when(kinesisConnection
+                .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient);
+
+        when(kinesisClient
+                .getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
+                        .nextShardIterator("shardIterator")
+                        .records(
+                                Record.builder().sequenceNumber("1")
+                                        .data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
+                                        .build(),
+                                Record.builder().sequenceNumber("2")
+                                        .data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
+                                        .build())
+                        .build());
+
+        when(kinesisClient
+                .getShardIterator(any(GetShardIteratorRequest.class)))
                 .thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
-        when(kinesisClient.listShards(any(ListShardsRequest.class)))
+        when(kinesisClient
+                .listShards(any(ListShardsRequest.class)))
                 .thenReturn(ListShardsResponse.builder().shards(shardList).build());
 
         context.start();
         underTest.start();
+
+    }
+
+    private void setMock(KinesisConnection mock) {
+        try {
+            Field instance = KinesisConnection.class.getDeclaredField("instance");
+            instance.setAccessible(true);
+            instance.set(instance, mock);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Test
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
index beebc63905e..7cc59431eb8 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
@@ -22,11 +22,10 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.EndpointInject;
-import org.apache.camel.Exchange;
 import org.apache.camel.Message;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -81,6 +80,7 @@ public class KinesisConsumerIT extends CamelTestSupport {
     @Override
     protected RouteBuilder createRouteBuilder() {
         client = AWSSDKClientUtils.newKinesisClient();
+        KinesisConnection.getInstance().setKinesisClient(client);
 
         context.getRegistry().bind("amazonKinesisClient", client);
 
@@ -90,20 +90,17 @@ public class KinesisConsumerIT extends CamelTestSupport {
                 String kinesisEndpointUri = "aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient";
 
                 fromF(kinesisEndpointUri, streamName)
-                        .process(new Processor() {
-                            @Override
-                            public void process(Exchange exchange) {
-                                KinesisData data = new KinesisData();
+                        .process(exchange -> {
+                            KinesisData data = new KinesisData();
 
-                                final Message message = exchange.getMessage();
+                            final Message message = exchange.getMessage();
 
-                                if (message != null) {
-                                    data.body = message.getBody(String.class);
-                                    data.partition = message.getHeader(Kinesis2Constants.PARTITION_KEY, String.class);
-                                }
-
-                                receivedMessages.add(data);
+                            if (message != null) {
+                                data.body = message.getBody(String.class);
+                                data.partition = message.getHeader(Kinesis2Constants.PARTITION_KEY, String.class);
                             }
+
+                            receivedMessages.add(data);
                         })
                         .to("mock:result");
             }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
index 2e6a9a8fe4a..20f508e13bd 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
@@ -25,6 +25,7 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
@@ -132,6 +133,7 @@ public class KinesisConsumerResumeIT extends CamelTestSupport {
     @Override
     protected RouteBuilder createRouteBuilder() {
         client = AWSSDKClientUtils.newKinesisClient();
+        KinesisConnection.getInstance().setKinesisClient(client);
 
         context.getRegistry().bind("amazonKinesisClient", client);
 
@@ -145,7 +147,6 @@ public class KinesisConsumerResumeIT extends CamelTestSupport {
                 fromF(kinesisEndpointUri, streamName)
                         .process(exchange -> {
                             KinesisData data = new KinesisData();
-
                             final Message message = exchange.getMessage();
 
                             if (message != null) {
@@ -183,12 +184,12 @@ public class KinesisConsumerResumeIT extends CamelTestSupport {
     }
 
     @DisplayName("Tests that the component can resume messages from AWS Kinesis")
-    @Timeout(value = 2, unit = TimeUnit.MINUTES)
+    @Timeout(value = 3, unit = TimeUnit.MINUTES)
     @Test
     void testProduceMessages() {
         result.expectedMessageCount(expectedCount);
 
-        await().atMost(1, TimeUnit.MINUTES)
+        await().atMost(2, TimeUnit.MINUTES)
                 .untilAsserted(() -> result.assertIsSatisfied());
 
         assertEquals(expectedCount, receivedMessages.size());
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
index 036f54f90e9..7c5207ffd70 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
@@ -25,6 +25,7 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -119,6 +120,8 @@ public class KinesisProducerIT extends CamelTestSupport {
     protected RouteBuilder createRouteBuilder() {
         client = AWSSDKClientUtils.newKinesisClient();
 
+        KinesisConnection.getInstance().setKinesisClient(client);
+
         context.getRegistry().bind("amazonKinesisClient", client);
 
         return new RouteBuilder() {