You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/03 13:17:04 UTC
[flink] 01/06: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency
and KinesisProxyV2
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2967414a14d37a10d354ae00e3e84e035e3a7e4e
Author: Danny Cranmer <cr...@amazon.com>
AuthorDate: Tue Jul 21 09:40:22 2020 +0100
[FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2
---
flink-connectors/flink-connector-kinesis/pom.xml | 42 +++
.../connectors/kinesis/proxy/KinesisProxyV2.java | 60 +++
.../kinesis/proxy/KinesisProxyV2Interface.java | 28 ++
.../streaming/connectors/kinesis/util/AWSUtil.java | 53 ++-
.../connectors/kinesis/util/AwsV2Util.java | 237 ++++++++++++
.../connectors/kinesis/util/AWSUtilTest.java | 122 +++++-
.../connectors/kinesis/util/AwsV2UtilTest.java | 407 +++++++++++++++++++++
.../src/test/resources/profile | 7 +
8 files changed, 938 insertions(+), 18 deletions(-)
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 8e0bf7c..8b43028 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -34,6 +34,7 @@ under the License.
<name>Flink : Connectors : Kinesis</name>
<properties>
<aws.sdk.version>1.11.754</aws.sdk.version>
+ <aws.sdkv2.version>2.13.52</aws.sdkv2.version>
<aws.kinesis-kcl.version>1.11.2</aws.kinesis-kcl.version>
<aws.kinesis-kpl.version>0.14.0</aws.kinesis-kpl.version>
<aws.dynamodbstreams-kinesis-adapter.version>1.5.0</aws.dynamodbstreams-kinesis-adapter.version>
@@ -161,6 +162,25 @@ under the License.
<scope>provided</scope>
</dependency>
+ <!-- Amazon AWS SDK v2.x dependencies -->
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>kinesis</artifactId>
+ <version>${aws.sdkv2.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>netty-nio-client</artifactId>
+ <version>${aws.sdkv2.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sts</artifactId>
+ <version>${aws.sdkv2.version}</version>
+ </dependency>
+
</dependencies>
<dependencyManagement>
@@ -211,6 +231,12 @@ under the License.
<include>com.amazonaws:*</include>
<include>com.google.protobuf:*</include>
<include>org.apache.httpcomponents:*</include>
+ <include>software.amazon.awssdk:*</include>
+ <include>software.amazon.eventstream:*</include>
+ <include>software.amazon.ion:*</include>
+ <include>org.reactivestreams:*</include>
+ <include>io.netty:*</include>
+ <include>com.typesafe.netty:*</include>
</includes>
</artifactSet>
<relocations combine.children="override">
@@ -228,6 +254,22 @@ under the License.
<pattern>org.apache.http</pattern>
<shadedPattern>org.apache.flink.kinesis.shaded.org.apache.http</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>software.amazon</pattern>
+ <shadedPattern>org.apache.flink.kinesis.shaded.software.amazon</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>org.apache.flink.kinesis.shaded.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.typesafe.netty</pattern>
+ <shadedPattern>org.apache.flink.kinesis.shaded.com.typesafe.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.reactivestreams</pattern>
+ <shadedPattern>org.apache.flink.kinesis.shaded.org.reactivestreams</shadedPattern>
+ </relocation>
</relocations>
<filters>
<filter>
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
new file mode 100644
index 0000000..8c7fb52
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+import java.util.Properties;
+
+/**
+ * Kinesis proxy implementation using AWS SDK v2.x - a utility class that is used as a proxy to make
+ * calls to AWS Kinesis for several EFO (Enhanced Fan Out) functions, such as de-/registering stream consumers,
+ * subscribing to a shard and receiving records from a shard.
+ */
+@Internal
+public class KinesisProxyV2 implements KinesisProxyV2Interface {
+
+ private final KinesisAsyncClient kinesisAsyncClient;
+
+ /**
+ * Create a new KinesisProxyV2 based on the supplied configuration properties.
+ *
+ * @param configProps configuration properties containing AWS credential and AWS region info
+ */
+ public KinesisProxyV2(final Properties configProps) {
+ this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+ }
+
+ /**
+ * Create the Kinesis client, using the provided configuration properties.
+ * Derived classes can override this method to customize the client configuration.
+ *
+ * @param configProps the properties map used to create the Kinesis Client
+ * @return a Kinesis Client
+ */
+ protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+ final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+ return AwsV2Util.createKinesisAsyncClient(configProps, config);
+ }
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java
new file mode 100644
index 0000000..aff6a85
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for a Kinesis proxy using AWS SDK v2.x operating on multiple Kinesis streams within the same AWS service region.
+ */
+@Internal
+public interface KinesisProxyV2Interface {
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 6652529..7301e7a 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -82,9 +82,7 @@ public class AWSUtil {
*/
public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
// set a Flink-specific user agent
- awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
- EnvironmentInformation.getVersion(),
- EnvironmentInformation.getRevisionInformation().commitId));
+ awsClientConfig.setUserAgentPrefix(formatFlinkUserAgentPrefix());
// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
@@ -104,6 +102,19 @@ public class AWSUtil {
}
/**
+ * Creates a user agent prefix for Flink.
+ * This can be used by HTTP Clients.
+ *
+ * @return a user agent prefix for Flink
+ */
+ public static String formatFlinkUserAgentPrefix() {
+ return String.format(
+ USER_AGENT_FORMAT,
+ EnvironmentInformation.getVersion(),
+ EnvironmentInformation.getRevisionInformation().commitId);
+ }
+
+ /**
* Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
*
* @param configProps the configuration properties
@@ -114,29 +125,37 @@ public class AWSUtil {
}
/**
- * If the provider is ASSUME_ROLE, then the credentials for assuming this role are determined
- * recursively.
+ * Determines and returns the credential provider type from the given properties.
*
- * @param configProps the configuration properties
- * @param configPrefix the prefix of the config properties for this credentials provider,
- * e.g. aws.credentials.provider for the base credentials provider,
- * aws.credentials.provider.role.provider for the credentials provider
- * for assuming a role, and so on.
+ * @return the credential provider type
*/
- private static AWSCredentialsProvider getCredentialsProvider(final Properties configProps, final String configPrefix) {
- CredentialProvider credentialProviderType;
+ static CredentialProvider getCredentialProviderType(final Properties configProps, final String configPrefix) {
if (!configProps.containsKey(configPrefix)) {
if (configProps.containsKey(AWSConfigConstants.accessKeyId(configPrefix))
&& configProps.containsKey(AWSConfigConstants.secretKey(configPrefix))) {
// if the credential provider type is not specified, but the Access Key ID and Secret Key are given, it will default to BASIC
- credentialProviderType = CredentialProvider.BASIC;
+ return CredentialProvider.BASIC;
} else {
// if the credential provider type is not specified, it will default to AUTO
- credentialProviderType = CredentialProvider.AUTO;
+ return CredentialProvider.AUTO;
}
} else {
- credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(configPrefix));
+ return CredentialProvider.valueOf(configProps.getProperty(configPrefix));
}
+ }
+
+ /**
+ * If the provider is ASSUME_ROLE, then the credentials for assuming this role are determined
+ * recursively.
+ *
+ * @param configProps the configuration properties
+ * @param configPrefix the prefix of the config properties for this credentials provider,
+ * e.g. aws.credentials.provider for the base credentials provider,
+ * aws.credentials.provider.role.provider for the credentials provider
+ * for assuming a role, and so on.
+ */
+ private static AWSCredentialsProvider getCredentialsProvider(final Properties configProps, final String configPrefix) {
+ CredentialProvider credentialProviderType = getCredentialProviderType(configProps, configPrefix);
switch (credentialProviderType) {
case ENV_VAR:
@@ -188,9 +207,11 @@ public class AWSUtil {
.webIdentityTokenFile(configProps.getProperty(AWSConfigConstants.webIdentityTokenFile(configPrefix), null))
.build();
- default:
case AUTO:
return new DefaultAWSCredentialsProviderChain();
+
+ default:
+ throw new IllegalArgumentException("Credential provider not supported: " + credentialProviderType);
}
}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
new file mode 100644
index 0000000..f5d95df
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.profiles.ProfileFile;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Utility methods specific to Amazon Web Service SDK v2.x.
+ */
+@Internal
+public class AwsV2Util {
+
+ /**
+ * Creates an Amazon Kinesis Async Client from the provided properties.
+ * Configuration is copied from AWS SDK v1 configuration class as per:
+ * - https://github.com/aws/aws-sdk-java-v2/blob/2.13.52/docs/LaunchChangelog.md#134-client-override-retry-configuration
+ *
+ * @param configProps configuration properties
+ * @param config the AWS SDK v1.x client configuration used to create the client
+ * @return a new Amazon Kinesis Client
+ */
+ public static KinesisAsyncClient createKinesisAsyncClient(final Properties configProps, final ClientConfiguration config) {
+ final SdkAsyncHttpClient httpClient = createHttpClient(config, NettyNioAsyncHttpClient.builder());
+ final ClientOverrideConfiguration overrideConfiguration = createClientOverrideConfiguration(config, ClientOverrideConfiguration.builder());
+ final KinesisAsyncClientBuilder clientBuilder = KinesisAsyncClient.builder();
+
+ return createKinesisAsyncClient(configProps, clientBuilder, httpClient, overrideConfiguration);
+ }
+
+ @VisibleForTesting
+ static SdkAsyncHttpClient createHttpClient(
+ final ClientConfiguration config,
+ final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+ httpClientBuilder
+ .maxConcurrency(config.getMaxConnections())
+ .connectionTimeout(Duration.ofMillis(config.getConnectionTimeout()))
+ .writeTimeout(Duration.ofMillis(config.getSocketTimeout()))
+ .connectionMaxIdleTime(Duration.ofMillis(config.getConnectionMaxIdleMillis()))
+ .useIdleConnectionReaper(config.useReaper());
+
+ if (config.getConnectionTTL() > -1) {
+ httpClientBuilder.connectionTimeToLive(Duration.ofMillis(config.getConnectionTTL()));
+ }
+
+ return httpClientBuilder.build();
+ }
+
+ @VisibleForTesting
+ static ClientOverrideConfiguration createClientOverrideConfiguration(
+ final ClientConfiguration config,
+ final ClientOverrideConfiguration.Builder overrideConfigurationBuilder) {
+
+ overrideConfigurationBuilder
+ .putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWSUtil.formatFlinkUserAgentPrefix())
+ .putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, config.getUserAgentSuffix());
+
+ if (config.getRequestTimeout() > 0) {
+ overrideConfigurationBuilder.apiCallAttemptTimeout(Duration.ofMillis(config.getRequestTimeout()));
+ }
+
+ if (config.getClientExecutionTimeout() > 0) {
+ overrideConfigurationBuilder.apiCallTimeout(Duration.ofMillis(config.getClientExecutionTimeout()));
+ }
+
+ return overrideConfigurationBuilder.build();
+ }
+
+ @VisibleForTesting
+ static KinesisAsyncClient createKinesisAsyncClient(
+ final Properties configProps,
+ final KinesisAsyncClientBuilder clientBuilder,
+ final SdkAsyncHttpClient httpClient,
+ final ClientOverrideConfiguration overrideConfiguration) {
+
+ if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+ final URI endpointOverride = URI.create(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+ clientBuilder.endpointOverride(endpointOverride);
+ }
+
+ return clientBuilder
+ .httpClient(httpClient)
+ .overrideConfiguration(overrideConfiguration)
+ .credentialsProvider(getCredentialsProvider(configProps))
+ .region(getRegion(configProps))
+ .build();
+ }
+
+ /**
+ * Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
+ *
+ * @param configProps the configuration properties
+ * @return The corresponding AWS Credentials Provider instance
+ */
+ public static AwsCredentialsProvider getCredentialsProvider(final Properties configProps) {
+ return getCredentialsProvider(configProps, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+ }
+
+ private static AwsCredentialsProvider getCredentialsProvider(final Properties configProps, final String configPrefix) {
+ CredentialProvider credentialProviderType = AWSUtil.getCredentialProviderType(configProps, configPrefix);
+
+ switch (credentialProviderType) {
+ case ENV_VAR:
+ return EnvironmentVariableCredentialsProvider.create();
+
+ case SYS_PROP:
+ return SystemPropertyCredentialsProvider.create();
+
+ case PROFILE:
+ return getProfileCredentialProvider(configProps, configPrefix);
+
+ case BASIC:
+ return () -> AwsBasicCredentials.create(
+ configProps.getProperty(AWSConfigConstants.accessKeyId(configPrefix)),
+ configProps.getProperty(AWSConfigConstants.secretKey(configPrefix)));
+
+ case ASSUME_ROLE:
+ return getAssumeRoleCredentialProvider(configProps, configPrefix);
+
+ case WEB_IDENTITY_TOKEN:
+ return getWebIdentityTokenFileCredentialsProvider(WebIdentityTokenFileCredentialsProvider.builder(), configProps, configPrefix);
+
+ case AUTO:
+ return DefaultCredentialsProvider.create();
+
+ default:
+ throw new IllegalArgumentException("Credential provider not supported: " + credentialProviderType);
+ }
+ }
+
+ private static AwsCredentialsProvider getProfileCredentialProvider(final Properties configProps, final String configPrefix) {
+ String profileName = configProps.getProperty(AWSConfigConstants.profileName(configPrefix), null);
+ String profileConfigPath = configProps.getProperty(AWSConfigConstants.profilePath(configPrefix), null);
+
+ ProfileCredentialsProvider.Builder profileBuilder = ProfileCredentialsProvider
+ .builder()
+ .profileName(profileName);
+
+ if (profileConfigPath != null) {
+ profileBuilder.profileFile(ProfileFile
+ .builder()
+ .type(ProfileFile.Type.CREDENTIALS)
+ .content(Paths.get(profileConfigPath))
+ .build());
+ }
+
+ return profileBuilder.build();
+ }
+
+ private static AwsCredentialsProvider getAssumeRoleCredentialProvider(final Properties configProps, final String configPrefix) {
+ return StsAssumeRoleCredentialsProvider
+ .builder()
+ .refreshRequest(AssumeRoleRequest
+ .builder()
+ .roleArn(configProps.getProperty(AWSConfigConstants.roleArn(configPrefix)))
+ .roleSessionName(configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix)))
+ .externalId(configProps.getProperty(AWSConfigConstants.externalId(configPrefix)))
+ .build())
+ .stsClient(StsClient
+ .builder()
+ .credentialsProvider(getCredentialsProvider(configProps, AWSConfigConstants.roleCredentialsProvider(configPrefix)))
+ .region(getRegion(configProps))
+ .build())
+ .build();
+ }
+
+ @VisibleForTesting
+ static AwsCredentialsProvider getWebIdentityTokenFileCredentialsProvider(
+ final WebIdentityTokenFileCredentialsProvider.Builder webIdentityBuilder,
+ final Properties configProps,
+ final String configPrefix) {
+
+ webIdentityBuilder
+ .roleArn(configProps.getProperty(AWSConfigConstants.roleArn(configPrefix), null))
+ .roleSessionName(configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix), null));
+
+ Optional.ofNullable(configProps.getProperty(AWSConfigConstants.webIdentityTokenFile(configPrefix), null))
+ .map(Paths::get)
+ .ifPresent(webIdentityBuilder::webIdentityTokenFile);
+
+ return webIdentityBuilder.build();
+ }
+
+ /**
+ * Creates a {@link Region} object from the given Properties.
+ *
+ * @param configProps the properties containing the region
+ * @return the region specified by the properties
+ */
+ public static Region getRegion(final Properties configProps) {
+ return Region.of(configProps.getProperty(AWSConfigConstants.AWS_REGION));
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
index 64fd5d1..e66ff51 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
@@ -18,11 +18,16 @@
package org.apache.flink.streaming.connectors.kinesis.util;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -36,6 +41,11 @@ import java.util.Properties;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.ASSUME_ROLE;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.AUTO;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.BASIC;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
@@ -66,23 +76,131 @@ public class AWSUtilTest {
@Test
public void testGetCredentialsProvider() {
Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+ testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
assertTrue(credentialsProvider instanceof WebIdentityTokenCredentialsProvider);
}
@Test
+ public void testGetCredentialsProviderTypeDefaultsAuto() {
+ assertEquals(AUTO, AWSUtil.getCredentialProviderType(new Properties(), AWS_CREDENTIALS_PROVIDER));
+ }
+
+ @Test
+ public void testGetCredentialsProviderTypeBasic() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+ testConfig.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+ assertEquals(BASIC, AWSUtil.getCredentialProviderType(testConfig, AWS_CREDENTIALS_PROVIDER));
+ }
+
+ @Test
+ public void testGetCredentialsProviderTypeWebIdentityToken() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+ CredentialProvider type = AWSUtil.getCredentialProviderType(testConfig, AWS_CREDENTIALS_PROVIDER);
+ assertEquals(WEB_IDENTITY_TOKEN, type);
+ }
+
+ @Test
+ public void testGetCredentialsProviderTypeAssumeRole() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
+
+ CredentialProvider type = AWSUtil.getCredentialProviderType(testConfig, AWS_CREDENTIALS_PROVIDER);
+ assertEquals(ASSUME_ROLE, type);
+ }
+
+ @Test
+ public void testGetCredentialsProviderEnvironmentVariables() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+ AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
+
+ assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+ }
+
+ @Test
+ public void testGetCredentialsProviderSystemProperties() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+ AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
+
+ assertTrue(credentialsProvider instanceof SystemPropertiesCredentialsProvider);
+ }
+
+ @Test
+ public void testGetCredentialsProviderBasic() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "BASIC");
+
+ testConfig.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+ testConfig.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+ AWSCredentials credentials = AWSUtil.getCredentialsProvider(testConfig).getCredentials();
+
+ assertEquals("ak", credentials.getAWSAccessKeyId());
+ assertEquals("sk", credentials.getAWSSecretKey());
+ }
+
+ @Test
+ public void testGetCredentialsProviderAuto() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+ AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
+
+ assertTrue(credentialsProvider instanceof DefaultAWSCredentialsProviderChain);
+ }
+
+ @Test
public void testInvalidCredentialsProvider() {
exception.expect(IllegalArgumentException.class);
Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "INVALID_PROVIDER");
+ testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "INVALID_PROVIDER");
AWSUtil.getCredentialsProvider(testConfig);
}
@Test
+ public void testGetCredentialsProviderProfile() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+ testConfig.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "default");
+ testConfig.setProperty(AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER), "src/test/resources/profile");
+
+ AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
+
+ assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+ AWSCredentials credentials = credentialsProvider.getCredentials();
+ assertEquals("11111111111111111111", credentials.getAWSAccessKeyId());
+ assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111", credentials.getAWSSecretKey());
+ }
+
+ @Test
+ public void testGetCredentialsProviderNamedProfile() {
+ Properties testConfig = new Properties();
+ testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+ testConfig.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "foo");
+ testConfig.setProperty(AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER), "src/test/resources/profile");
+
+ AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
+
+ assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+ AWSCredentials credentials = credentialsProvider.getCredentials();
+ assertEquals("22222222222222222222", credentials.getAWSAccessKeyId());
+ assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222", credentials.getAWSSecretKey());
+ }
+
+ @Test
public void testValidRegion() {
assertTrue(AWSUtil.isValidRegion("us-east-1"));
}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
new file mode 100644
index 0000000..0862ec2
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+ @Test
+ public void testGetCredentialsProviderEnvironmentVariables() {
+ Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+ AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+ assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+ }
+
+ @Test
+ public void testGetCredentialsProviderSystemProperties() {
+ Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+ AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+ assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+ }
+
+ @Test
+ public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+ Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+ AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+ assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+ }
+
+ @Test
+ public void testGetWebIdentityTokenFileCredentialsProvider() {
+ Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+ properties.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+ properties.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+ WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+ AwsV2Util.getWebIdentityTokenFileCredentialsProvider(builder, properties, AWS_CREDENTIALS_PROVIDER);
+
+ verify(builder).roleArn("roleArn");
+ verify(builder).roleSessionName("roleSessionName");
+ verify(builder, never()).webIdentityTokenFile(any());
+ }
+
+ @Test
+ public void testGetWebIdentityTokenFileCredentialsProviderWithWebIdentityFile() {
+ Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+ properties.setProperty(webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER), "webIdentityTokenFile");
+
+ WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+ AwsV2Util.getWebIdentityTokenFileCredentialsProvider(builder, properties, AWS_CREDENTIALS_PROVIDER);
+
+ verify(builder).webIdentityTokenFile(Paths.get("webIdentityTokenFile"));
+ }
+
+ @Test
+ public void testGetCredentialsProviderAuto() {
+ Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+ AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+ assertTrue(credentialsProvider instanceof DefaultCredentialsProvider);
+ }
+
+ @Test
+ public void testGetCredentialsProviderAssumeRole() {
+ Properties properties = spy(properties(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"));
+ properties.setProperty(AWS_REGION, "eu-west-2");
+
+ AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+ assertTrue(credentialsProvider instanceof StsAssumeRoleCredentialsProvider);
+
+ verify(properties).getProperty(AWSConfigConstants.roleArn(AWS_CREDENTIALS_PROVIDER));
+ verify(properties).getProperty(AWSConfigConstants.roleSessionName(AWS_CREDENTIALS_PROVIDER));
+ verify(properties).getProperty(AWSConfigConstants.externalId(AWS_CREDENTIALS_PROVIDER));
+ verify(properties).getProperty(AWS_REGION);
+ }
+
+ @Test
+ public void testGetCredentialsProviderBasic() {
+ Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "BASIC");
+ properties.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+ properties.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+ AwsCredentials credentials = AwsV2Util.getCredentialsProvider(properties).resolveCredentials();
+
+ assertEquals("ak", credentials.accessKeyId());
+ assertEquals("sk", credentials.secretAccessKey());
+ }
+
+ @Test
+ public void testGetCredentialsProviderProfile() {
+ Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+ properties.put(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "default");
+ properties.put(AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER), "src/test/resources/profile");
+
+ AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+ assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+ AwsCredentials credentials = credentialsProvider.resolveCredentials();
+ assertEquals("11111111111111111111", credentials.accessKeyId());
+ assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111", credentials.secretAccessKey());
+ }
+
+ @Test
+ public void testGetCredentialsProviderNamedProfile() {
+ Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+ properties.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "foo");
+ properties.setProperty(AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER), "src/test/resources/profile");
+
+ AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+ assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+ AwsCredentials credentials = credentialsProvider.resolveCredentials();
+ assertEquals("22222222222222222222", credentials.accessKeyId());
+ assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222", credentials.secretAccessKey());
+ }
+
+ @Test
+ public void testGetRegion() {
+ Region region = AwsV2Util.getRegion(properties(AWS_REGION, "eu-west-2"));
+
+ assertEquals(Region.EU_WEST_2, region);
+ }
+
+ @Test
+ public void testCreateKinesisAsyncClient() {
+ Properties properties = properties(AWS_REGION, "eu-west-2");
+ KinesisAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+ ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder().build();
+ SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build();
+
+ AwsV2Util.createKinesisAsyncClient(properties, builder, httpClient, clientOverrideConfiguration);
+
+ verify(builder).overrideConfiguration(clientOverrideConfiguration);
+ verify(builder).httpClient(httpClient);
+ verify(builder).region(Region.of("eu-west-2"));
+ verify(builder).credentialsProvider(argThat(cp -> cp instanceof DefaultCredentialsProvider));
+ verify(builder, never()).endpointOverride(any());
+ }
+
+ @Test
+ public void testCreateKinesisAsyncClientWithEndpointOverride() {
+ Properties properties = properties(AWS_REGION, "eu-west-2");
+ properties.setProperty(ConsumerConfigConstants.AWS_ENDPOINT, "https://localhost");
+
+ KinesisAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+ ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder().build();
+ SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build();
+
+ AwsV2Util.createKinesisAsyncClient(properties, builder, httpClient, clientOverrideConfiguration);
+
+ verify(builder).endpointOverride(URI.create("https://localhost"));
+ }
+
+ @Test
+ public void testCreateNettyHttpClientWithDefaults() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+ AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+ verify(builder).build();
+ verify(builder).maxConcurrency(50);
+ verify(builder).connectionTimeout(Duration.ofSeconds(10));
+ verify(builder).writeTimeout(Duration.ofSeconds(50));
+ verify(builder).connectionMaxIdleTime(Duration.ofMinutes(1));
+ verify(builder).useIdleConnectionReaper(true);
+ verify(builder, never()).connectionTimeToLive(any());
+ }
+
+ @Test
+ public void testCreateNettyHttpClientMaxConcurrency() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ clientConfiguration.setMaxConnections(100);
+
+ NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+ AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+ verify(builder).maxConcurrency(100);
+ }
+
+ @Test
+ public void testCreateNettyHttpClientConnectionTimeout() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ clientConfiguration.setConnectionTimeout(1000);
+
+ NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+ AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+ verify(builder).connectionTimeout(Duration.ofSeconds(1));
+ }
+
+ @Test
+ public void testCreateNettyHttpClientWriteTimeout() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ clientConfiguration.setSocketTimeout(3000);
+
+ NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+ AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+ verify(builder).writeTimeout(Duration.ofSeconds(3));
+ }
+
+ @Test
+ public void testCreateNettyHttpClientConnectionMaxIdleTime() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ clientConfiguration.setConnectionMaxIdleMillis(2000);
+
+ NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+ AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+ verify(builder).connectionMaxIdleTime(Duration.ofSeconds(2));
+ }
+
+ @Test
+ public void testCreateNettyHttpClientIdleConnectionReaper() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ clientConfiguration.setUseReaper(false);
+
+ NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+ AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+ verify(builder).useIdleConnectionReaper(false);
+ }
+
+ @Test
+ public void testCreateNettyHttpClientIdleConnectionTtl() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ clientConfiguration.setConnectionTTL(5000);
+
+ NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+ AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+ verify(builder).connectionTimeToLive(Duration.ofSeconds(5));
+ }
+
+ @Test
+ public void testClientOverrideConfigurationWithDefaults() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+ AwsV2Util.createClientOverrideConfiguration(clientConfiguration, builder);
+
+ verify(builder).build();
+ verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWSUtil.formatFlinkUserAgentPrefix());
+ verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, null);
+ verify(builder, never()).apiCallAttemptTimeout(any());
+ verify(builder, never()).apiCallTimeout(any());
+ }
+
+ @Test
+ public void testClientOverrideConfigurationUserAgentSuffix() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ clientConfiguration.setUserAgentSuffix("suffix");
+
+ ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+ AwsV2Util.createClientOverrideConfiguration(clientConfiguration, builder);
+
+ verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, "suffix");
+ }
+
+ @Test
+ public void testClientOverrideConfigurationApiCallAttemptTimeout() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ clientConfiguration.setRequestTimeout(500);
+
+ ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+ AwsV2Util.createClientOverrideConfiguration(clientConfiguration, builder);
+
+ verify(builder).apiCallAttemptTimeout(Duration.ofMillis(500));
+ }
+
+ @Test
+ public void testClientOverrideConfigurationApiCallTimeout() {
+ ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+ clientConfiguration.setClientExecutionTimeout(600);
+
+ ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+ AwsV2Util.createClientOverrideConfiguration(clientConfiguration, builder);
+
+ verify(builder).apiCallTimeout(Duration.ofMillis(600));
+ }
+
+ private Properties properties(final String key, final String value) {
+ Properties properties = new Properties();
+ properties.setProperty(key, value);
+ return properties;
+ }
+
+ private KinesisAsyncClientBuilder mockKinesisAsyncClientBuilder() {
+ KinesisAsyncClientBuilder builder = mock(KinesisAsyncClientBuilder.class);
+ when(builder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(builder);
+ when(builder.httpClient(any())).thenReturn(builder);
+ when(builder.credentialsProvider(any())).thenReturn(builder);
+ when(builder.region(any())).thenReturn(builder);
+
+ return builder;
+ }
+
+ private NettyNioAsyncHttpClient.Builder mockHttpClientBuilder() {
+ NettyNioAsyncHttpClient.Builder builder = mock(NettyNioAsyncHttpClient.Builder.class);
+ when(builder.maxConcurrency(anyInt())).thenReturn(builder);
+ when(builder.connectionTimeout(any())).thenReturn(builder);
+ when(builder.writeTimeout(any())).thenReturn(builder);
+ when(builder.connectionMaxIdleTime(any())).thenReturn(builder);
+ when(builder.useIdleConnectionReaper(anyBoolean())).thenReturn(builder);
+
+ return builder;
+ }
+
+ private ClientOverrideConfiguration.Builder mockClientOverrideConfigurationBuilder() {
+ ClientOverrideConfiguration.Builder builder = mock(ClientOverrideConfiguration.Builder.class);
+ when(builder.putAdvancedOption(any(), any())).thenReturn(builder);
+ when(builder.apiCallAttemptTimeout(any())).thenReturn(builder);
+ when(builder.apiCallTimeout(any())).thenReturn(builder);
+
+ return builder;
+ }
+
+ private WebIdentityTokenFileCredentialsProvider.Builder mockWebIdentityTokenFileCredentialsProviderBuilder() {
+ WebIdentityTokenFileCredentialsProvider.Builder builder = mock(WebIdentityTokenFileCredentialsProvider.Builder.class);
+ when(builder.roleArn(any())).thenReturn(builder);
+ when(builder.roleSessionName(any())).thenReturn(builder);
+ when(builder.webIdentityTokenFile(any())).thenReturn(builder);
+
+ return builder;
+ }
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/resources/profile b/flink-connectors/flink-connector-kinesis/src/test/resources/profile
new file mode 100644
index 0000000..2573fd6
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/resources/profile
@@ -0,0 +1,7 @@
+[default]
+aws_access_key_id=11111111111111111111
+aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111
+
+[foo]
+aws_access_key_id=22222222222222222222
+aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222