You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/31 07:28:09 UTC

[camel] branch kin created (now 638581882d8)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch kin
in repository https://gitbox.apache.org/repos/asf/camel.git


      at 638581882d8 camel-aws2-kinesis - Consumer already have health-check OOTB. Make KinesisConnection generic and use by both consumer and producer. Polish.

This branch includes the following new commits:

     new 638581882d8 camel-aws2-kinesis - Consumer already have health-check OOTB. Make KinesisConnection generic and use by both consumer and producer. Polish.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/01: camel-aws2-kinesis - Consumer already have health-check OOTB. Make KinesisConnection generic and use by both consumer and producer. Polish.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kin
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 638581882d8cd4541e7a8b0c8fc9f6470cd99d57
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jul 31 09:27:55 2023 +0200

    camel-aws2-kinesis - Consumer already have health-check OOTB. Make KinesisConnection generic and use by both consumer and producer. Polish.
---
 .../component/aws2/kinesis/Kinesis2Component.java  | 18 +++++++
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 38 +++++++------
 .../component/aws2/kinesis/Kinesis2Endpoint.java   | 19 ++-----
 .../component/aws2/kinesis/Kinesis2Producer.java   | 20 ++++++-
 .../kinesis/{consumer => }/KinesisConnection.java  | 42 +++++++--------
 .../aws2/kinesis/consumer/KinesisHealthCheck.java  | 63 ----------------------
 .../Kinesis2ConsumerHealthCheckProfileCredsIT.java |  2 +-
 .../KinesisConsumerClosedShardWithFailTest.java    | 25 ++-------
 .../KinesisConsumerClosedShardWithSilentTest.java  | 24 ++-------
 .../kinesis/integration/KinesisConsumerIT.java     |  2 -
 .../integration/KinesisConsumerResumeIT.java       |  2 -
 .../kinesis/integration/KinesisProducerIT.java     |  3 --
 12 files changed, 97 insertions(+), 161 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
index bfe2c1ab8ab..abd7a245c4c 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
@@ -23,6 +23,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.HealthCheckComponent;
+import org.apache.camel.util.IOHelper;
 
 @Component("aws2-kinesis")
 public class Kinesis2Component extends HealthCheckComponent {
@@ -30,6 +31,8 @@ public class Kinesis2Component extends HealthCheckComponent {
     @Metadata
     private Kinesis2Configuration configuration = new Kinesis2Configuration();
 
+    private KinesisConnection connection = new KinesisConnection();
+
     public Kinesis2Component() {
         this(null);
     }
@@ -65,4 +68,19 @@ public class Kinesis2Component extends HealthCheckComponent {
     public void setConfiguration(Kinesis2Configuration configuration) {
         this.configuration = configuration;
     }
+
+    @Override
+    protected void doStart() throws Exception {
+        // use pre-configured client (if any)
+        connection.setKinesisClient(configuration.getAmazonKinesisClient());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        IOHelper.close(connection);
+    }
+
+    public KinesisConnection getConnection() {
+        return connection;
+    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 70481d598dc..f70551aa53f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
@@ -47,6 +46,8 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> {
     private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
+
+    private KinesisConnection connection;
     private boolean isShardClosed;
     private ResumeStrategy resumeStrategy;
 
@@ -55,21 +56,27 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         super(endpoint, processor);
     }
 
+    public KinesisConnection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(KinesisConnection connection) {
+        this.connection = connection;
+    }
+
     @Override
     protected int poll() throws Exception {
-
         var processedExchangeCount = new AtomicInteger(0);
-        var kinesisConnection = KinesisConnection.getInstance();
 
         if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
             var request = DescribeStreamRequest
                     .builder()
                     .streamName(getEndpoint().getConfiguration().getStreamName())
                     .build();
-            DescribeStreamResponse response = null;
+            DescribeStreamResponse response;
             if (getEndpoint().getConfiguration().isAsyncClient()) {
                 try {
-                    response = kinesisConnection
+                    response = connection
                             .getAsyncClient(getEndpoint())
                             .describeStream(request)
                             .get();
@@ -77,7 +84,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                     throw new RuntimeException(e);
                 }
             } else {
-                response = kinesisConnection
+                response = connection
                         .getClient(getEndpoint())
                         .describeStream(request);
             }
@@ -94,13 +101,13 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                     .findFirst()
                     .orElseThrow(() -> new IllegalStateException("The shard can't be found"));
 
-            fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount);
+            fetchAndPrepareRecordsForCamel(shard, connection, processedExchangeCount);
 
         } else {
-            getShardList(kinesisConnection)
+            getShardList(connection)
                     .parallelStream()
                     .forEach(shard -> {
-                        fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount);
+                        fetchAndPrepareRecordsForCamel(shard, connection, processedExchangeCount);
                     });
         }
 
@@ -111,7 +118,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
             final Shard shard,
             final KinesisConnection kinesisConnection,
             AtomicInteger processedExchangeCount) {
-        String shardIterator = null;
+        String shardIterator;
         try {
             shardIterator = getShardIterator(shard, kinesisConnection);
         } catch (ExecutionException | InterruptedException e) {
@@ -131,7 +138,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                         .getMaxResultsPerRequest())
                 .build();
 
-        GetRecordsResponse result = null;
+        GetRecordsResponse result;
         if (getEndpoint().getConfiguration().isAsyncClient()) {
             try {
                 result = kinesisConnection
@@ -218,7 +225,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
 
         resume(request);
 
-        GetShardIteratorResponse result = null;
+        GetShardIteratorResponse result;
         if (getEndpoint().getConfiguration().isAsyncClient()) {
             try {
                 result = kinesisConnection
@@ -295,6 +302,8 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     protected void doStart() throws Exception {
         super.doStart();
 
+        ObjectHelper.notNull(connection, "connection", this);
+
         if (resumeStrategy != null) {
             resumeStrategy.loadCache();
         }
@@ -305,13 +314,12 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     }
 
     private List<Shard> getShardList(final KinesisConnection kinesisConnection) {
-
         var request = ListShardsRequest
                 .builder()
                 .streamName(getEndpoint().getConfiguration().getStreamName())
                 .build();
 
-        List<Shard> shardList = null;
+        List<Shard> shardList;
         if (getEndpoint().getConfiguration().isAsyncClient()) {
             try {
                 shardList = kinesisConnection
@@ -330,6 +338,6 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         }
 
         return shardList;
-
     }
+
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index b97222fadd7..6ab8d5f7cc4 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -17,14 +17,11 @@
 package org.apache.camel.component.aws2.kinesis;
 
 import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisHealthCheck;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.ScheduledPollEndpoint;
@@ -58,7 +55,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
     protected void doStart() throws Exception {
         super.doStart();
 
-        var kinesisConnection = KinesisConnection.getInstance();
+        var kinesisConnection = getComponent().getConnection();
 
         if (!configuration.isCborEnabled()) {
             System.setProperty(CBOR_ENABLED.property(), "false");
@@ -96,26 +93,20 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new Kinesis2Producer(this);
+        Kinesis2Producer producer = new Kinesis2Producer(this);
+        producer.setConnection(getComponent().getConnection());
+        return producer;
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
         final Kinesis2Consumer consumer = new Kinesis2Consumer(this, processor);
+        consumer.setConnection(getComponent().getConnection());
         consumer.setSchedulerProperties(getSchedulerProperties());
-        startHealthChecks();
         configureConsumer(consumer);
         return consumer;
     }
 
-    private void startHealthChecks() {
-        var timeoutCheckerExecutorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
-                CONNECTION_CHECKER_EXECUTOR_NAME);
-        timeoutCheckerExecutorService.scheduleAtFixedRate(new KinesisHealthCheck(this),
-                0, 5 * 1000,
-                TimeUnit.MILLISECONDS);
-    }
-
     @Override
     public Kinesis2Component getComponent() {
         return (Kinesis2Component) super.getComponent();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index bbe4dcd9ee8..1763dd3fe83 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -21,16 +21,27 @@ import java.nio.ByteBuffer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
 import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
 
 public class Kinesis2Producer extends DefaultProducer {
 
+    private KinesisConnection connection;
+
     public Kinesis2Producer(Kinesis2Endpoint endpoint) {
         super(endpoint);
     }
 
+    public KinesisConnection getConnection() {
+        return connection;
+    }
+
+    public void setConnection(KinesisConnection connection) {
+        this.connection = connection;
+    }
+
     @Override
     public Kinesis2Endpoint getEndpoint() {
         return (Kinesis2Endpoint) super.getEndpoint();
@@ -39,7 +50,7 @@ public class Kinesis2Producer extends DefaultProducer {
     @Override
     public void process(Exchange exchange) throws Exception {
         PutRecordRequest request = createRequest(exchange);
-        PutRecordResponse putRecordResult = getEndpoint().getClient().putRecord(request);
+        PutRecordResponse putRecordResult = connection.getClient(getEndpoint()).putRecord(request);
         Message message = getMessageForResponse(exchange);
         message.setHeader(Kinesis2Constants.SEQUENCE_NUMBER, putRecordResult.sequenceNumber());
         message.setHeader(Kinesis2Constants.SHARD_ID, putRecordResult.shardId());
@@ -63,4 +74,11 @@ public class Kinesis2Producer extends DefaultProducer {
     public static Message getMessageForResponse(final Exchange exchange) {
         return exchange.getMessage();
     }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        ObjectHelper.notNull(connection, "connection", this);
+    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
similarity index 70%
rename from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java
rename to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
index 4ac3c3046b5..c5c23f1270f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
@@ -14,35 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.component.aws2.kinesis.consumer;
+package org.apache.camel.component.aws2.kinesis;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Objects;
 
-import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
 import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 
-public class KinesisConnection {
-    private static volatile KinesisConnection instance;
-    private KinesisClient kinesisClient = null;
-    private KinesisAsyncClient kinesisAsyncClient = null;
-
-    private KinesisConnection() {
-    }
+/**
+ * Holds connections to AWS from {@link KinesisClient} and {@link KinesisAsyncClient}.
+ */
+public class KinesisConnection implements Closeable {
 
-    public static synchronized KinesisConnection getInstance() {
-        if (instance == null) {
-            synchronized (KinesisConnection.class) {
-                if (instance == null) {
-                    instance = new KinesisConnection();
-                }
-            }
-        }
-        return instance;
-    }
+    private KinesisClient kinesisClient;
+    private KinesisAsyncClient kinesisAsyncClient;
 
-    public KinesisClient getClient(final Kinesis2Endpoint endpoint) {
+    public synchronized KinesisClient getClient(final Kinesis2Endpoint endpoint) {
         if (Objects.isNull(kinesisClient)) {
             kinesisClient = endpoint.getConfiguration().getAmazonKinesisClient() != null
                     ? endpoint.getConfiguration().getAmazonKinesisClient()
@@ -51,7 +41,7 @@ public class KinesisConnection {
         return kinesisClient;
     }
 
-    public KinesisAsyncClient getAsyncClient(final Kinesis2Endpoint endpoint) {
+    public synchronized KinesisAsyncClient getAsyncClient(final Kinesis2Endpoint endpoint) {
         if (Objects.isNull(kinesisAsyncClient)) {
             kinesisAsyncClient = KinesisClientFactory
                     .getKinesisAsyncClient(endpoint.getConfiguration())
@@ -67,4 +57,14 @@ public class KinesisConnection {
     public void setKinesisAsyncClient(final KinesisAsyncClient kinesisAsyncClient) {
         this.kinesisAsyncClient = kinesisAsyncClient;
     }
+
+    @Override
+    public void close() throws IOException {
+        if (kinesisClient != null) {
+            kinesisClient.close();
+        }
+        if (kinesisAsyncClient != null) {
+            kinesisAsyncClient.close();
+        }
+    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java
deleted file mode 100644
index 5bf59880dc2..00000000000
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.consumer;
-
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
-import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
-
-public class KinesisHealthCheck implements Runnable {
-    private Kinesis2Endpoint endpoint;
-
-    public KinesisHealthCheck(Kinesis2Endpoint endpoint) {
-        this.endpoint = endpoint;
-    }
-
-    @Override
-    public void run() {
-        var kinesisConnection = KinesisConnection.getInstance();
-        if (this.endpoint.getConfiguration().isAsyncClient()) {
-            try {
-                if (Objects.isNull(kinesisConnection.getAsyncClient(this.endpoint)) ||
-                        kinesisConnection.getAsyncClient(this.endpoint)
-                                .listStreams(ListStreamsRequest
-                                        .builder()
-                                        .build())
-                                .get()
-                                .streamNames()
-                                .isEmpty()) {
-                    kinesisConnection.setKinesisAsyncClient(endpoint.getAsyncClient());
-                }
-            } catch (ExecutionException | InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        } else {
-            if (Objects.isNull(kinesisConnection.getClient(this.endpoint)) ||
-                    kinesisConnection.getClient(this.endpoint)
-                            .listStreams(ListStreamsRequest
-                                    .builder()
-                                    .build())
-                            .streamNames()
-                            .isEmpty()) {
-                kinesisConnection.setKinesisClient(endpoint.getClient());
-            }
-        }
-    }
-
-}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java
index 32795c37d8b..eed9ce5d389 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java
@@ -47,9 +47,9 @@ public class Kinesis2ConsumerHealthCheckProfileCredsIT extends CamelTestSupport
     protected CamelContext createCamelContext() throws Exception {
         context = super.createCamelContext();
         context.getPropertiesComponent().setLocation("ref:prop");
+
         Kinesis2Component component = new Kinesis2Component(context);
         component.getConfiguration().setAmazonKinesisClient(AWSSDKClientUtils.newKinesisClient());
-        component.init();
         context.addComponent("aws2-kinesis", component);
 
         HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 2d7690a78fd..495942d298e 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -16,12 +16,10 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -54,8 +52,6 @@ public class KinesisConsumerClosedShardWithFailTest {
     private KinesisClient kinesisClient;
     @Mock
     private AsyncProcessor processor;
-    @Mock
-    private KinesisConnection kinesisConnection;
 
     private final CamelContext context = new DefaultCamelContext();
     private final Kinesis2Component component = new Kinesis2Component(context);
@@ -64,26 +60,25 @@ public class KinesisConsumerClosedShardWithFailTest {
 
     @BeforeEach
     public void setup() {
+        component.start();
+
         Kinesis2Configuration configuration = new Kinesis2Configuration();
         configuration.setAmazonKinesisClient(kinesisClient);
         configuration.setIteratorType(ShardIteratorType.LATEST);
         configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.fail);
         configuration.setStreamName("streamName");
 
-        setMock(kinesisConnection);
-
-        Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
+        Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component);
         endpoint.start();
         underTest = new Kinesis2Consumer(endpoint, processor);
+        underTest.setConnection(component.getConnection());
+        underTest.start();
 
         SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
         Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
         ArrayList<Shard> shardList = new ArrayList<>();
         shardList.add(shard);
 
-        when(kinesisConnection
-                .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient);
-
         when(kinesisClient
                 .getRecords(any(GetRecordsRequest.class)))
                 .thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build());
@@ -95,16 +90,6 @@ public class KinesisConsumerClosedShardWithFailTest {
                 .thenReturn(ListShardsResponse.builder().shards(shardList).build());
     }
 
-    private void setMock(KinesisConnection mock) {
-        try {
-            Field instance = KinesisConnection.class.getDeclaredField("instance");
-            instance.setAccessible(true);
-            instance.set(instance, mock);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @Test
     public void itObtainsAShardIteratorOnFirstPoll() {
         assertThrows(IllegalStateException.class, () -> {
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 4605cd5b60f..d14e5ebb332 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -16,13 +16,11 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
-import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -62,8 +60,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
     private KinesisClient kinesisClient;
     @Mock
     private AsyncProcessor processor;
-    @Mock
-    private KinesisConnection kinesisConnection;
+
     private final CamelContext context = new DefaultCamelContext();
     private final Kinesis2Component component = new Kinesis2Component(context);
 
@@ -71,26 +68,25 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
     @BeforeEach
     public void setup() {
+        component.start();
+
         Kinesis2Configuration configuration = new Kinesis2Configuration();
         configuration.setAmazonKinesisClient(kinesisClient);
         configuration.setIteratorType(ShardIteratorType.LATEST);
         configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.silent);
         configuration.setStreamName("streamName");
 
-        setMock(kinesisConnection);
-
         Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component);
         endpoint.start();
         underTest = new Kinesis2Consumer(endpoint, processor);
+        underTest.setConnection(component.getConnection());
+        underTest.start();
 
         SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
         Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
         ArrayList<Shard> shardList = new ArrayList<>();
         shardList.add(shard);
 
-        when(kinesisConnection
-                .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient);
-
         when(kinesisClient
                 .getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
                         .nextShardIterator("shardIterator")
@@ -115,16 +111,6 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
     }
 
-    private void setMock(KinesisConnection mock) {
-        try {
-            Field instance = KinesisConnection.class.getDeclaredField("instance");
-            instance.setAccessible(true);
-            instance.set(instance, mock);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @Test
     public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
         underTest.poll();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
index 7cc59431eb8..754862b44c1 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
@@ -25,7 +25,6 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -80,7 +79,6 @@ public class KinesisConsumerIT extends CamelTestSupport {
     @Override
     protected RouteBuilder createRouteBuilder() {
         client = AWSSDKClientUtils.newKinesisClient();
-        KinesisConnection.getInstance().setKinesisClient(client);
 
         context.getRegistry().bind("amazonKinesisClient", client);
 
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
index 20f508e13bd..26686b3cb33 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
@@ -25,7 +25,6 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
@@ -133,7 +132,6 @@ public class KinesisConsumerResumeIT extends CamelTestSupport {
     @Override
     protected RouteBuilder createRouteBuilder() {
         client = AWSSDKClientUtils.newKinesisClient();
-        KinesisConnection.getInstance().setKinesisClient(client);
 
         context.getRegistry().bind("amazonKinesisClient", client);
 
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
index 7c5207ffd70..036f54f90e9 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
@@ -25,7 +25,6 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -120,8 +119,6 @@ public class KinesisProducerIT extends CamelTestSupport {
     protected RouteBuilder createRouteBuilder() {
         client = AWSSDKClientUtils.newKinesisClient();
 
-        KinesisConnection.getInstance().setKinesisClient(client);
-
         context.getRegistry().bind("amazonKinesisClient", client);
 
         return new RouteBuilder() {