You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/19 09:10:19 UTC

[camel] branch main updated: CAMEL-19584 : add KinesisAsyncClient to the camel component (#10697)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a6d9aad9e3 CAMEL-19584 : add KinesisAsyncClient to the camel component (#10697)
2a6d9aad9e3 is described below

commit 2a6d9aad9e3771826f922d1a26df42eb12e69221
Author: Hamed Hatami <ja...@gmail.com>
AuthorDate: Wed Jul 19 11:10:12 2023 +0200

    CAMEL-19584 : add KinesisAsyncClient to the camel component (#10697)
    
    * CAMEL-19584 : add KinesisAsyncClient to the camel component
    
    * ed
    
    ---------
    
    Co-authored-by: Hamed Hatami <ha...@postnord.com>
---
 components/camel-aws/camel-aws2-kinesis/pom.xml    |   8 +-
 .../aws2/kinesis/Kinesis2Configuration.java        |  10 ++
 .../component/aws2/kinesis/Kinesis2Consumer.java   |  53 ++++++++--
 .../component/aws2/kinesis/Kinesis2Endpoint.java   |  24 ++++-
 .../kinesis/client/KinesisAsyncInternalClient.java |  31 ++++++
 .../aws2/kinesis/client/KinesisClientFactory.java  |  23 ++++-
 .../impl/KinesisAsyncClientIAMOptimizedImpl.java   |  93 ++++++++++++++++++
 .../KinesisAsyncClientIAMProfileOptimizedImpl.java |  98 ++++++++++++++++++
 .../impl/KinesisAsyncClientStandardImpl.java       | 109 +++++++++++++++++++++
 .../aws2/kinesis/KinesisClientFactoryTest.java     |  18 +++-
 parent/pom.xml                                     |   1 +
 11 files changed, 447 insertions(+), 21 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/pom.xml b/components/camel-aws/camel-aws2-kinesis/pom.xml
index 034df11b5e3..07811f6045a 100644
--- a/components/camel-aws/camel-aws2-kinesis/pom.xml
+++ b/components/camel-aws/camel-aws2-kinesis/pom.xml
@@ -32,9 +32,6 @@
     <name>Camel :: AWS2 Kinesis</name>
     <description>A Camel Amazon Kinesis Web Service Component Version 2</description>
 
-    <properties>
-    </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.camel</groupId>
@@ -45,6 +42,11 @@
             <artifactId>kinesis</artifactId>
             <version>${aws-java-sdk2-version}</version>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.kinesis</groupId>
+            <artifactId>amazon-kinesis-client</artifactId>
+            <version>${amazon-kinesis-common-version}</version>
+        </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>firehose</artifactId>
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
index 1252a229903..942532bbaec 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
@@ -67,6 +67,8 @@ public class Kinesis2Configuration implements Cloneable {
     private Integer proxyPort;
     @UriParam(defaultValue = "false", description = "If we want to trust all certificates in case of overriding the endpoint")
     private boolean trustAllCertificates;
+    @UriParam(label = "common", defaultValue = "false", description = "If we want to a KinesisAsyncClient instance set it to true")
+    private boolean asyncClient;
     @UriParam(label = "common", defaultValue = "true",
               description = "This option will set the CBOR_ENABLED property during the execution")
     private boolean cborEnabled = true;
@@ -249,6 +251,14 @@ public class Kinesis2Configuration implements Cloneable {
         this.profileCredentialsName = profileCredentialsName;
     }
 
+    public boolean isAsyncClient() {
+        return asyncClient;
+    }
+
+    public void setAsyncClient(boolean asyncClient) {
+        this.asyncClient = asyncClient;
+    }
+
     // *************************************************
     //
     // *************************************************
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index e3eae1edd2d..196032c9560 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.aws2.kinesis;
 import java.util.ArrayDeque;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -33,6 +34,7 @@ import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
@@ -76,7 +78,15 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                         .getConfiguration()
                         .getMaxResultsPerRequest())
                 .build();
-        GetRecordsResponse result = getClient().getRecords(req);
+
+        GetRecordsResponse result = null;
+        if (getEndpoint().getConfiguration().isAsyncClient()) {
+            result = getAsyncClient()
+                    .getRecords(req)
+                    .get();
+        } else {
+            result = getClient().getRecords(req);
+        }
 
         Queue<Exchange> exchanges = createExchanges(result.records());
         int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
@@ -125,12 +135,16 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         return getEndpoint().getClient();
     }
 
+    private KinesisAsyncClient getAsyncClient() {
+        return getEndpoint().getAsyncClient();
+    }
+
     @Override
     public Kinesis2Endpoint getEndpoint() {
         return (Kinesis2Endpoint) super.getEndpoint();
     }
 
-    private String getShardIterator() {
+    private String getShardIterator() throws ExecutionException, InterruptedException {
         // either return a cached one or get a new one via a GetShardIterator
         // request.
         if (currentShardIterator == null) {
@@ -139,21 +153,36 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
             // If ShardId supplied use it, else choose first one
             if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
                 shardId = getEndpoint().getConfiguration().getShardId();
-                DescribeStreamRequest req1
+                DescribeStreamRequest request
                         = DescribeStreamRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).build();
-                DescribeStreamResponse res1 = getClient().describeStream(req1);
-                for (Shard shard : res1.streamDescription().shards()) {
+
+                DescribeStreamResponse response = null;
+                if (getEndpoint().getConfiguration().isAsyncClient()) {
+                    response = getAsyncClient()
+                            .describeStream(request)
+                            .get();
+                } else {
+                    response = getClient().describeStream(request);
+                }
+                for (Shard shard : response.streamDescription().shards()) {
                     if (shard.shardId().equalsIgnoreCase(getEndpoint().getConfiguration().getShardId())) {
                         isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
                     }
                 }
 
             } else {
-                DescribeStreamRequest req1
+                DescribeStreamRequest request
                         = DescribeStreamRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).build();
-                DescribeStreamResponse res1 = getClient().describeStream(req1);
 
-                List<Shard> shards = res1.streamDescription().shards();
+                DescribeStreamResponse response = null;
+                if (getEndpoint().getConfiguration().isAsyncClient()) {
+                    response = getAsyncClient()
+                            .describeStream(request)
+                            .get();
+                } else {
+                    response = getClient().describeStream(request);
+                }
+                List<Shard> shards = response.streamDescription().shards();
 
                 if (shards.isEmpty()) {
                     LOG.warn("There are no shards in the stream");
@@ -175,7 +204,13 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
 
             resume(req);
 
-            GetShardIteratorResponse result = getClient().getShardIterator(req.build());
+            GetShardIteratorResponse result = null;
+            if (getEndpoint().getConfiguration().isAsyncClient()) {
+                result = getAsyncClient().getShardIterator(req.build()).get();
+            } else {
+                result = getClient().getShardIterator(req.build());
+            }
+
             currentShardIterator = result.shardIterator();
         }
 
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index a8e52ac8d52..f2bcd9c9689 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
+import java.util.Objects;
+
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -25,6 +27,7 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.ScheduledPollEndpoint;
 import org.apache.camel.util.ObjectHelper;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
@@ -41,6 +44,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
     private Kinesis2Configuration configuration;
 
     private KinesisClient kinesisClient;
+    private KinesisAsyncClient kinesisAsyncClient;
 
     public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, Kinesis2Component component) {
         super(uri, component);
@@ -53,9 +57,17 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
         if (!configuration.isCborEnabled()) {
             System.setProperty(CBOR_ENABLED.property(), "false");
         }
-        kinesisClient = configuration.getAmazonKinesisClient() != null
-                ? configuration.getAmazonKinesisClient()
-                : KinesisClientFactory.getKinesisClient(configuration).getKinesisClient();
+
+        if (configuration.isAsyncClient() &&
+                Objects.isNull(configuration.getAmazonKinesisClient())) {
+            kinesisAsyncClient = KinesisClientFactory
+                    .getKinesisAsyncClient(configuration)
+                    .getKinesisAsyncClient();
+        } else {
+            kinesisClient = configuration.getAmazonKinesisClient() != null
+                    ? configuration.getAmazonKinesisClient()
+                    : KinesisClientFactory.getKinesisClient(configuration).getKinesisClient();
+        }
 
         if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
                 || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
@@ -70,6 +82,8 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
         if (ObjectHelper.isEmpty(configuration.getAmazonKinesisClient())) {
             if (kinesisClient != null) {
                 kinesisClient.close();
+            } else if (Objects.nonNull(kinesisAsyncClient)) {
+                kinesisAsyncClient.close();
             }
         }
         if (!configuration.isCborEnabled()) {
@@ -95,6 +109,10 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
         return kinesisClient;
     }
 
+    public KinesisAsyncClient getAsyncClient() {
+        return kinesisAsyncClient;
+    }
+
     public Kinesis2Configuration getConfiguration() {
         return configuration;
     }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java
new file mode 100644
index 00000000000..963fe277fb8
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.client;
+
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+/**
+ * Manage the required actions of a Kinesis Async client for either local or remote.
+ */
+public interface KinesisAsyncInternalClient {
+    /**
+     * Returns a Kinesis Async client.
+     *
+     * @return KinesisAsyncClient client
+     */
+    KinesisAsyncClient getKinesisAsyncClient();
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
index c6b8abb072d..b2717725fbd 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
@@ -17,6 +17,9 @@
 package org.apache.camel.component.aws2.kinesis.client;
 
 import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import org.apache.camel.component.aws2.kinesis.client.impl.KinesisAsyncClientIAMOptimizedImpl;
+import org.apache.camel.component.aws2.kinesis.client.impl.KinesisAsyncClientIAMProfileOptimizedImpl;
+import org.apache.camel.component.aws2.kinesis.client.impl.KinesisAsyncClientStandardImpl;
 import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientIAMOptimizedImpl;
 import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientIAMProfileOptimizedImpl;
 import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientStandardImpl;
@@ -32,8 +35,8 @@ public final class KinesisClientFactory {
     /**
      * Return the correct aws Kinesis client (based on remote vs local).
      *
-     * @param  configuration configuration
-     * @return               KinesisClient
+     * @param configuration configuration
+     * @return KinesisClient
      */
     public static KinesisInternalClient getKinesisClient(Kinesis2Configuration configuration) {
         if (Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) {
@@ -44,4 +47,20 @@ public final class KinesisClientFactory {
             return new KinesisClientStandardImpl(configuration);
         }
     }
+
+    /**
+     * Return the standard aws Kinesis Async client.
+     *
+     * @param configuration configuration
+     * @return KinesisAsyncClient
+     */
+    public static KinesisAsyncInternalClient getKinesisAsyncClient(Kinesis2Configuration configuration) {
+        if (Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) {
+            return new KinesisAsyncClientIAMOptimizedImpl(configuration);
+        } else if (Boolean.TRUE.equals(configuration.isUseProfileCredentialsProvider())) {
+            return new KinesisAsyncClientIAMProfileOptimizedImpl(configuration);
+        } else {
+            return new KinesisAsyncClientStandardImpl(configuration);
+        }
+    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java
new file mode 100644
index 00000000000..0c0865df776
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.client.impl;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.utils.AttributeMap;
+
+/**
+ * Manage an AWS Kinesis Async client for all users to use (enabling temporary creds). This implementation is for remote instances to manage the
+ * credentials on their own (eliminating credential rotations)
+ */
+public class KinesisAsyncClientIAMOptimizedImpl implements KinesisAsyncInternalClient {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncClientIAMOptimizedImpl.class);
+    private Kinesis2Configuration configuration;
+
+    /**
+     * Constructor that uses the config file.
+     */
+    public KinesisAsyncClientIAMOptimizedImpl(Kinesis2Configuration configuration) {
+        LOG.trace("Creating an AWS Kinesis Async client for an ec2 instance with IAM temporary credentials (normal for ec2s).");
+        this.configuration = configuration;
+    }
+
+    /**
+     * Getting the Kinesis Async client that is used.
+     *
+     * @return Amazon Kinesis Async Client.
+     */
+    @Override
+    public KinesisAsyncClient getKinesisAsyncClient() {
+        var clientBuilder = KinesisAsyncClient.builder();
+        SdkAsyncHttpClient.Builder httpClientBuilder = null;
+
+        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+            var proxyConfig = ProxyConfiguration
+                    .builder()
+                    .scheme(configuration.getProxyProtocol().toString())
+                    .host(configuration.getProxyHost())
+                    .port(configuration.getProxyPort())
+                    .build();
+            httpClientBuilder = NettyNioAsyncHttpClient
+                    .builder()
+                    .proxyConfiguration(proxyConfig);
+            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+            clientBuilder = clientBuilder.region(Region.of(configuration.getRegion()));
+        }
+        if (configuration.isOverrideEndpoint()) {
+            clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
+        }
+        if (configuration.isTrustAllCertificates()) {
+            SdkAsyncHttpClient ahc = NettyNioAsyncHttpClient
+                    .builder()
+                    .buildWithDefaults(AttributeMap
+                            .builder()
+                            .put(
+                                    SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
+                                    Boolean.TRUE)
+                            .build());
+            clientBuilder.httpClient(ahc);
+        }
+        return clientBuilder.build();
+    }
+
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java
new file mode 100644
index 00000000000..72597961d33
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.client.impl;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.utils.AttributeMap;
+
+/**
+ * Manage an AWS Kinesis Async client for all users to use (enabling temporary creds). This implementation is for remote instances to manage
+ * the credentials on their own (eliminating credential rotations)
+ */
+public class KinesisAsyncClientIAMProfileOptimizedImpl implements KinesisAsyncInternalClient {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncClientIAMProfileOptimizedImpl.class);
+    private Kinesis2Configuration configuration;
+
+    /**
+     * Constructor that uses the config file.
+     */
+    public KinesisAsyncClientIAMProfileOptimizedImpl(Kinesis2Configuration configuration) {
+        LOG.trace("Creating an AWS Kinesis Async client for an ec2 instance with IAM temporary credentials (normal for ec2s).");
+        this.configuration = configuration;
+    }
+
+    /**
+     * Getting the KinesisAsync  client that is used.
+     *
+     * @return Amazon Kinesis Async Client.
+     */
+    @Override
+    public KinesisAsyncClient getKinesisAsyncClient() {
+        var clientBuilder = KinesisAsyncClient.builder();
+        SdkAsyncHttpClient.Builder httpClientBuilder = null;
+
+        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+            var proxyConfig = ProxyConfiguration
+                    .builder()
+                    .scheme(configuration.getProxyProtocol().toString())
+                    .host(configuration.getProxyHost())
+                    .port(configuration.getProxyPort())
+                    .build();
+            httpClientBuilder = NettyNioAsyncHttpClient
+                    .builder()
+                    .proxyConfiguration(proxyConfig);
+            clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
+        }
+        if (configuration.getProfileCredentialsName() != null) {
+            clientBuilder = clientBuilder
+                    .credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName()));
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+            clientBuilder = clientBuilder.region(Region.of(configuration.getRegion()));
+        }
+        if (configuration.isOverrideEndpoint()) {
+            clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
+        }
+        if (configuration.isTrustAllCertificates()) {
+            SdkAsyncHttpClient ahc = NettyNioAsyncHttpClient
+                    .builder()
+                    .buildWithDefaults(AttributeMap
+                            .builder()
+                            .put(
+                                    SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
+                                    Boolean.TRUE)
+                            .build());
+            clientBuilder.httpClient(ahc);
+        }
+        return clientBuilder.build();
+    }
+
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java
new file mode 100644
index 00000000000..57e14de3f53
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.client.impl;
+
+import java.net.URI;
+import java.util.Objects;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.utils.AttributeMap;
+
+/**
+ * Manage an AWS Async Kinesis client for all users to use. This implementation is for local instances to use a static
+ * and solid credential set.
+ */
+public class KinesisAsyncClientStandardImpl implements KinesisAsyncInternalClient {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncClientStandardImpl.class);
+    private Kinesis2Configuration configuration;
+
+    /**
+     * Constructor that uses the config file.
+     */
+    public KinesisAsyncClientStandardImpl(Kinesis2Configuration configuration) {
+        LOG.trace("Creating an AWS Async Kinesis manager using static credentials.");
+        this.configuration = configuration;
+    }
+
+    /**
+     * Getting the Kinesis Async client that is used.
+     *
+     * @return Amazon Kinesis Async Client.
+     */
+    @Override
+    public KinesisAsyncClient getKinesisAsyncClient() {
+        var clientBuilder = KinesisAsyncClient.builder();
+        var isClientConfigFound = false;
+        SdkAsyncHttpClient.Builder httpClientBuilder = null;
+
+        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+            var proxyConfig = ProxyConfiguration
+                    .builder()
+                    .scheme(configuration.getProxyProtocol().toString())
+                    .host(configuration.getProxyHost())
+                    .port(configuration.getProxyPort())
+                    .build();
+            httpClientBuilder = NettyNioAsyncHttpClient
+                    .builder()
+                    .proxyConfiguration(proxyConfig);
+            isClientConfigFound = true;
+        }
+        if (Objects.nonNull(configuration.getAccessKey()) && Objects.nonNull(configuration.getSecretKey())) {
+            var cred = AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey());
+            if (isClientConfigFound) {
+                clientBuilder = clientBuilder
+                        .httpClientBuilder(httpClientBuilder)
+                        .credentialsProvider(StaticCredentialsProvider.create(cred));
+            } else {
+                clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
+            }
+        } else {
+            if (!isClientConfigFound) {
+                clientBuilder = clientBuilder.httpClientBuilder(null);
+            }
+        }
+        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+            clientBuilder = clientBuilder.region(Region.of(configuration.getRegion()));
+        }
+        if (configuration.isOverrideEndpoint()) {
+            clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
+        }
+        if (configuration.isTrustAllCertificates()) {
+            SdkAsyncHttpClient ahc = NettyNioAsyncHttpClient
+                    .builder()
+                    .buildWithDefaults(AttributeMap
+                            .builder()
+                            .put(
+                                    SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
+                                    Boolean.TRUE)
+                            .build());
+            clientBuilder.httpClient(ahc);
+        }
+        return clientBuilder.build();
+    }
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java
index 0e8238bc9c8..2a192641ba7 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java
@@ -16,25 +16,27 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
+import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
 import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
 import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient;
+import org.apache.camel.component.aws2.kinesis.client.impl.KinesisAsyncClientStandardImpl;
 import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientIAMOptimizedImpl;
 import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientStandardImpl;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class KinesisClientFactoryTest {
+class KinesisClientFactoryTest {
 
     @Test
-    public void getStandardKinesisClientDefault() {
+    void getStandardKinesisClientDefault() {
         Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration();
         KinesisInternalClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration);
         assertTrue(kinesisClient instanceof KinesisClientStandardImpl);
     }
 
     @Test
-    public void getStandardKinesisClient() {
+    void getStandardKinesisClient() {
         Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration();
         kinesis2Configuration.setUseDefaultCredentialsProvider(false);
         KinesisInternalClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration);
@@ -42,10 +44,18 @@ public class KinesisClientFactoryTest {
     }
 
     @Test
-    public void getIAMOptimizedKinesisClient() {
+    void getIAMOptimizedKinesisClient() {
         Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration();
         kinesis2Configuration.setUseDefaultCredentialsProvider(true);
         KinesisInternalClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration);
         assertTrue(kinesisClient instanceof KinesisClientIAMOptimizedImpl);
     }
+
+    @Test
+    void getStandardKinesisAsyncClient() {
+        Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration();
+        kinesis2Configuration.setAsyncClient(true);
+        KinesisAsyncInternalClient kinesisClient = KinesisClientFactory.getKinesisAsyncClient(kinesis2Configuration);
+        assertTrue(kinesisClient instanceof KinesisAsyncClientStandardImpl);
+    }
 }
diff --git a/parent/pom.xml b/parent/pom.xml
index 526fbb7e785..e1fa0833961 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -33,6 +33,7 @@
     <description>Camel Parent POM</description>
 
     <properties>
+        <amazon-kinesis-common-version>2.5.1</amazon-kinesis-common-version>
         <maven.build.timestamp.format>yyyy-MM-dd</maven.build.timestamp.format>
         <camel.surefire.fork.additional-vmargs></camel.surefire.fork.additional-vmargs><!-- Empty by default -->
         <camel.surefire.fork.vmargs>-XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError ${camel.surefire.fork.additional-vmargs}</camel.surefire.fork.vmargs>