You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/09/03 13:17:04 UTC

[flink] 01/06: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2967414a14d37a10d354ae00e3e84e035e3a7e4e
Author: Danny Cranmer <cr...@amazon.com>
AuthorDate: Tue Jul 21 09:40:22 2020 +0100

    [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2
---
 flink-connectors/flink-connector-kinesis/pom.xml   |  42 +++
 .../connectors/kinesis/proxy/KinesisProxyV2.java   |  60 +++
 .../kinesis/proxy/KinesisProxyV2Interface.java     |  28 ++
 .../streaming/connectors/kinesis/util/AWSUtil.java |  53 ++-
 .../connectors/kinesis/util/AwsV2Util.java         | 237 ++++++++++++
 .../connectors/kinesis/util/AWSUtilTest.java       | 122 +++++-
 .../connectors/kinesis/util/AwsV2UtilTest.java     | 407 +++++++++++++++++++++
 .../src/test/resources/profile                     |   7 +
 8 files changed, 938 insertions(+), 18 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index 8e0bf7c..8b43028 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -34,6 +34,7 @@ under the License.
 	<name>Flink : Connectors : Kinesis</name>
 	<properties>
 		<aws.sdk.version>1.11.754</aws.sdk.version>
+		<aws.sdkv2.version>2.13.52</aws.sdkv2.version>
 		<aws.kinesis-kcl.version>1.11.2</aws.kinesis-kcl.version>
 		<aws.kinesis-kpl.version>0.14.0</aws.kinesis-kpl.version>
 		<aws.dynamodbstreams-kinesis-adapter.version>1.5.0</aws.dynamodbstreams-kinesis-adapter.version>
@@ -161,6 +162,25 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<!-- Amazon AWS SDK v2.x dependencies -->
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>kinesis</artifactId>
+			<version>${aws.sdkv2.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>netty-nio-client</artifactId>
+			<version>${aws.sdkv2.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>sts</artifactId>
+			<version>${aws.sdkv2.version}</version>
+		</dependency>
+
 	</dependencies>
 
 	<dependencyManagement>
@@ -211,6 +231,12 @@ under the License.
 									<include>com.amazonaws:*</include>
 									<include>com.google.protobuf:*</include>
 									<include>org.apache.httpcomponents:*</include>
+									<include>software.amazon.awssdk:*</include>
+									<include>software.amazon.eventstream:*</include>
+									<include>software.amazon.ion:*</include>
+									<include>org.reactivestreams:*</include>
+									<include>io.netty:*</include>
+									<include>com.typesafe.netty:*</include>
 								</includes>
 							</artifactSet>
 							<relocations combine.children="override">
@@ -228,6 +254,22 @@ under the License.
 									<pattern>org.apache.http</pattern>
 									<shadedPattern>org.apache.flink.kinesis.shaded.org.apache.http</shadedPattern>
 								</relocation>
+								<relocation>
+									<pattern>software.amazon</pattern>
+									<shadedPattern>org.apache.flink.kinesis.shaded.software.amazon</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>io.netty</pattern>
+									<shadedPattern>org.apache.flink.kinesis.shaded.io.netty</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>com.typesafe.netty</pattern>
+									<shadedPattern>org.apache.flink.kinesis.shaded.com.typesafe.netty</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>org.reactivestreams</pattern>
+									<shadedPattern>org.apache.flink.kinesis.shaded.org.reactivestreams</shadedPattern>
+								</relocation>
 							</relocations>
 							<filters>
 								<filter>
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
new file mode 100644
index 0000000..8c7fb52
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+import java.util.Properties;
+
+/**
+ * Kinesis proxy implementation using AWS SDK v2.x - a utility class that is used as a proxy to make
+ * calls to AWS Kinesis for several EFO (Enhanced Fan Out) functions, such as de-/registering stream consumers,
+ * subscribing to a shard and receiving records from a shard.
+ */
+@Internal
+public class KinesisProxyV2 implements KinesisProxyV2Interface {
+
+	private final KinesisAsyncClient kinesisAsyncClient;
+
+	/**
+	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
+	 *
+	 * @param configProps configuration properties containing AWS credential and AWS region info
+	 */
+	public KinesisProxyV2(final Properties configProps) {
+		this.kinesisAsyncClient = createKinesisAsyncClient(configProps);
+	}
+
+	/**
+	 * Create the Kinesis client, using the provided configuration properties.
+	 * Derived classes can override this method to customize the client configuration.
+	 *
+	 * @param configProps the properties map used to create the Kinesis Client
+	 * @return a Kinesis Client
+	 */
+	protected KinesisAsyncClient createKinesisAsyncClient(final Properties configProps) {
+		final ClientConfiguration config = new ClientConfigurationFactory().getConfig();
+		return AwsV2Util.createKinesisAsyncClient(configProps, config);
+	}
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java
new file mode 100644
index 0000000..aff6a85
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Interface.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for a Kinesis proxy using AWS SDK v2.x operating on multiple Kinesis streams within the same AWS service region.
+ */
+@Internal
+public interface KinesisProxyV2Interface {
+
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 6652529..7301e7a 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -82,9 +82,7 @@ public class AWSUtil {
 	 */
 	public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
 		// set a Flink-specific user agent
-		awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
-				EnvironmentInformation.getVersion(),
-				EnvironmentInformation.getRevisionInformation().commitId));
+		awsClientConfig.setUserAgentPrefix(formatFlinkUserAgentPrefix());
 
 		// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
 		AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
@@ -104,6 +102,19 @@ public class AWSUtil {
 	}
 
 	/**
+	 * Creates a user agent prefix for Flink.
+	 * This can be used by HTTP Clients.
+	 *
+	 * @return a user agent prefix for Flink
+	 */
+	public static String formatFlinkUserAgentPrefix() {
+		return String.format(
+			USER_AGENT_FORMAT,
+			EnvironmentInformation.getVersion(),
+			EnvironmentInformation.getRevisionInformation().commitId);
+	}
+
+	/**
 	 * Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
 	 *
 	 * @param configProps the configuration properties
@@ -114,29 +125,37 @@ public class AWSUtil {
 	}
 
 	/**
-	 * If the provider is ASSUME_ROLE, then the credentials for assuming this role are determined
-	 * recursively.
+	 * Determines and returns the credential provider type from the given properties.
 	 *
-	 * @param configProps the configuration properties
-	 * @param configPrefix the prefix of the config properties for this credentials provider,
-	 *                     e.g. aws.credentials.provider for the base credentials provider,
-	 *                     aws.credentials.provider.role.provider for the credentials provider
-	 *                     for assuming a role, and so on.
+	 * @return the credential provider type
 	 */
-	private static AWSCredentialsProvider getCredentialsProvider(final Properties configProps, final String configPrefix) {
-		CredentialProvider credentialProviderType;
+	static CredentialProvider getCredentialProviderType(final Properties configProps, final String configPrefix) {
 		if (!configProps.containsKey(configPrefix)) {
 			if (configProps.containsKey(AWSConfigConstants.accessKeyId(configPrefix))
 				&& configProps.containsKey(AWSConfigConstants.secretKey(configPrefix))) {
 				// if the credential provider type is not specified, but the Access Key ID and Secret Key are given, it will default to BASIC
-				credentialProviderType = CredentialProvider.BASIC;
+				return CredentialProvider.BASIC;
 			} else {
 				// if the credential provider type is not specified, it will default to AUTO
-				credentialProviderType = CredentialProvider.AUTO;
+				return CredentialProvider.AUTO;
 			}
 		} else {
-			credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(configPrefix));
+			return CredentialProvider.valueOf(configProps.getProperty(configPrefix));
 		}
+	}
+
+	/**
+	 * If the provider is ASSUME_ROLE, then the credentials for assuming this role are determined
+	 * recursively.
+	 *
+	 * @param configProps the configuration properties
+	 * @param configPrefix the prefix of the config properties for this credentials provider,
+	 *                     e.g. aws.credentials.provider for the base credentials provider,
+	 *                     aws.credentials.provider.role.provider for the credentials provider
+	 *                     for assuming a role, and so on.
+	 */
+	private static AWSCredentialsProvider getCredentialsProvider(final Properties configProps, final String configPrefix) {
+		CredentialProvider credentialProviderType = getCredentialProviderType(configProps, configPrefix);
 
 		switch (credentialProviderType) {
 			case ENV_VAR:
@@ -188,9 +207,11 @@ public class AWSUtil {
 						.webIdentityTokenFile(configProps.getProperty(AWSConfigConstants.webIdentityTokenFile(configPrefix), null))
 						.build();
 
-			default:
 			case AUTO:
 				return new DefaultAWSCredentialsProviderChain();
+
+			default:
+				throw new IllegalArgumentException("Credential provider not supported: " + credentialProviderType);
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
new file mode 100644
index 0000000..f5d95df
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.profiles.ProfileFile;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Utility methods specific to Amazon Web Service SDK v2.x.
+ */
+@Internal
+public class AwsV2Util {
+
+	/**
+	 * Creates an Amazon Kinesis Async Client from the provided properties.
+	 * Configuration is copied from AWS SDK v1 configuration class as per:
+	 * - https://github.com/aws/aws-sdk-java-v2/blob/2.13.52/docs/LaunchChangelog.md#134-client-override-retry-configuration
+	 *
+	 * @param configProps configuration properties
+	 * @param config the AWS SDK v1.x client configuration used to create the client
+	 * @return a new Amazon Kinesis Client
+	 */
+	public static KinesisAsyncClient createKinesisAsyncClient(final Properties configProps, final ClientConfiguration config) {
+		final SdkAsyncHttpClient httpClient = createHttpClient(config, NettyNioAsyncHttpClient.builder());
+		final ClientOverrideConfiguration overrideConfiguration = createClientOverrideConfiguration(config, ClientOverrideConfiguration.builder());
+		final KinesisAsyncClientBuilder clientBuilder = KinesisAsyncClient.builder();
+
+		return createKinesisAsyncClient(configProps, clientBuilder, httpClient, overrideConfiguration);
+	}
+
+	@VisibleForTesting
+	static SdkAsyncHttpClient createHttpClient(
+			final ClientConfiguration config,
+			final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+		httpClientBuilder
+			.maxConcurrency(config.getMaxConnections())
+			.connectionTimeout(Duration.ofMillis(config.getConnectionTimeout()))
+			.writeTimeout(Duration.ofMillis(config.getSocketTimeout()))
+			.connectionMaxIdleTime(Duration.ofMillis(config.getConnectionMaxIdleMillis()))
+			.useIdleConnectionReaper(config.useReaper());
+
+		if (config.getConnectionTTL() > -1) {
+			httpClientBuilder.connectionTimeToLive(Duration.ofMillis(config.getConnectionTTL()));
+		}
+
+		return httpClientBuilder.build();
+	}
+
+	@VisibleForTesting
+	static ClientOverrideConfiguration createClientOverrideConfiguration(
+			final ClientConfiguration config,
+			final ClientOverrideConfiguration.Builder overrideConfigurationBuilder) {
+
+		overrideConfigurationBuilder
+			.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWSUtil.formatFlinkUserAgentPrefix())
+			.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, config.getUserAgentSuffix());
+
+		if (config.getRequestTimeout() > 0) {
+			overrideConfigurationBuilder.apiCallAttemptTimeout(Duration.ofMillis(config.getRequestTimeout()));
+		}
+
+		if (config.getClientExecutionTimeout() > 0) {
+			overrideConfigurationBuilder.apiCallTimeout(Duration.ofMillis(config.getClientExecutionTimeout()));
+		}
+
+		return overrideConfigurationBuilder.build();
+	}
+
+	@VisibleForTesting
+	static KinesisAsyncClient createKinesisAsyncClient(
+			final Properties configProps,
+			final KinesisAsyncClientBuilder clientBuilder,
+			final SdkAsyncHttpClient httpClient,
+			final ClientOverrideConfiguration overrideConfiguration) {
+
+		if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+			final URI endpointOverride = URI.create(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+			clientBuilder.endpointOverride(endpointOverride);
+		}
+
+		return clientBuilder
+			.httpClient(httpClient)
+			.overrideConfiguration(overrideConfiguration)
+			.credentialsProvider(getCredentialsProvider(configProps))
+			.region(getRegion(configProps))
+			.build();
+	}
+
+	/**
+	 * Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
+	 *
+	 * @param configProps the configuration properties
+	 * @return The corresponding AWS Credentials Provider instance
+	 */
+	public static AwsCredentialsProvider getCredentialsProvider(final Properties configProps) {
+		return getCredentialsProvider(configProps, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+	}
+
+	private static AwsCredentialsProvider getCredentialsProvider(final Properties configProps, final String configPrefix) {
+		CredentialProvider credentialProviderType = AWSUtil.getCredentialProviderType(configProps, configPrefix);
+
+		switch (credentialProviderType) {
+			case ENV_VAR:
+				return EnvironmentVariableCredentialsProvider.create();
+
+			case SYS_PROP:
+				return SystemPropertyCredentialsProvider.create();
+
+			case PROFILE:
+				return getProfileCredentialProvider(configProps, configPrefix);
+
+			case BASIC:
+				return () -> AwsBasicCredentials.create(
+					configProps.getProperty(AWSConfigConstants.accessKeyId(configPrefix)),
+					configProps.getProperty(AWSConfigConstants.secretKey(configPrefix)));
+
+			case ASSUME_ROLE:
+				return getAssumeRoleCredentialProvider(configProps, configPrefix);
+
+			case WEB_IDENTITY_TOKEN:
+				return getWebIdentityTokenFileCredentialsProvider(WebIdentityTokenFileCredentialsProvider.builder(), configProps, configPrefix);
+
+			case AUTO:
+				return DefaultCredentialsProvider.create();
+
+			default:
+				throw new IllegalArgumentException("Credential provider not supported: " + credentialProviderType);
+		}
+	}
+
+	private static AwsCredentialsProvider getProfileCredentialProvider(final Properties configProps, final String configPrefix) {
+		String profileName = configProps.getProperty(AWSConfigConstants.profileName(configPrefix), null);
+		String profileConfigPath = configProps.getProperty(AWSConfigConstants.profilePath(configPrefix), null);
+
+		ProfileCredentialsProvider.Builder profileBuilder = ProfileCredentialsProvider
+			.builder()
+			.profileName(profileName);
+
+		if (profileConfigPath != null) {
+			profileBuilder.profileFile(ProfileFile
+				.builder()
+				.type(ProfileFile.Type.CREDENTIALS)
+				.content(Paths.get(profileConfigPath))
+				.build());
+		}
+
+		return profileBuilder.build();
+	}
+
+	private static AwsCredentialsProvider getAssumeRoleCredentialProvider(final Properties configProps, final String configPrefix) {
+		return StsAssumeRoleCredentialsProvider
+			.builder()
+			.refreshRequest(AssumeRoleRequest
+				.builder()
+				.roleArn(configProps.getProperty(AWSConfigConstants.roleArn(configPrefix)))
+				.roleSessionName(configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix)))
+				.externalId(configProps.getProperty(AWSConfigConstants.externalId(configPrefix)))
+				.build())
+			.stsClient(StsClient
+				.builder()
+				.credentialsProvider(getCredentialsProvider(configProps, AWSConfigConstants.roleCredentialsProvider(configPrefix)))
+				.region(getRegion(configProps))
+				.build())
+			.build();
+	}
+
+	@VisibleForTesting
+	static AwsCredentialsProvider getWebIdentityTokenFileCredentialsProvider(
+			final WebIdentityTokenFileCredentialsProvider.Builder webIdentityBuilder,
+			final Properties configProps,
+			final String configPrefix) {
+
+		webIdentityBuilder
+			.roleArn(configProps.getProperty(AWSConfigConstants.roleArn(configPrefix), null))
+			.roleSessionName(configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix), null));
+
+		Optional.ofNullable(configProps.getProperty(AWSConfigConstants.webIdentityTokenFile(configPrefix), null))
+			.map(Paths::get)
+			.ifPresent(webIdentityBuilder::webIdentityTokenFile);
+
+		return webIdentityBuilder.build();
+	}
+
+	/**
+	 * Creates a {@link Region} object from the given Properties.
+	 *
+	 * @param configProps the properties containing the region
+	 * @return the region specified by the properties
+	 */
+	public static Region getRegion(final Properties configProps) {
+		return Region.of(configProps.getProperty(AWSConfigConstants.AWS_REGION));
+	}
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
index 64fd5d1..e66ff51 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtilTest.java
@@ -18,11 +18,16 @@
 package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
 import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
 
+import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -36,6 +41,11 @@ import java.util.Properties;
 
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.LATEST;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.ASSUME_ROLE;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.AUTO;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.BASIC;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT;
 import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP;
 import static org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM;
@@ -66,23 +76,131 @@ public class AWSUtilTest {
 	@Test
 	public void testGetCredentialsProvider() {
 		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
 
 		AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
 		assertTrue(credentialsProvider instanceof WebIdentityTokenCredentialsProvider);
 	}
 
 	@Test
+	public void testGetCredentialsProviderTypeDefaultsAuto() {
+		assertEquals(AUTO, AWSUtil.getCredentialProviderType(new Properties(), AWS_CREDENTIALS_PROVIDER));
+	}
+
+	@Test
+	public void testGetCredentialsProviderTypeBasic() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+		testConfig.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+		assertEquals(BASIC, AWSUtil.getCredentialProviderType(testConfig, AWS_CREDENTIALS_PROVIDER));
+	}
+
+	@Test
+	public void testGetCredentialsProviderTypeWebIdentityToken() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+		CredentialProvider type = AWSUtil.getCredentialProviderType(testConfig, AWS_CREDENTIALS_PROVIDER);
+		assertEquals(WEB_IDENTITY_TOKEN, type);
+	}
+
+	@Test
+	public void testGetCredentialsProviderTypeAssumeRole() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
+
+		CredentialProvider type = AWSUtil.getCredentialProviderType(testConfig, AWS_CREDENTIALS_PROVIDER);
+		assertEquals(ASSUME_ROLE, type);
+	}
+
+	@Test
+	public void testGetCredentialsProviderEnvironmentVariables() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+		AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
+
+		assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderSystemProperties() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+		AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
+
+		assertTrue(credentialsProvider instanceof SystemPropertiesCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderBasic() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "BASIC");
+
+		testConfig.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+		testConfig.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+		AWSCredentials credentials = AWSUtil.getCredentialsProvider(testConfig).getCredentials();
+
+		assertEquals("ak", credentials.getAWSAccessKeyId());
+		assertEquals("sk", credentials.getAWSSecretKey());
+	}
+
+	@Test
+	public void testGetCredentialsProviderAuto() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+		AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
+
+		assertTrue(credentialsProvider instanceof DefaultAWSCredentialsProviderChain);
+	}
+
+	@Test
 	public void testInvalidCredentialsProvider() {
 		exception.expect(IllegalArgumentException.class);
 
 		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "INVALID_PROVIDER");
+		testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "INVALID_PROVIDER");
 
 		AWSUtil.getCredentialsProvider(testConfig);
 	}
 
 	@Test
+	public void testGetCredentialsProviderProfile() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+		testConfig.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "default");
+		testConfig.setProperty(AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER), "src/test/resources/profile");
+
+		AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
+
+		assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+		AWSCredentials credentials = credentialsProvider.getCredentials();
+		assertEquals("11111111111111111111", credentials.getAWSAccessKeyId());
+		assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111", credentials.getAWSSecretKey());
+	}
+
+	@Test
+	public void testGetCredentialsProviderNamedProfile() {
+		Properties testConfig = new Properties();
+		testConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+		testConfig.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "foo");
+		testConfig.setProperty(AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER), "src/test/resources/profile");
+
+		AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
+
+		assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+		AWSCredentials credentials = credentialsProvider.getCredentials();
+		assertEquals("22222222222222222222", credentials.getAWSAccessKeyId());
+		assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222", credentials.getAWSSecretKey());
+	}
+
+	@Test
 	public void testValidRegion() {
 		assertTrue(AWSUtil.isValidRegion("us-east-1"));
 	}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
new file mode 100644
index 0000000..0862ec2
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+	@Test
+	public void testGetCredentialsProviderEnvironmentVariables() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderSystemProperties() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+		properties.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+		AwsV2Util.getWebIdentityTokenFileCredentialsProvider(builder, properties, AWS_CREDENTIALS_PROVIDER);
+
+		verify(builder).roleArn("roleArn");
+		verify(builder).roleSessionName("roleSessionName");
+		verify(builder, never()).webIdentityTokenFile(any());
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProviderWithWebIdentityFile() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER), "webIdentityTokenFile");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+		AwsV2Util.getWebIdentityTokenFileCredentialsProvider(builder, properties, AWS_CREDENTIALS_PROVIDER);
+
+		verify(builder).webIdentityTokenFile(Paths.get("webIdentityTokenFile"));
+	}
+
+	@Test
+	public void testGetCredentialsProviderAuto() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof DefaultCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderAssumeRole() {
+		Properties properties = spy(properties(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"));
+		properties.setProperty(AWS_REGION, "eu-west-2");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof StsAssumeRoleCredentialsProvider);
+
+		verify(properties).getProperty(AWSConfigConstants.roleArn(AWS_CREDENTIALS_PROVIDER));
+		verify(properties).getProperty(AWSConfigConstants.roleSessionName(AWS_CREDENTIALS_PROVIDER));
+		verify(properties).getProperty(AWSConfigConstants.externalId(AWS_CREDENTIALS_PROVIDER));
+		verify(properties).getProperty(AWS_REGION);
+	}
+
+	@Test
+	public void testGetCredentialsProviderBasic() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "BASIC");
+		properties.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+		properties.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+		AwsCredentials credentials = AwsV2Util.getCredentialsProvider(properties).resolveCredentials();
+
+		assertEquals("ak", credentials.accessKeyId());
+		assertEquals("sk", credentials.secretAccessKey());
+	}
+
+	@Test
+	public void testGetCredentialsProviderProfile() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+		properties.put(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "default");
+		properties.put(AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER), "src/test/resources/profile");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+		AwsCredentials credentials = credentialsProvider.resolveCredentials();
+		assertEquals("11111111111111111111", credentials.accessKeyId());
+		assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111", credentials.secretAccessKey());
+	}
+
+	@Test
+	public void testGetCredentialsProviderNamedProfile() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+		properties.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "foo");
+		properties.setProperty(AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER), "src/test/resources/profile");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+		AwsCredentials credentials = credentialsProvider.resolveCredentials();
+		assertEquals("22222222222222222222", credentials.accessKeyId());
+		assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222", credentials.secretAccessKey());
+	}
+
+	@Test
+	public void testGetRegion() {
+		Region region = AwsV2Util.getRegion(properties(AWS_REGION, "eu-west-2"));
+
+		assertEquals(Region.EU_WEST_2, region);
+	}
+
+	@Test
+	public void testCreateKinesisAsyncClient() {
+		Properties properties = properties(AWS_REGION, "eu-west-2");
+		KinesisAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+		ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder().build();
+		SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build();
+
+		AwsV2Util.createKinesisAsyncClient(properties, builder, httpClient, clientOverrideConfiguration);
+
+		verify(builder).overrideConfiguration(clientOverrideConfiguration);
+		verify(builder).httpClient(httpClient);
+		verify(builder).region(Region.of("eu-west-2"));
+		verify(builder).credentialsProvider(argThat(cp -> cp instanceof DefaultCredentialsProvider));
+		verify(builder, never()).endpointOverride(any());
+	}
+
+	@Test
+	public void testCreateKinesisAsyncClientWithEndpointOverride() {
+		Properties properties = properties(AWS_REGION, "eu-west-2");
+		properties.setProperty(ConsumerConfigConstants.AWS_ENDPOINT, "https://localhost");
+
+		KinesisAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+		ClientOverrideConfiguration clientOverrideConfiguration = ClientOverrideConfiguration.builder().build();
+		SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build();
+
+		AwsV2Util.createKinesisAsyncClient(properties, builder, httpClient, clientOverrideConfiguration);
+
+		verify(builder).endpointOverride(URI.create("https://localhost"));
+	}
+
+	@Test
+	public void testCreateNettyHttpClientWithDefaults() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+		AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+		verify(builder).build();
+		verify(builder).maxConcurrency(50);
+		verify(builder).connectionTimeout(Duration.ofSeconds(10));
+		verify(builder).writeTimeout(Duration.ofSeconds(50));
+		verify(builder).connectionMaxIdleTime(Duration.ofMinutes(1));
+		verify(builder).useIdleConnectionReaper(true);
+		verify(builder, never()).connectionTimeToLive(any());
+	}
+
+	@Test
+	public void testCreateNettyHttpClientMaxConcurrency() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		clientConfiguration.setMaxConnections(100);
+
+		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+		AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+		verify(builder).maxConcurrency(100);
+	}
+
+	@Test
+	public void testCreateNettyHttpClientConnectionTimeout() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		clientConfiguration.setConnectionTimeout(1000);
+
+		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+		AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+		verify(builder).connectionTimeout(Duration.ofSeconds(1));
+	}
+
+	@Test
+	public void testCreateNettyHttpClientWriteTimeout() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		clientConfiguration.setSocketTimeout(3000);
+
+		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+		AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+		verify(builder).writeTimeout(Duration.ofSeconds(3));
+	}
+
+	@Test
+	public void testCreateNettyHttpClientConnectionMaxIdleTime() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		clientConfiguration.setConnectionMaxIdleMillis(2000);
+
+		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+		AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+		verify(builder).connectionMaxIdleTime(Duration.ofSeconds(2));
+	}
+
+	@Test
+	public void testCreateNettyHttpClientIdleConnectionReaper() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		clientConfiguration.setUseReaper(false);
+
+		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+		AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+		verify(builder).useIdleConnectionReaper(false);
+	}
+
+	@Test
+	public void testCreateNettyHttpClientIdleConnectionTtl() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		clientConfiguration.setConnectionTTL(5000);
+
+		NettyNioAsyncHttpClient.Builder builder = mockHttpClientBuilder();
+
+		AwsV2Util.createHttpClient(clientConfiguration, builder);
+
+		verify(builder).connectionTimeToLive(Duration.ofSeconds(5));
+	}
+
+	@Test
+	public void testClientOverrideConfigurationWithDefaults() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+		AwsV2Util.createClientOverrideConfiguration(clientConfiguration, builder);
+
+		verify(builder).build();
+		verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWSUtil.formatFlinkUserAgentPrefix());
+		verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, null);
+		verify(builder, never()).apiCallAttemptTimeout(any());
+		verify(builder, never()).apiCallTimeout(any());
+	}
+
+	@Test
+	public void testClientOverrideConfigurationUserAgentSuffix() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		clientConfiguration.setUserAgentSuffix("suffix");
+
+		ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+		AwsV2Util.createClientOverrideConfiguration(clientConfiguration, builder);
+
+		verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, "suffix");
+	}
+
+	@Test
+	public void testClientOverrideConfigurationApiCallAttemptTimeout() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		clientConfiguration.setRequestTimeout(500);
+
+		ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+		AwsV2Util.createClientOverrideConfiguration(clientConfiguration, builder);
+
+		verify(builder).apiCallAttemptTimeout(Duration.ofMillis(500));
+	}
+
+	@Test
+	public void testClientOverrideConfigurationApiCallTimeout() {
+		ClientConfiguration clientConfiguration = new ClientConfigurationFactory().getConfig();
+		clientConfiguration.setClientExecutionTimeout(600);
+
+		ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+		AwsV2Util.createClientOverrideConfiguration(clientConfiguration, builder);
+
+		verify(builder).apiCallTimeout(Duration.ofMillis(600));
+	}
+
+	private Properties properties(final String key, final String value) {
+		Properties properties = new Properties();
+		properties.setProperty(key, value);
+		return properties;
+	}
+
+	private KinesisAsyncClientBuilder mockKinesisAsyncClientBuilder() {
+		KinesisAsyncClientBuilder builder = mock(KinesisAsyncClientBuilder.class);
+		when(builder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(builder);
+		when(builder.httpClient(any())).thenReturn(builder);
+		when(builder.credentialsProvider(any())).thenReturn(builder);
+		when(builder.region(any())).thenReturn(builder);
+
+		return builder;
+	}
+
+	private NettyNioAsyncHttpClient.Builder mockHttpClientBuilder() {
+		NettyNioAsyncHttpClient.Builder builder = mock(NettyNioAsyncHttpClient.Builder.class);
+		when(builder.maxConcurrency(anyInt())).thenReturn(builder);
+		when(builder.connectionTimeout(any())).thenReturn(builder);
+		when(builder.writeTimeout(any())).thenReturn(builder);
+		when(builder.connectionMaxIdleTime(any())).thenReturn(builder);
+		when(builder.useIdleConnectionReaper(anyBoolean())).thenReturn(builder);
+
+		return builder;
+	}
+
+	private ClientOverrideConfiguration.Builder mockClientOverrideConfigurationBuilder() {
+		ClientOverrideConfiguration.Builder builder = mock(ClientOverrideConfiguration.Builder.class);
+		when(builder.putAdvancedOption(any(), any())).thenReturn(builder);
+		when(builder.apiCallAttemptTimeout(any())).thenReturn(builder);
+		when(builder.apiCallTimeout(any())).thenReturn(builder);
+
+		return builder;
+	}
+
+	private WebIdentityTokenFileCredentialsProvider.Builder mockWebIdentityTokenFileCredentialsProviderBuilder() {
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mock(WebIdentityTokenFileCredentialsProvider.Builder.class);
+		when(builder.roleArn(any())).thenReturn(builder);
+		when(builder.roleSessionName(any())).thenReturn(builder);
+		when(builder.webIdentityTokenFile(any())).thenReturn(builder);
+
+		return builder;
+	}
+}
diff --git a/flink-connectors/flink-connector-kinesis/src/test/resources/profile b/flink-connectors/flink-connector-kinesis/src/test/resources/profile
new file mode 100644
index 0000000..2573fd6
--- /dev/null
+++ b/flink-connectors/flink-connector-kinesis/src/test/resources/profile
@@ -0,0 +1,7 @@
+[default]
+aws_access_key_id=11111111111111111111
+aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111
+
+[foo]
+aws_access_key_id=22222222222222222222
+aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222