You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/31 10:46:45 UTC

[camel] branch main updated: camel-aws2-kinesis - Consumer already has health-check OOTB. Make KinesisConnection generic and use it in both consumer and producer. Polish. (#10919)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 872617423c8 camel-aws2-kinesis - Consumer already has health-check OOTB. Make KinesisConnection generic and use it in both consumer and producer. Polish. (#10919)
872617423c8 is described below

commit 872617423c853dd7ab0d2f853e33b4c9abbadceb
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jul 31 12:46:38 2023 +0200

    camel-aws2-kinesis - Consumer already has health-check OOTB. Make KinesisConnection generic and use it in both consumer and producer. Polish. (#10919)
---
 .../component/aws2/kinesis/Kinesis2Component.java  | 18 +++++++
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 38 +++++++------
 .../component/aws2/kinesis/Kinesis2Endpoint.java   | 19 ++-----
 .../component/aws2/kinesis/Kinesis2Producer.java   | 20 ++++++-
 .../kinesis/{consumer => }/KinesisConnection.java  | 42 +++++++--------
 .../aws2/kinesis/consumer/KinesisHealthCheck.java  | 63 ----------------------
 .../Kinesis2ConsumerHealthCheckProfileCredsIT.java |  2 +-
 .../KinesisConsumerClosedShardWithFailTest.java    | 25 ++-------
 .../KinesisConsumerClosedShardWithSilentTest.java  | 24 ++-------
 .../kinesis/integration/KinesisConsumerIT.java     |  2 -
 .../integration/KinesisConsumerResumeIT.java       |  2 -
 .../kinesis/integration/KinesisProducerIT.java     |  3 --
 12 files changed, 97 insertions(+), 161 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
index bfe2c1ab8ab..abd7a245c4c 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
@@ -23,6 +23,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.HealthCheckComponent;
+import org.apache.camel.util.IOHelper;
 
 @Component("aws2-kinesis")
 public class Kinesis2Component extends HealthCheckComponent {
@@ -30,6 +31,8 @@ public class Kinesis2Component extends HealthCheckComponent {
     @Metadata
     private Kinesis2Configuration configuration = new Kinesis2Configuration();
 
+    private KinesisConnection connection = new KinesisConnection();
+
     public Kinesis2Component() {
         this(null);
     }
@@ -65,4 +68,19 @@ public class Kinesis2Component extends HealthCheckComponent {
     public void setConfiguration(Kinesis2Configuration configuration) {
         this.configuration = configuration;
     }
+
+    @Override
+    protected void doStart() throws Exception {
+        // use pre-configured client (if any)
+        connection.setKinesisClient(configuration.getAmazonKinesisClient());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        IOHelper.close(connection);
+    }
+
+    public KinesisConnection getConnection() {
+        return connection;
+    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 70481d598dc..f70551aa53f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
@@ -47,6 +46,8 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> {
     private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
+
+    private KinesisConnection connection;
     private boolean isShardClosed;
     private ResumeStrategy resumeStrategy;
 
@@ -55,21 +56,27 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         super(endpoint, processor);
     }
 
+    public KinesisConnection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(KinesisConnection connection) {
+        this.connection = connection;
+    }
+
     @Override
     protected int poll() throws Exception {
-
         var processedExchangeCount = new AtomicInteger(0);
-        var kinesisConnection = KinesisConnection.getInstance();
 
         if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
             var request = DescribeStreamRequest
                     .builder()
                     .streamName(getEndpoint().getConfiguration().getStreamName())
                     .build();
-            DescribeStreamResponse response = null;
+            DescribeStreamResponse response;
             if (getEndpoint().getConfiguration().isAsyncClient()) {
                 try {
-                    response = kinesisConnection
+                    response = connection
                             .getAsyncClient(getEndpoint())
                             .describeStream(request)
                             .get();
@@ -77,7 +84,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                     throw new RuntimeException(e);
                 }
             } else {
-                response = kinesisConnection
+                response = connection
                         .getClient(getEndpoint())
                         .describeStream(request);
             }
@@ -94,13 +101,13 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                     .findFirst()
                     .orElseThrow(() -> new IllegalStateException("The shard can't be found"));
 
-            fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount);
+            fetchAndPrepareRecordsForCamel(shard, connection, processedExchangeCount);
 
         } else {
-            getShardList(kinesisConnection)
+            getShardList(connection)
                     .parallelStream()
                     .forEach(shard -> {
-                        fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount);
+                        fetchAndPrepareRecordsForCamel(shard, connection, processedExchangeCount);
                     });
         }
 
@@ -111,7 +118,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
             final Shard shard,
             final KinesisConnection kinesisConnection,
             AtomicInteger processedExchangeCount) {
-        String shardIterator = null;
+        String shardIterator;
         try {
             shardIterator = getShardIterator(shard, kinesisConnection);
         } catch (ExecutionException | InterruptedException e) {
@@ -131,7 +138,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                         .getMaxResultsPerRequest())
                 .build();
 
-        GetRecordsResponse result = null;
+        GetRecordsResponse result;
         if (getEndpoint().getConfiguration().isAsyncClient()) {
             try {
                 result = kinesisConnection
@@ -218,7 +225,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
 
         resume(request);
 
-        GetShardIteratorResponse result = null;
+        GetShardIteratorResponse result;
         if (getEndpoint().getConfiguration().isAsyncClient()) {
             try {
                 result = kinesisConnection
@@ -295,6 +302,8 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     protected void doStart() throws Exception {
         super.doStart();
 
+        ObjectHelper.notNull(connection, "connection", this);
+
         if (resumeStrategy != null) {
             resumeStrategy.loadCache();
         }
@@ -305,13 +314,12 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     }
 
     private List<Shard> getShardList(final KinesisConnection kinesisConnection) {
-
         var request = ListShardsRequest
                 .builder()
                 .streamName(getEndpoint().getConfiguration().getStreamName())
                 .build();
 
-        List<Shard> shardList = null;
+        List<Shard> shardList;
         if (getEndpoint().getConfiguration().isAsyncClient()) {
             try {
                 shardList = kinesisConnection
@@ -330,6 +338,6 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         }
 
         return shardList;
-
     }
+
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index b97222fadd7..6ab8d5f7cc4 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -17,14 +17,11 @@
 package org.apache.camel.component.aws2.kinesis;
 
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisHealthCheck;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.ScheduledPollEndpoint;
@@ -58,7 +55,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
     protected void doStart() throws Exception {
         super.doStart();
 
-        var kinesisConnection = KinesisConnection.getInstance();
+        var kinesisConnection = getComponent().getConnection();
 
         if (!configuration.isCborEnabled()) {
             System.setProperty(CBOR_ENABLED.property(), "false");
@@ -96,26 +93,20 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new Kinesis2Producer(this);
+        Kinesis2Producer producer = new Kinesis2Producer(this);
+        producer.setConnection(getComponent().getConnection());
+        return producer;
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
         final Kinesis2Consumer consumer = new Kinesis2Consumer(this, processor);
+        consumer.setConnection(getComponent().getConnection());
         consumer.setSchedulerProperties(getSchedulerProperties());
-        startHealthChecks();
         configureConsumer(consumer);
         return consumer;
     }
 
-    private void startHealthChecks() {
-        var timeoutCheckerExecutorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
-                CONNECTION_CHECKER_EXECUTOR_NAME);
-        timeoutCheckerExecutorService.scheduleAtFixedRate(new KinesisHealthCheck(this),
-                0, 5 * 1000,
-                TimeUnit.MILLISECONDS);
-    }
-
     @Override
     public Kinesis2Component getComponent() {
         return (Kinesis2Component) super.getComponent();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index bbe4dcd9ee8..1763dd3fe83 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -21,16 +21,27 @@ import java.nio.ByteBuffer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
 import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
 
 public class Kinesis2Producer extends DefaultProducer {
 
+    private KinesisConnection connection;
+
     public Kinesis2Producer(Kinesis2Endpoint endpoint) {
         super(endpoint);
     }
 
+    public KinesisConnection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(KinesisConnection connection) {
+        this.connection = connection;
+    }
+
     @Override
     public Kinesis2Endpoint getEndpoint() {
         return (Kinesis2Endpoint) super.getEndpoint();
@@ -39,7 +50,7 @@ public class Kinesis2Producer extends DefaultProducer {
     @Override
     public void process(Exchange exchange) throws Exception {
         PutRecordRequest request = createRequest(exchange);
-        PutRecordResponse putRecordResult = getEndpoint().getClient().putRecord(request);
+        PutRecordResponse putRecordResult = connection.getClient(getEndpoint()).putRecord(request);
         Message message = getMessageForResponse(exchange);
         message.setHeader(Kinesis2Constants.SEQUENCE_NUMBER, putRecordResult.sequenceNumber());
         message.setHeader(Kinesis2Constants.SHARD_ID, putRecordResult.shardId());
@@ -63,4 +74,11 @@ public class Kinesis2Producer extends DefaultProducer {
     public static Message getMessageForResponse(final Exchange exchange) {
         return exchange.getMessage();
     }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        ObjectHelper.notNull(connection, "connection", this);
+    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
similarity index 70%
rename from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java
rename to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
index 4ac3c3046b5..c5c23f1270f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
@@ -14,35 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.aws2.kinesis.consumer;
+package org.apache.camel.component.aws2.kinesis;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Objects;
 
-import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
 import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 
-public class KinesisConnection {
-    private static volatile KinesisConnection instance;
-    private KinesisClient kinesisClient = null;
-    private KinesisAsyncClient kinesisAsyncClient = null;
-
-    private KinesisConnection() {
-    }
+/**
+ * Holds connections to AWS from {@link KinesisClient} and {@link KinesisAsyncClient}.
+ */
+public class KinesisConnection implements Closeable {
 
-    public static synchronized KinesisConnection getInstance() {
-        if (instance == null) {
-            synchronized (KinesisConnection.class) {
-                if (instance == null) {
-                    instance = new KinesisConnection();
-                }
-            }
-        }
-        return instance;
-    }
+    private KinesisClient kinesisClient;
+    private KinesisAsyncClient kinesisAsyncClient;
 
-    public KinesisClient getClient(final Kinesis2Endpoint endpoint) {
+    public synchronized KinesisClient getClient(final Kinesis2Endpoint endpoint) {
         if (Objects.isNull(kinesisClient)) {
             kinesisClient = endpoint.getConfiguration().getAmazonKinesisClient() != null
                     ? endpoint.getConfiguration().getAmazonKinesisClient()
@@ -51,7 +41,7 @@ public class KinesisConnection {
         return kinesisClient;
     }
 
-    public KinesisAsyncClient getAsyncClient(final Kinesis2Endpoint endpoint) {
+    public synchronized KinesisAsyncClient getAsyncClient(final Kinesis2Endpoint endpoint) {
         if (Objects.isNull(kinesisAsyncClient)) {
             kinesisAsyncClient = KinesisClientFactory
                     .getKinesisAsyncClient(endpoint.getConfiguration())
@@ -67,4 +57,14 @@ public class KinesisConnection {
     public void setKinesisAsyncClient(final KinesisAsyncClient kinesisAsyncClient) {
         this.kinesisAsyncClient = kinesisAsyncClient;
     }
+
+    @Override
+    public void close() throws IOException {
+        if (kinesisClient != null) {
+            kinesisClient.close();
+        }
+        if (kinesisAsyncClient != null) {
+            kinesisAsyncClient.close();
+        }
+    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java
deleted file mode 100644
index 5bf59880dc2..00000000000
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.consumer;
-
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
-import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
-
-public class KinesisHealthCheck implements Runnable {
-    private Kinesis2Endpoint endpoint;
-
-    public KinesisHealthCheck(Kinesis2Endpoint endpoint) {
-        this.endpoint = endpoint;
-    }
-
-    @Override
-    public void run() {
-        var kinesisConnection = KinesisConnection.getInstance();
-        if (this.endpoint.getConfiguration().isAsyncClient()) {
-            try {
-                if (Objects.isNull(kinesisConnection.getAsyncClient(this.endpoint)) ||
-                        kinesisConnection.getAsyncClient(this.endpoint)
-                                .listStreams(ListStreamsRequest
-                                        .builder()
-                                        .build())
-                                .get()
-                                .streamNames()
-                                .isEmpty()) {
-                    kinesisConnection.setKinesisAsyncClient(endpoint.getAsyncClient());
-                }
-            } catch (ExecutionException | InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        } else {
-            if (Objects.isNull(kinesisConnection.getClient(this.endpoint)) ||
-                    kinesisConnection.getClient(this.endpoint)
-                            .listStreams(ListStreamsRequest
-                                    .builder()
-                                    .build())
-                            .streamNames()
-                            .isEmpty()) {
-                kinesisConnection.setKinesisClient(endpoint.getClient());
-            }
-        }
-    }
-
-}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java
index 32795c37d8b..eed9ce5d389 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java
@@ -47,9 +47,9 @@ public class Kinesis2ConsumerHealthCheckProfileCredsIT extends CamelTestSupport
     protected CamelContext createCamelContext() throws Exception {
         context = super.createCamelContext();
         context.getPropertiesComponent().setLocation("ref:prop");
+
         Kinesis2Component component = new Kinesis2Component(context);
         component.getConfiguration().setAmazonKinesisClient(AWSSDKClientUtils.newKinesisClient());
-        component.init();
         context.addComponent("aws2-kinesis", component);
 
         HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 2d7690a78fd..495942d298e 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -16,12 +16,10 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -54,8 +52,6 @@ public class KinesisConsumerClosedShardWithFailTest {
     private KinesisClient kinesisClient;
     @Mock
     private AsyncProcessor processor;
-    @Mock
-    private KinesisConnection kinesisConnection;
 
     private final CamelContext context = new DefaultCamelContext();
     private final Kinesis2Component component = new Kinesis2Component(context);
@@ -64,26 +60,25 @@ public class KinesisConsumerClosedShardWithFailTest {
 
     @BeforeEach
     public void setup() {
+        component.start();
+
         Kinesis2Configuration configuration = new Kinesis2Configuration();
         configuration.setAmazonKinesisClient(kinesisClient);
         configuration.setIteratorType(ShardIteratorType.LATEST);
         configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.fail);
         configuration.setStreamName("streamName");
 
-        setMock(kinesisConnection);
-
-        Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
+        Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component);
         endpoint.start();
         underTest = new Kinesis2Consumer(endpoint, processor);
+        underTest.setConnection(component.getConnection());
+        underTest.start();
 
         SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
         Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
         ArrayList<Shard> shardList = new ArrayList<>();
         shardList.add(shard);
 
-        when(kinesisConnection
-                .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient);
-
         when(kinesisClient
                 .getRecords(any(GetRecordsRequest.class)))
                 .thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build());
@@ -95,16 +90,6 @@ public class KinesisConsumerClosedShardWithFailTest {
                 .thenReturn(ListShardsResponse.builder().shards(shardList).build());
     }
 
-    private void setMock(KinesisConnection mock) {
-        try {
-            Field instance = KinesisConnection.class.getDeclaredField("instance");
-            instance.setAccessible(true);
-            instance.set(instance, mock);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @Test
     public void itObtainsAShardIteratorOnFirstPoll() {
         assertThrows(IllegalStateException.class, () -> {
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 4605cd5b60f..d14e5ebb332 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -16,13 +16,11 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
-import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -62,8 +60,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
     private KinesisClient kinesisClient;
     @Mock
     private AsyncProcessor processor;
-    @Mock
-    private KinesisConnection kinesisConnection;
+
     private final CamelContext context = new DefaultCamelContext();
     private final Kinesis2Component component = new Kinesis2Component(context);
 
@@ -71,26 +68,25 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
     @BeforeEach
     public void setup() {
+        component.start();
+
         Kinesis2Configuration configuration = new Kinesis2Configuration();
         configuration.setAmazonKinesisClient(kinesisClient);
         configuration.setIteratorType(ShardIteratorType.LATEST);
         configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.silent);
         configuration.setStreamName("streamName");
 
-        setMock(kinesisConnection);
-
         Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component);
         endpoint.start();
         underTest = new Kinesis2Consumer(endpoint, processor);
+        underTest.setConnection(component.getConnection());
+        underTest.start();
 
         SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
         Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
         ArrayList<Shard> shardList = new ArrayList<>();
         shardList.add(shard);
 
-        when(kinesisConnection
-                .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient);
-
         when(kinesisClient
                 .getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
                         .nextShardIterator("shardIterator")
@@ -115,16 +111,6 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
     }
 
-    private void setMock(KinesisConnection mock) {
-        try {
-            Field instance = KinesisConnection.class.getDeclaredField("instance");
-            instance.setAccessible(true);
-            instance.set(instance, mock);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @Test
     public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
         underTest.poll();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
index 7cc59431eb8..754862b44c1 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
@@ -25,7 +25,6 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -80,7 +79,6 @@ public class KinesisConsumerIT extends CamelTestSupport {
     @Override
     protected RouteBuilder createRouteBuilder() {
         client = AWSSDKClientUtils.newKinesisClient();
-        KinesisConnection.getInstance().setKinesisClient(client);
 
         context.getRegistry().bind("amazonKinesisClient", client);
 
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
index 20f508e13bd..26686b3cb33 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
@@ -25,7 +25,6 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
@@ -133,7 +132,6 @@ public class KinesisConsumerResumeIT extends CamelTestSupport {
     @Override
     protected RouteBuilder createRouteBuilder() {
         client = AWSSDKClientUtils.newKinesisClient();
-        KinesisConnection.getInstance().setKinesisClient(client);
 
         context.getRegistry().bind("amazonKinesisClient", client);
 
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
index 7c5207ffd70..036f54f90e9 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
@@ -25,7 +25,6 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -120,8 +119,6 @@ public class KinesisProducerIT extends CamelTestSupport {
     protected RouteBuilder createRouteBuilder() {
         client = AWSSDKClientUtils.newKinesisClient();
 
-        KinesisConnection.getInstance().setKinesisClient(client);
-
         context.getRegistry().bind("amazonKinesisClient", client);
 
         return new RouteBuilder() {