You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/19 09:10:19 UTC
[camel] branch main updated: CAMEL-19584 : add KinesisAsyncClient to the camel component (#10697)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2a6d9aad9e3 CAMEL-19584 : add KinesisAsyncClient to the camel component (#10697)
2a6d9aad9e3 is described below
commit 2a6d9aad9e3771826f922d1a26df42eb12e69221
Author: Hamed Hatami <ja...@gmail.com>
AuthorDate: Wed Jul 19 11:10:12 2023 +0200
CAMEL-19584 : add KinesisAsyncClient to the camel component (#10697)
* CAMEL-19584 : add KinesisAsyncClient to the camel component
* ed
---------
Co-authored-by: Hamed Hatami <ha...@postnord.com>
---
components/camel-aws/camel-aws2-kinesis/pom.xml | 8 +-
.../aws2/kinesis/Kinesis2Configuration.java | 10 ++
.../component/aws2/kinesis/Kinesis2Consumer.java | 53 ++++++++--
.../component/aws2/kinesis/Kinesis2Endpoint.java | 24 ++++-
.../kinesis/client/KinesisAsyncInternalClient.java | 31 ++++++
.../aws2/kinesis/client/KinesisClientFactory.java | 23 ++++-
.../impl/KinesisAsyncClientIAMOptimizedImpl.java | 93 ++++++++++++++++++
.../KinesisAsyncClientIAMProfileOptimizedImpl.java | 98 ++++++++++++++++++
.../impl/KinesisAsyncClientStandardImpl.java | 109 +++++++++++++++++++++
.../aws2/kinesis/KinesisClientFactoryTest.java | 18 +++-
parent/pom.xml | 1 +
11 files changed, 447 insertions(+), 21 deletions(-)
diff --git a/components/camel-aws/camel-aws2-kinesis/pom.xml b/components/camel-aws/camel-aws2-kinesis/pom.xml
index 034df11b5e3..07811f6045a 100644
--- a/components/camel-aws/camel-aws2-kinesis/pom.xml
+++ b/components/camel-aws/camel-aws2-kinesis/pom.xml
@@ -32,9 +32,6 @@
<name>Camel :: AWS2 Kinesis</name>
<description>A Camel Amazon Kinesis Web Service Component Version 2</description>
- <properties>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
@@ -45,6 +42,11 @@
<artifactId>kinesis</artifactId>
<version>${aws-java-sdk2-version}</version>
</dependency>
+ <dependency>
+ <groupId>software.amazon.kinesis</groupId>
+ <artifactId>amazon-kinesis-client</artifactId>
+ <version>${amazon-kinesis-common-version}</version>
+ </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>firehose</artifactId>
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
index 1252a229903..942532bbaec 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
@@ -67,6 +67,8 @@ public class Kinesis2Configuration implements Cloneable {
private Integer proxyPort;
@UriParam(defaultValue = "false", description = "If we want to trust all certificates in case of overriding the endpoint")
private boolean trustAllCertificates;
+ @UriParam(label = "common", defaultValue = "false", description = "If we want to a KinesisAsyncClient instance set it to true")
+ private boolean asyncClient;
@UriParam(label = "common", defaultValue = "true",
description = "This option will set the CBOR_ENABLED property during the execution")
private boolean cborEnabled = true;
@@ -249,6 +251,14 @@ public class Kinesis2Configuration implements Cloneable {
this.profileCredentialsName = profileCredentialsName;
}
+ public boolean isAsyncClient() {
+ return asyncClient;
+ }
+
+ public void setAsyncClient(boolean asyncClient) {
+ this.asyncClient = asyncClient;
+ }
+
// *************************************************
//
// *************************************************
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index e3eae1edd2d..196032c9560 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.aws2.kinesis;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.ExecutionException;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
@@ -33,6 +34,7 @@ import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
@@ -76,7 +78,15 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
.getConfiguration()
.getMaxResultsPerRequest())
.build();
- GetRecordsResponse result = getClient().getRecords(req);
+
+ GetRecordsResponse result = null;
+ if (getEndpoint().getConfiguration().isAsyncClient()) {
+ result = getAsyncClient()
+ .getRecords(req)
+ .get();
+ } else {
+ result = getClient().getRecords(req);
+ }
Queue<Exchange> exchanges = createExchanges(result.records());
int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
@@ -125,12 +135,16 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
return getEndpoint().getClient();
}
+ private KinesisAsyncClient getAsyncClient() {
+ return getEndpoint().getAsyncClient();
+ }
+
@Override
public Kinesis2Endpoint getEndpoint() {
return (Kinesis2Endpoint) super.getEndpoint();
}
- private String getShardIterator() {
+ private String getShardIterator() throws ExecutionException, InterruptedException {
// either return a cached one or get a new one via a GetShardIterator
// request.
if (currentShardIterator == null) {
@@ -139,21 +153,36 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
// If ShardId supplied use it, else choose first one
if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
shardId = getEndpoint().getConfiguration().getShardId();
- DescribeStreamRequest req1
+ DescribeStreamRequest request
= DescribeStreamRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).build();
- DescribeStreamResponse res1 = getClient().describeStream(req1);
- for (Shard shard : res1.streamDescription().shards()) {
+
+ DescribeStreamResponse response = null;
+ if (getEndpoint().getConfiguration().isAsyncClient()) {
+ response = getAsyncClient()
+ .describeStream(request)
+ .get();
+ } else {
+ response = getClient().describeStream(request);
+ }
+ for (Shard shard : response.streamDescription().shards()) {
if (shard.shardId().equalsIgnoreCase(getEndpoint().getConfiguration().getShardId())) {
isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
}
}
} else {
- DescribeStreamRequest req1
+ DescribeStreamRequest request
= DescribeStreamRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).build();
- DescribeStreamResponse res1 = getClient().describeStream(req1);
- List<Shard> shards = res1.streamDescription().shards();
+ DescribeStreamResponse response = null;
+ if (getEndpoint().getConfiguration().isAsyncClient()) {
+ response = getAsyncClient()
+ .describeStream(request)
+ .get();
+ } else {
+ response = getClient().describeStream(request);
+ }
+ List<Shard> shards = response.streamDescription().shards();
if (shards.isEmpty()) {
LOG.warn("There are no shards in the stream");
@@ -175,7 +204,13 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
resume(req);
- GetShardIteratorResponse result = getClient().getShardIterator(req.build());
+ GetShardIteratorResponse result = null;
+ if (getEndpoint().getConfiguration().isAsyncClient()) {
+ result = getAsyncClient().getShardIterator(req.build()).get();
+ } else {
+ result = getClient().getShardIterator(req.build());
+ }
+
currentShardIterator = result.shardIterator();
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index a8e52ac8d52..f2bcd9c9689 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.aws2.kinesis;
+import java.util.Objects;
+
import org.apache.camel.Category;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
@@ -25,6 +27,7 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.support.ScheduledPollEndpoint;
import org.apache.camel.util.ObjectHelper;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
@@ -41,6 +44,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
private Kinesis2Configuration configuration;
private KinesisClient kinesisClient;
+ private KinesisAsyncClient kinesisAsyncClient;
public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, Kinesis2Component component) {
super(uri, component);
@@ -53,9 +57,17 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
if (!configuration.isCborEnabled()) {
System.setProperty(CBOR_ENABLED.property(), "false");
}
- kinesisClient = configuration.getAmazonKinesisClient() != null
- ? configuration.getAmazonKinesisClient()
- : KinesisClientFactory.getKinesisClient(configuration).getKinesisClient();
+
+ if (configuration.isAsyncClient() &&
+ Objects.isNull(configuration.getAmazonKinesisClient())) {
+ kinesisAsyncClient = KinesisClientFactory
+ .getKinesisAsyncClient(configuration)
+ .getKinesisAsyncClient();
+ } else {
+ kinesisClient = configuration.getAmazonKinesisClient() != null
+ ? configuration.getAmazonKinesisClient()
+ : KinesisClientFactory.getKinesisClient(configuration).getKinesisClient();
+ }
if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
|| configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
@@ -70,6 +82,8 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
if (ObjectHelper.isEmpty(configuration.getAmazonKinesisClient())) {
if (kinesisClient != null) {
kinesisClient.close();
+ } else if (Objects.nonNull(kinesisAsyncClient)) {
+ kinesisAsyncClient.close();
}
}
if (!configuration.isCborEnabled()) {
@@ -95,6 +109,10 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
return kinesisClient;
}
+ public KinesisAsyncClient getAsyncClient() {
+ return kinesisAsyncClient;
+ }
+
public Kinesis2Configuration getConfiguration() {
return configuration;
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java
new file mode 100644
index 00000000000..963fe277fb8
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisAsyncInternalClient.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.client;
+
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+/**
+ * Manage the required actions of a Kinesis Async client for either local or remote.
+ */
+public interface KinesisAsyncInternalClient {
+ /**
+ * Returns a Kinesis Async client.
+ *
+ * @return KinesisAsyncClient client
+ */
+ KinesisAsyncClient getKinesisAsyncClient();
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
index c6b8abb072d..b2717725fbd 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/KinesisClientFactory.java
@@ -17,6 +17,9 @@
package org.apache.camel.component.aws2.kinesis.client;
import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import org.apache.camel.component.aws2.kinesis.client.impl.KinesisAsyncClientIAMOptimizedImpl;
+import org.apache.camel.component.aws2.kinesis.client.impl.KinesisAsyncClientIAMProfileOptimizedImpl;
+import org.apache.camel.component.aws2.kinesis.client.impl.KinesisAsyncClientStandardImpl;
import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientIAMOptimizedImpl;
import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientIAMProfileOptimizedImpl;
import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientStandardImpl;
@@ -32,8 +35,8 @@ public final class KinesisClientFactory {
/**
* Return the correct aws Kinesis client (based on remote vs local).
*
- * @param configuration configuration
- * @return KinesisClient
+ * @param configuration configuration
+ * @return KinesisClient
*/
public static KinesisInternalClient getKinesisClient(Kinesis2Configuration configuration) {
if (Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) {
@@ -44,4 +47,20 @@ public final class KinesisClientFactory {
return new KinesisClientStandardImpl(configuration);
}
}
+
+ /**
+ * Return the standard aws Kinesis Async client.
+ *
+ * @param configuration configuration
+ * @return KinesisAsyncClient
+ */
+ public static KinesisAsyncInternalClient getKinesisAsyncClient(Kinesis2Configuration configuration) {
+ if (Boolean.TRUE.equals(configuration.isUseDefaultCredentialsProvider())) {
+ return new KinesisAsyncClientIAMOptimizedImpl(configuration);
+ } else if (Boolean.TRUE.equals(configuration.isUseProfileCredentialsProvider())) {
+ return new KinesisAsyncClientIAMProfileOptimizedImpl(configuration);
+ } else {
+ return new KinesisAsyncClientStandardImpl(configuration);
+ }
+ }
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java
new file mode 100644
index 00000000000..0c0865df776
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMOptimizedImpl.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.client.impl;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.utils.AttributeMap;
+
+/**
+ * Manage an AWS Kinesis Async client for all users to use (enabling temporary creds). This implementation is for remote instances to manage the
+ * credentials on their own (eliminating credential rotations)
+ */
+public class KinesisAsyncClientIAMOptimizedImpl implements KinesisAsyncInternalClient {
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncClientIAMOptimizedImpl.class);
+ private Kinesis2Configuration configuration;
+
+ /**
+ * Constructor that uses the config file.
+ */
+ public KinesisAsyncClientIAMOptimizedImpl(Kinesis2Configuration configuration) {
+ LOG.trace("Creating an AWS Kinesis Async client for an ec2 instance with IAM temporary credentials (normal for ec2s).");
+ this.configuration = configuration;
+ }
+
+ /**
+ * Getting the Kinesis Async client that is used.
+ *
+ * @return Amazon Kinesis Async Client.
+ */
+ @Override
+ public KinesisAsyncClient getKinesisAsyncClient() {
+ var clientBuilder = KinesisAsyncClient.builder();
+ SdkAsyncHttpClient.Builder httpClientBuilder = null;
+
+ if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+ var proxyConfig = ProxyConfiguration
+ .builder()
+ .scheme(configuration.getProxyProtocol().toString())
+ .host(configuration.getProxyHost())
+ .port(configuration.getProxyPort())
+ .build();
+ httpClientBuilder = NettyNioAsyncHttpClient
+ .builder()
+ .proxyConfiguration(proxyConfig);
+ clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+ clientBuilder = clientBuilder.region(Region.of(configuration.getRegion()));
+ }
+ if (configuration.isOverrideEndpoint()) {
+ clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
+ }
+ if (configuration.isTrustAllCertificates()) {
+ SdkAsyncHttpClient ahc = NettyNioAsyncHttpClient
+ .builder()
+ .buildWithDefaults(AttributeMap
+ .builder()
+ .put(
+ SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
+ Boolean.TRUE)
+ .build());
+ clientBuilder.httpClient(ahc);
+ }
+ return clientBuilder.build();
+ }
+
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java
new file mode 100644
index 00000000000..72597961d33
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientIAMProfileOptimizedImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.client.impl;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.utils.AttributeMap;
+
+/**
+ * Manage an AWS Kinesis Async client for all users to use (enabling temporary creds). This implementation is for remote instances to manage
+ * the credentials on their own (eliminating credential rotations)
+ */
+public class KinesisAsyncClientIAMProfileOptimizedImpl implements KinesisAsyncInternalClient {
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncClientIAMProfileOptimizedImpl.class);
+ private Kinesis2Configuration configuration;
+
+ /**
+ * Constructor that uses the config file.
+ */
+ public KinesisAsyncClientIAMProfileOptimizedImpl(Kinesis2Configuration configuration) {
+ LOG.trace("Creating an AWS Kinesis Async client for an ec2 instance with IAM temporary credentials (normal for ec2s).");
+ this.configuration = configuration;
+ }
+
+ /**
+ * Getting the KinesisAsync client that is used.
+ *
+ * @return Amazon Kinesis Async Client.
+ */
+ @Override
+ public KinesisAsyncClient getKinesisAsyncClient() {
+ var clientBuilder = KinesisAsyncClient.builder();
+ SdkAsyncHttpClient.Builder httpClientBuilder = null;
+
+ if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+ var proxyConfig = ProxyConfiguration
+ .builder()
+ .scheme(configuration.getProxyProtocol().toString())
+ .host(configuration.getProxyHost())
+ .port(configuration.getProxyPort())
+ .build();
+ httpClientBuilder = NettyNioAsyncHttpClient
+ .builder()
+ .proxyConfiguration(proxyConfig);
+ clientBuilder = clientBuilder.httpClientBuilder(httpClientBuilder);
+ }
+ if (configuration.getProfileCredentialsName() != null) {
+ clientBuilder = clientBuilder
+ .credentialsProvider(ProfileCredentialsProvider.create(configuration.getProfileCredentialsName()));
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+ clientBuilder = clientBuilder.region(Region.of(configuration.getRegion()));
+ }
+ if (configuration.isOverrideEndpoint()) {
+ clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
+ }
+ if (configuration.isTrustAllCertificates()) {
+ SdkAsyncHttpClient ahc = NettyNioAsyncHttpClient
+ .builder()
+ .buildWithDefaults(AttributeMap
+ .builder()
+ .put(
+ SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
+ Boolean.TRUE)
+ .build());
+ clientBuilder.httpClient(ahc);
+ }
+ return clientBuilder.build();
+ }
+
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java
new file mode 100644
index 00000000000..57e14de3f53
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/client/impl/KinesisAsyncClientStandardImpl.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.client.impl;
+
+import java.net.URI;
+import java.util.Objects;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.utils.AttributeMap;
+
+/**
+ * Manage an AWS Async Kinesis client for all users to use. This implementation is for local instances to use a static
+ * and solid credential set.
+ */
+public class KinesisAsyncClientStandardImpl implements KinesisAsyncInternalClient {
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisAsyncClientStandardImpl.class);
+ private Kinesis2Configuration configuration;
+
+ /**
+ * Constructor that uses the config file.
+ */
+ public KinesisAsyncClientStandardImpl(Kinesis2Configuration configuration) {
+ LOG.trace("Creating an AWS Async Kinesis manager using static credentials.");
+ this.configuration = configuration;
+ }
+
+ /**
+ * Getting the Kinesis Async client that is used.
+ *
+ * @return Amazon Kinesis Async Client.
+ */
+ @Override
+ public KinesisAsyncClient getKinesisAsyncClient() {
+ var clientBuilder = KinesisAsyncClient.builder();
+ var isClientConfigFound = false;
+ SdkAsyncHttpClient.Builder httpClientBuilder = null;
+
+ if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+ var proxyConfig = ProxyConfiguration
+ .builder()
+ .scheme(configuration.getProxyProtocol().toString())
+ .host(configuration.getProxyHost())
+ .port(configuration.getProxyPort())
+ .build();
+ httpClientBuilder = NettyNioAsyncHttpClient
+ .builder()
+ .proxyConfiguration(proxyConfig);
+ isClientConfigFound = true;
+ }
+ if (Objects.nonNull(configuration.getAccessKey()) && Objects.nonNull(configuration.getSecretKey())) {
+ var cred = AwsBasicCredentials.create(configuration.getAccessKey(), configuration.getSecretKey());
+ if (isClientConfigFound) {
+ clientBuilder = clientBuilder
+ .httpClientBuilder(httpClientBuilder)
+ .credentialsProvider(StaticCredentialsProvider.create(cred));
+ } else {
+ clientBuilder = clientBuilder.credentialsProvider(StaticCredentialsProvider.create(cred));
+ }
+ } else {
+ if (!isClientConfigFound) {
+ clientBuilder = clientBuilder.httpClientBuilder(null);
+ }
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+ clientBuilder = clientBuilder.region(Region.of(configuration.getRegion()));
+ }
+ if (configuration.isOverrideEndpoint()) {
+ clientBuilder.endpointOverride(URI.create(configuration.getUriEndpointOverride()));
+ }
+ if (configuration.isTrustAllCertificates()) {
+ SdkAsyncHttpClient ahc = NettyNioAsyncHttpClient
+ .builder()
+ .buildWithDefaults(AttributeMap
+ .builder()
+ .put(
+ SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
+ Boolean.TRUE)
+ .build());
+ clientBuilder.httpClient(ahc);
+ }
+ return clientBuilder.build();
+ }
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java
index 0e8238bc9c8..2a192641ba7 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisClientFactoryTest.java
@@ -16,25 +16,27 @@
*/
package org.apache.camel.component.aws2.kinesis;
+import org.apache.camel.component.aws2.kinesis.client.KinesisAsyncInternalClient;
import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
import org.apache.camel.component.aws2.kinesis.client.KinesisInternalClient;
+import org.apache.camel.component.aws2.kinesis.client.impl.KinesisAsyncClientStandardImpl;
import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientIAMOptimizedImpl;
import org.apache.camel.component.aws2.kinesis.client.impl.KinesisClientStandardImpl;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class KinesisClientFactoryTest {
+class KinesisClientFactoryTest {
@Test
- public void getStandardKinesisClientDefault() {
+ void getStandardKinesisClientDefault() {
Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration();
KinesisInternalClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration);
assertTrue(kinesisClient instanceof KinesisClientStandardImpl);
}
@Test
- public void getStandardKinesisClient() {
+ void getStandardKinesisClient() {
Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration();
kinesis2Configuration.setUseDefaultCredentialsProvider(false);
KinesisInternalClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration);
@@ -42,10 +44,18 @@ public class KinesisClientFactoryTest {
}
@Test
- public void getIAMOptimizedKinesisClient() {
+ void getIAMOptimizedKinesisClient() {
Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration();
kinesis2Configuration.setUseDefaultCredentialsProvider(true);
KinesisInternalClient kinesisClient = KinesisClientFactory.getKinesisClient(kinesis2Configuration);
assertTrue(kinesisClient instanceof KinesisClientIAMOptimizedImpl);
}
+
+ @Test
+ void getStandardKinesisAsyncClient() {
+ Kinesis2Configuration kinesis2Configuration = new Kinesis2Configuration();
+ kinesis2Configuration.setAsyncClient(true);
+ KinesisAsyncInternalClient kinesisClient = KinesisClientFactory.getKinesisAsyncClient(kinesis2Configuration);
+ assertTrue(kinesisClient instanceof KinesisAsyncClientStandardImpl);
+ }
}
diff --git a/parent/pom.xml b/parent/pom.xml
index 526fbb7e785..e1fa0833961 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -33,6 +33,7 @@
<description>Camel Parent POM</description>
<properties>
+ <amazon-kinesis-common-version>2.5.1</amazon-kinesis-common-version>
<maven.build.timestamp.format>yyyy-MM-dd</maven.build.timestamp.format>
<camel.surefire.fork.additional-vmargs></camel.surefire.fork.additional-vmargs><!-- Empty by default -->
<camel.surefire.fork.vmargs>-XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError ${camel.surefire.fork.additional-vmargs}</camel.surefire.fork.vmargs>