You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2021/12/02 14:38:32 UTC

[flink] branch master updated: [FLINK-24227][connectors/kinesis] Moved AWS general utils to base connector, Updated AWS SDK v2 dependency to meet convergence criteria, Updated async http client creation with new defaults

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c907bae7 [FLINK-24227][connectors/kinesis] Moved AWS general utils to base connector, Updated AWS SDK v2 dependency to meet convergence criteria, Updated async http client creation with new defaults
c907bae7 is described below

commit c907bae7f4837ca23299e60aca9c8847454b245f
Author: Nuno Afonso <af...@amazon.com>
AuthorDate: Tue Nov 30 15:35:02 2021 +0000

    [FLINK-24227][connectors/kinesis] Moved AWS general utils to base connector, Updated AWS SDK v2 dependency to meet convergence criteria, Updated async http client creation with new defaults
---
 flink-architecture-tests/pom.xml                   |   7 +
 flink-connectors/flink-connector-aws-base/pom.xml  |  77 +++
 .../connector/aws}/config/AWSConfigConstants.java  |  19 +-
 .../flink/connector/aws/util/AWSGeneralUtil.java   | 299 +++++++++
 .../src/main/resources/log4j2.properties           |  25 +
 .../connector/aws/util/AWSGeneralUtilTest.java     | 682 +++++++++++++++++++++
 .../apache/flink/connector/aws/util/TestUtil.java  |  54 ++
 .../src/test/resources/log4j2-test.properties      |  28 +
 .../src/test/resources/profile                     |   7 +
 flink-connectors/flink-connector-kinesis/pom.xml   |  23 +-
 .../kinesis/config/AWSConfigConstants.java         | 100 +--
 .../connectors/kinesis/util/KinesisConfigUtil.java |  49 +-
 .../src/main/resources/META-INF/NOTICE             |  57 +-
 .../kinesis/internals/ShardConsumerFanOutTest.java |   4 +-
 flink-connectors/pom.xml                           |   1 +
 pom.xml                                            |   1 +
 16 files changed, 1254 insertions(+), 179 deletions(-)

diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml
index 5f9583b..b1b1321 100644
--- a/flink-architecture-tests/pom.xml
+++ b/flink-architecture-tests/pom.xml
@@ -132,6 +132,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-aws-base</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
diff --git a/flink-connectors/flink-connector-aws-base/pom.xml b/flink-connectors/flink-connector-aws-base/pom.xml
new file mode 100644
index 0000000..4cd574b
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-base/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-connectors</artifactId>
+		<version>1.15-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-aws-base</artifactId>
+	<name>Flink : Connectors : AWS Base</name>
+	<properties>
+		<aws.sdk.version>2.17.52</aws.sdk.version>
+	</properties>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>netty-nio-client</artifactId>
+			<version>${aws.sdk.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>software.amazon.awssdk</groupId>
+			<artifactId>sts</artifactId>
+			<version>${aws.sdk.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java
similarity index 89%
copy from flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
copy to flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java
index aa3298b..790c7e7 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
+++ b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java
@@ -15,20 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.kinesis.config;
+package org.apache.flink.connector.aws.config;
 
 import org.apache.flink.annotation.PublicEvolving;
 
-import com.amazonaws.auth.AWSCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 
 /** Configuration keys for AWS service usage. */
 @PublicEvolving
 public class AWSConfigConstants {
 
     /**
-     * Possible configuration values for the type of credential provider to use when accessing AWS
-     * Kinesis. Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be
-     * used.
+     * Possible configuration values for the type of credential provider to use when accessing AWS.
+     * Internally, a corresponding implementation of {@link AwsCredentialsProvider} will be used.
      */
     public enum CredentialProvider {
 
@@ -72,7 +71,7 @@ public class AWSConfigConstants {
         AUTO,
     }
 
-    /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
+    /** The AWS region of the service ("us-east-1" is used if not set). */
     public static final String AWS_REGION = "aws.region";
 
     /**
@@ -123,9 +122,15 @@ public class AWSConfigConstants {
     public static final String AWS_ROLE_CREDENTIALS_PROVIDER =
             roleCredentialsProvider(AWS_CREDENTIALS_PROVIDER);
 
-    /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
+    /** The AWS endpoint for the service (derived from the AWS region setting if not set). */
     public static final String AWS_ENDPOINT = "aws.endpoint";
 
+    /** Whether to trust all SSL certificates. */
+    public static final String TRUST_ALL_CERTIFICATES = "aws.trust.all.certificates";
+
+    /** The HTTP protocol version to use. */
+    public static final String HTTP_PROTOCOL_VERSION = "aws.http.protocol.version";
+
     public static String accessKeyId(String prefix) {
         return prefix + ".basic.accesskeyid";
     }
diff --git a/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
new file mode 100644
index 0000000..45154b6
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.Http2Configuration;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.profiles.ProfileFile;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Properties;
+
+/** General-purpose utilities for working with Amazon Web Services (AWS). */
+@Internal
+public class AWSGeneralUtil {
+    // Netty-specific settings applied directly on the client builder in createAsyncHttpClient.
+    private static final Duration CONNECTION_ACQUISITION_TIMEOUT = Duration.ofSeconds(60);
+    private static final int INITIAL_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KB
+    private static final Duration HEALTH_CHECK_PING_PERIOD = Duration.ofSeconds(60);
+
+    // Individual values that make up HTTP_CLIENT_DEFAULTS below.
+    private static final int HTTP_CLIENT_MAX_CONCURRENCY = 10_000;
+    private static final Duration HTTP_CLIENT_READ_TIMEOUT = Duration.ofMinutes(6);
+    private static final Protocol HTTP_PROTOCOL = Protocol.HTTP2;
+    private static final boolean TRUST_ALL_CERTIFICATES = false;
+
+    // SDK HTTP options that createAsyncHttpClient merges into the caller-supplied configuration
+    // (config.merge(HTTP_CLIENT_DEFAULTS)) before building the client.
+    private static final AttributeMap HTTP_CLIENT_DEFAULTS =
+            AttributeMap.builder()
+                    .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, HTTP_CLIENT_MAX_CONCURRENCY)
+                    .put(SdkHttpConfigurationOption.READ_TIMEOUT, HTTP_CLIENT_READ_TIMEOUT)
+                    .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, TRUST_ALL_CERTIFICATES)
+                    .put(SdkHttpConfigurationOption.PROTOCOL, HTTP_PROTOCOL)
+                    .build();
+
+    /**
+     * Resolves which {@link CredentialProvider} type the given properties select.
+     *
+     * <p>A value stored under {@code configPrefix} always wins. Without one, the type falls back
+     * to {@code BASIC} when both the access key ID and the secret key are present, and to {@code
+     * AUTO} otherwise.
+     *
+     * @param configProps the configuration properties to inspect
+     * @param configPrefix the property key holding the provider type, which is also the prefix of
+     *     the provider-specific keys
+     * @return the credential provider type
+     * @throws IllegalArgumentException if the configured value names no {@link CredentialProvider}
+     */
+    public static CredentialProvider getCredentialProviderType(
+            final Properties configProps, final String configPrefix) {
+        if (configProps.containsKey(configPrefix)) {
+            // an explicitly configured provider type takes precedence over any inference
+            return CredentialProvider.valueOf(configProps.getProperty(configPrefix));
+        }
+
+        final boolean hasKeyPair =
+                configProps.containsKey(AWSConfigConstants.accessKeyId(configPrefix))
+                        && configProps.containsKey(AWSConfigConstants.secretKey(configPrefix));
+
+        // no explicit type: a complete key pair implies BASIC, anything else means AUTO
+        return hasKeyPair ? CredentialProvider.BASIC : CredentialProvider.AUTO;
+    }
+
+    /**
+     * Builds the {@link AwsCredentialsProvider} described by the configuration properties, using
+     * {@link AWSConfigConstants#AWS_CREDENTIALS_PROVIDER} as the configuration prefix.
+     *
+     * @param configProps the configuration properties
+     * @return the corresponding AWS credentials provider instance
+     */
+    public static AwsCredentialsProvider getCredentialsProvider(final Properties configProps) {
+        final String defaultPrefix = AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+        return getCredentialsProvider(configProps, defaultPrefix);
+    }
+
+    /**
+     * Builds the {@link AwsCredentialsProvider} for the provider type configured under the given
+     * prefix.
+     *
+     * @param configProps the configuration properties
+     * @param configPrefix the property key holding the provider type, which is also the prefix of
+     *     the provider-specific keys
+     * @return the corresponding AWS credentials provider instance
+     * @throws IllegalArgumentException if the resolved provider type is not handled here
+     */
+    public static AwsCredentialsProvider getCredentialsProvider(
+            final Properties configProps, final String configPrefix) {
+        final CredentialProvider providerType =
+                getCredentialProviderType(configProps, configPrefix);
+
+        switch (providerType) {
+            case AUTO:
+                return DefaultCredentialsProvider.create();
+
+            case BASIC:
+                {
+                    final String accessKeyIdKey = AWSConfigConstants.accessKeyId(configPrefix);
+                    final String secretKeyKey = AWSConfigConstants.secretKey(configPrefix);
+                    // the lambda reads the properties each time credentials are resolved
+                    return () ->
+                            AwsBasicCredentials.create(
+                                    configProps.getProperty(accessKeyIdKey),
+                                    configProps.getProperty(secretKeyKey));
+                }
+
+            case ENV_VAR:
+                return EnvironmentVariableCredentialsProvider.create();
+
+            case SYS_PROP:
+                return SystemPropertyCredentialsProvider.create();
+
+            case PROFILE:
+                return getProfileCredentialProvider(configProps, configPrefix);
+
+            case ASSUME_ROLE:
+                return getAssumeRoleCredentialProvider(configProps, configPrefix);
+
+            case WEB_IDENTITY_TOKEN:
+                return getWebIdentityTokenFileCredentialsProvider(
+                        WebIdentityTokenFileCredentialsProvider.builder(),
+                        configProps,
+                        configPrefix);
+
+            default:
+                throw new IllegalArgumentException(
+                        "Credential provider not supported: " + providerType);
+        }
+    }
+
+    /**
+     * Builds a {@link ProfileCredentialsProvider} from the profile settings found under the given
+     * prefix.
+     *
+     * <p>The profile name is always passed to the builder (as {@code null} when not configured).
+     * A credentials profile file is only set when a profile path is configured.
+     *
+     * @param configProps the configuration properties
+     * @param configPrefix the prefix of the profile-related property keys
+     * @return the profile-based credentials provider
+     */
+    public static AwsCredentialsProvider getProfileCredentialProvider(
+            final Properties configProps, final String configPrefix) {
+        final String configuredProfile =
+                configProps.getProperty(AWSConfigConstants.profileName(configPrefix), null);
+
+        final ProfileCredentialsProvider.Builder builder =
+                ProfileCredentialsProvider.builder().profileName(configuredProfile);
+
+        final String configuredPath =
+                configProps.getProperty(AWSConfigConstants.profilePath(configPrefix));
+        if (configuredPath != null) {
+            // only override the profile file when a path was explicitly configured
+            builder.profileFile(
+                    ProfileFile.builder()
+                            .type(ProfileFile.Type.CREDENTIALS)
+                            .content(Paths.get(configuredPath))
+                            .build());
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Builds a {@link StsAssumeRoleCredentialsProvider} for the role configured under the given
+     * prefix.
+     *
+     * <p>Role ARN, role session name and external ID are read from {@code configProps}. The STS
+     * client is given the credentials provider configured under {@link
+     * AWSConfigConstants#roleCredentialsProvider(String)} and the region returned by {@link
+     * #getRegion(Properties)}.
+     *
+     * @param configProps the configuration properties
+     * @param configPrefix the prefix of the role-related property keys
+     * @return the assume-role credentials provider
+     */
+    private static AwsCredentialsProvider getAssumeRoleCredentialProvider(
+            final Properties configProps, final String configPrefix) {
+        final AssumeRoleRequest assumeRoleRequest =
+                AssumeRoleRequest.builder()
+                        .roleArn(configProps.getProperty(AWSConfigConstants.roleArn(configPrefix)))
+                        .roleSessionName(
+                                configProps.getProperty(
+                                        AWSConfigConstants.roleSessionName(configPrefix)))
+                        .externalId(
+                                configProps.getProperty(
+                                        AWSConfigConstants.externalId(configPrefix)))
+                        .build();
+
+        // credentials used to call STS come from the nested "role credentials provider" settings
+        final AwsCredentialsProvider stsCallerCredentials =
+                getCredentialsProvider(
+                        configProps, AWSConfigConstants.roleCredentialsProvider(configPrefix));
+
+        // NOTE(review): nothing in this class closes this client; confirm who owns its lifecycle
+        final StsClient stsClient =
+                StsClient.builder()
+                        .credentialsProvider(stsCallerCredentials)
+                        .region(getRegion(configProps))
+                        .build();
+
+        return StsAssumeRoleCredentialsProvider.builder()
+                .refreshRequest(assumeRoleRequest)
+                .stsClient(stsClient)
+                .build();
+    }
+
+    /**
+     * Applies the web identity settings found under the given prefix to the supplied builder and
+     * builds the provider.
+     *
+     * <p>Role ARN, role session name and token file path are each set on the builder only when
+     * the corresponding property is present; absent properties leave the builder untouched.
+     *
+     * @param webIdentityBuilder the builder to configure
+     * @param configProps the configuration properties
+     * @param configPrefix the prefix of the web-identity-related property keys
+     * @return the provider produced by the builder
+     */
+    @VisibleForTesting
+    static AwsCredentialsProvider getWebIdentityTokenFileCredentialsProvider(
+            final WebIdentityTokenFileCredentialsProvider.Builder webIdentityBuilder,
+            final Properties configProps,
+            final String configPrefix) {
+
+        final String roleArn = configProps.getProperty(AWSConfigConstants.roleArn(configPrefix));
+        if (roleArn != null) {
+            webIdentityBuilder.roleArn(roleArn);
+        }
+
+        final String sessionName =
+                configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix));
+        if (sessionName != null) {
+            webIdentityBuilder.roleSessionName(sessionName);
+        }
+
+        final String tokenFile =
+                configProps.getProperty(AWSConfigConstants.webIdentityTokenFile(configPrefix));
+        if (tokenFile != null) {
+            webIdentityBuilder.webIdentityTokenFile(Paths.get(tokenFile));
+        }
+
+        return webIdentityBuilder.build();
+    }
+
+    /**
+     * Creates an async HTTP client from the given builder without any caller-supplied overrides,
+     * i.e. with an empty {@link AttributeMap} as configuration.
+     *
+     * @param httpClientBuilder the Netty client builder to configure and build
+     * @return the built async HTTP client
+     */
+    public static SdkAsyncHttpClient createAsyncHttpClient(
+            final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+        final AttributeMap noOverrides = AttributeMap.empty();
+        return createAsyncHttpClient(noOverrides, httpClientBuilder);
+    }
+
+    /**
+     * Creates an async HTTP client from the given builder and configuration.
+     *
+     * <p>The connection acquisition timeout and the HTTP/2 settings (health check ping period,
+     * initial window size) are set on the builder. The client is then built with {@code config}
+     * merged with this class's default HTTP options.
+     *
+     * @param config caller-supplied HTTP options
+     * @param httpClientBuilder the Netty client builder to configure and build
+     * @return the built async HTTP client
+     */
+    public static SdkAsyncHttpClient createAsyncHttpClient(
+            final AttributeMap config, final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+        final Http2Configuration http2Settings =
+                Http2Configuration.builder()
+                        .healthCheckPingPeriod(HEALTH_CHECK_PING_PERIOD)
+                        .initialWindowSize(INITIAL_WINDOW_SIZE_BYTES)
+                        .build();
+
+        httpClientBuilder
+                .connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT)
+                .http2Configuration(http2Settings);
+
+        final AttributeMap effectiveConfig = config.merge(HTTP_CLIENT_DEFAULTS);
+        return httpClientBuilder.buildWithDefaults(effectiveConfig);
+    }
+
+    /**
+     * Builds a {@link Region} from the value stored under {@link AWSConfigConstants#AWS_REGION}.
+     *
+     * @param configProps the properties containing the region
+     * @return the region specified by the properties
+     */
+    public static Region getRegion(final Properties configProps) {
+        final String regionName = configProps.getProperty(AWSConfigConstants.AWS_REGION);
+        return Region.of(regionName);
+    }
+
+    /**
+     * Tells whether the given region is one of the regions listed by {@link Region#regions()}.
+     *
+     * @param region the AWS region to check
+     * @return true if the supplied region is contained in that list, false otherwise
+     */
+    public static boolean isValidRegion(Region region) {
+        final boolean isKnownRegion = Region.regions().contains(region);
+        return isKnownRegion;
+    }
+
+    /**
+     * Validates the AWS-related configuration properties.
+     *
+     * <p>If a credential provider type is configured, a provider must be constructible from it,
+     * and for the {@code BASIC} type both the access key ID and the secret key must be present.
+     * If a region is configured, it must be one of the known AWS regions.
+     *
+     * @param config the properties to setup credentials and region
+     * @throws IllegalArgumentException if the credential provider type is not recognized, if the
+     *     {@code BASIC} type is used without a complete key pair, or if the region is not valid
+     */
+    public static void validateAwsConfiguration(Properties config) {
+        if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
+            // value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be
+            // recognizable
+            try {
+                getCredentialsProvider(config);
+            } catch (IllegalArgumentException e) {
+                // keep the original exception as the cause so the underlying reason is not lost
+                throw new IllegalArgumentException(
+                        "Invalid AWS Credential Provider Type set in config. Valid values are: "
+                                + commaSeparated(CredentialProvider.values()),
+                        e);
+            }
+
+            // if BASIC type is used, also check that the Access Key ID and Secret Key is supplied
+            CredentialProvider credentialsProviderType =
+                    getCredentialProviderType(config, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+            if (credentialsProviderType == CredentialProvider.BASIC) {
+                if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+                        || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+                    throw new IllegalArgumentException(
+                            "Please set values for AWS Access Key ID ('"
+                                    + AWSConfigConstants.AWS_ACCESS_KEY_ID
+                                    + "') "
+                                    + "and Secret Key ('"
+                                    + AWSConfigConstants.AWS_SECRET_ACCESS_KEY
+                                    + "') when using the BASIC AWS credential provider type.");
+                }
+            }
+        }
+
+        if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
+            // specified AWS Region name must be recognizable
+            if (!isValidRegion(getRegion(config))) {
+                throw new IllegalArgumentException(
+                        "Invalid AWS region set in config. Valid values are: "
+                                + commaSeparated(Region.regions().toArray()));
+            }
+        }
+    }
+
+    /** Joins the string forms of the given values with ", " and no trailing separator. */
+    private static String commaSeparated(final Object[] values) {
+        final StringBuilder joined = new StringBuilder();
+        for (int i = 0; i < values.length; i++) {
+            if (i > 0) {
+                joined.append(", ");
+            }
+            joined.append(values[i]);
+        }
+        return joined.toString();
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-base/src/main/resources/log4j2.properties b/flink-connectors/flink-connector-aws-base/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..c64a340
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-base/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSGeneralUtilTest.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSGeneralUtilTest.java
new file mode 100644
index 0000000..a7e2890
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSGeneralUtilTest.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.ASSUME_ROLE;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.AUTO;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.BASIC;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+import static software.amazon.awssdk.http.Protocol.HTTP2;
+
+/** Tests for {@link AWSGeneralUtil}. */
+public class AWSGeneralUtilTest {
+    @Rule public ExpectedException exception = ExpectedException.none();
+
+    @Test
+    public void testGetCredentialsProviderTypeDefaultsAuto() {
+        assertEquals(
+                AUTO,
+                AWSGeneralUtil.getCredentialProviderType(
+                        new Properties(), AWS_CREDENTIALS_PROVIDER));
+    }
+
+    @Test
+    public void testGetCredentialsProviderTypeBasic() {
+        Properties testConfig =
+                TestUtil.properties(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+        testConfig.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+        assertEquals(
+                BASIC,
+                AWSGeneralUtil.getCredentialProviderType(testConfig, AWS_CREDENTIALS_PROVIDER));
+    }
+
+    @Test
+    public void testGetCredentialsProviderTypeWebIdentityToken() {
+        Properties testConfig = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+        CredentialProvider type =
+                AWSGeneralUtil.getCredentialProviderType(testConfig, AWS_CREDENTIALS_PROVIDER);
+        assertEquals(WEB_IDENTITY_TOKEN, type);
+    }
+
+    @Test
+    public void testGetCredentialsProviderTypeAssumeRole() {
+        Properties testConfig = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
+
+        CredentialProvider type =
+                AWSGeneralUtil.getCredentialProviderType(testConfig, AWS_CREDENTIALS_PROVIDER);
+        assertEquals(ASSUME_ROLE, type);
+    }
+
+    @Test
+    public void testGetCredentialsProviderEnvironmentVariables() {
+        Properties properties = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+        AwsCredentialsProvider credentialsProvider =
+                AWSGeneralUtil.getCredentialsProvider(properties);
+
+        assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+    }
+
+    @Test
+    public void testGetCredentialsProviderSystemProperties() {
+        Properties properties = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+        AwsCredentialsProvider credentialsProvider =
+                AWSGeneralUtil.getCredentialsProvider(properties);
+
+        assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+    }
+
+    @Test
+    public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+        Properties properties = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+        AwsCredentialsProvider credentialsProvider =
+                AWSGeneralUtil.getCredentialsProvider(properties);
+
+        assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+    }
+
+    /**
+     * Checks that the configured role ARN and role session name are passed to the provider
+     * builder, and that no token file is set when none is configured.
+     */
+    @Test
+    public void testGetWebIdentityTokenFileCredentialsProvider() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+        config.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+        config.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+        WebIdentityTokenFileCredentialsProvider.Builder providerBuilder =
+                mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+        AWSGeneralUtil.getWebIdentityTokenFileCredentialsProvider(
+                providerBuilder, config, AWS_CREDENTIALS_PROVIDER);
+
+        verify(providerBuilder).roleArn("roleArn");
+        verify(providerBuilder).roleSessionName("roleSessionName");
+        verify(providerBuilder, never()).webIdentityTokenFile(any());
+    }
+
+    /** Checks that a configured web identity token file is passed to the builder as a path. */
+    @Test
+    public void testGetWebIdentityTokenFileCredentialsProviderWithWebIdentityFile() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+        config.setProperty(webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER), "webIdentityTokenFile");
+
+        WebIdentityTokenFileCredentialsProvider.Builder providerBuilder =
+                mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+        AWSGeneralUtil.getWebIdentityTokenFileCredentialsProvider(
+                providerBuilder, config, AWS_CREDENTIALS_PROVIDER);
+
+        verify(providerBuilder).webIdentityTokenFile(Paths.get("webIdentityTokenFile"));
+    }
+
+    /** Checks that {@code AUTO} yields a {@code DefaultCredentialsProvider}. */
+    @Test
+    public void testGetCredentialsProviderAuto() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(config);
+
+        assertTrue(provider instanceof DefaultCredentialsProvider);
+    }
+
+    /**
+     * Checks that {@code ASSUME_ROLE} yields a {@code StsAssumeRoleCredentialsProvider} and that
+     * the region, role ARN, role session name and external id are each read from the config.
+     */
+    @Test
+    public void testGetCredentialsProviderAssumeRole() {
+        // A spy is used so that the property look-ups can be verified afterwards.
+        Properties config = spy(TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"));
+        config.setProperty(AWS_REGION, "eu-west-2");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(config);
+
+        assertTrue(provider instanceof StsAssumeRoleCredentialsProvider);
+
+        verify(config).getProperty(AWS_REGION);
+        verify(config).getProperty(AWSConfigConstants.roleArn(AWS_CREDENTIALS_PROVIDER));
+        verify(config).getProperty(AWSConfigConstants.roleSessionName(AWS_CREDENTIALS_PROVIDER));
+        verify(config).getProperty(AWSConfigConstants.externalId(AWS_CREDENTIALS_PROVIDER));
+    }
+
+    /** Checks that {@code BASIC} resolves to the access key id and secret key from the config. */
+    @Test
+    public void testGetCredentialsProviderBasic() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "BASIC");
+        config.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+        config.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(config);
+        AwsCredentials resolved = provider.resolveCredentials();
+
+        assertEquals("ak", resolved.accessKeyId());
+        assertEquals("sk", resolved.secretAccessKey());
+    }
+
+    /**
+     * Checks that {@code PROFILE} with the {@code default} profile name resolves the credentials
+     * stored under that profile in {@code src/test/resources/profile}.
+     */
+    @Test
+    public void testGetCredentialsProviderProfile() {
+        Properties properties = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+        // setProperty (rather than the inherited Hashtable#put) keeps keys and values
+        // String-typed and matches the sibling named-profile test.
+        properties.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "default");
+        properties.setProperty(
+                AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER),
+                "src/test/resources/profile");
+
+        AwsCredentialsProvider credentialsProvider =
+                AWSGeneralUtil.getCredentialsProvider(properties);
+
+        assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+        AwsCredentials credentials = credentialsProvider.resolveCredentials();
+        assertEquals("11111111111111111111", credentials.accessKeyId());
+        assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111", credentials.secretAccessKey());
+    }
+
+    /**
+     * Checks that {@code PROFILE} with the profile name {@code foo} resolves the credentials
+     * stored under that profile in {@code src/test/resources/profile}.
+     */
+    @Test
+    public void testGetCredentialsProviderNamedProfile() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+        config.setProperty(
+                AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER),
+                "src/test/resources/profile");
+        config.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "foo");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(config);
+
+        assertTrue(provider instanceof ProfileCredentialsProvider);
+
+        AwsCredentials resolved = provider.resolveCredentials();
+        assertEquals("22222222222222222222", resolved.accessKeyId());
+        assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222", resolved.secretAccessKey());
+    }
+
+    /**
+     * Verifies that a client created by {@code createAsyncHttpClient(builder)} reports a
+     * connection acquire timeout of 60,000 ms.
+     */
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsConnectionAcquireTimeout()
+            throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(60_000, nettyConfiguration.connectionAcquireTimeoutMillis());
+        }
+    }
+
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsConnectionTtl() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create();
+        NettyConfiguration nettyDefaultConfiguration =
+                TestUtil.getNettyConfiguration(httpDefaultClient);
+
+        assertEquals(
+                nettyDefaultConfiguration.connectionTtlMillis(),
+                nettyConfiguration.connectionTtlMillis());
+    }
+
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsConnectionTimeout() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create();
+        NettyConfiguration nettyDefaultConfiguration =
+                TestUtil.getNettyConfiguration(httpDefaultClient);
+
+        assertEquals(
+                nettyDefaultConfiguration.connectTimeoutMillis(),
+                nettyConfiguration.connectTimeoutMillis());
+    }
+
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsIdleTimeout() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create();
+        NettyConfiguration nettyDefaultConfiguration =
+                TestUtil.getNettyConfiguration(httpDefaultClient);
+
+        assertEquals(
+                nettyDefaultConfiguration.idleTimeoutMillis(),
+                nettyConfiguration.idleTimeoutMillis());
+    }
+
+    /**
+     * Verifies that a client created by {@code createAsyncHttpClient(builder)} reports a maximum
+     * of 10,000 connections.
+     */
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsMaxConnections() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(10_000, nettyConfiguration.maxConnections());
+        }
+    }
+
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsMaxPendingConnectionAcquires()
+            throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create();
+        NettyConfiguration nettyDefaultConfiguration =
+                TestUtil.getNettyConfiguration(httpDefaultClient);
+
+        assertEquals(
+                nettyDefaultConfiguration.maxPendingConnectionAcquires(),
+                nettyConfiguration.maxPendingConnectionAcquires());
+    }
+
+    /**
+     * Verifies that a client created by {@code createAsyncHttpClient(builder)} reports a read
+     * timeout of 360,000 ms.
+     */
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsReadTimeout() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(360_000, nettyConfiguration.readTimeoutMillis());
+        }
+    }
+
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsReapIdleConnections() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create();
+        NettyConfiguration nettyDefaultConfiguration =
+                TestUtil.getNettyConfiguration(httpDefaultClient);
+
+        assertEquals(
+                nettyDefaultConfiguration.reapIdleConnections(),
+                nettyConfiguration.reapIdleConnections());
+    }
+
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsTcpKeepAlive() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create();
+        NettyConfiguration nettyDefaultConfiguration =
+                TestUtil.getNettyConfiguration(httpDefaultClient);
+
+        assertEquals(nettyDefaultConfiguration.tcpKeepAlive(), nettyConfiguration.tcpKeepAlive());
+    }
+
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsTlsKeyManagersProvider()
+            throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create();
+        NettyConfiguration nettyDefaultConfiguration =
+                TestUtil.getNettyConfiguration(httpDefaultClient);
+
+        assertEquals(
+                nettyDefaultConfiguration.tlsKeyManagersProvider(),
+                nettyConfiguration.tlsKeyManagersProvider());
+    }
+
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsTlsTrustManagersProvider()
+            throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create();
+        NettyConfiguration nettyDefaultConfiguration =
+                TestUtil.getNettyConfiguration(httpDefaultClient);
+
+        assertEquals(
+                nettyDefaultConfiguration.tlsTrustManagersProvider(),
+                nettyConfiguration.tlsTrustManagersProvider());
+    }
+
+    /**
+     * Verifies that a client created by {@code createAsyncHttpClient(builder)} does not trust all
+     * certificates.
+     */
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsTrustAllCertificates() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertFalse(nettyConfiguration.trustAllCertificates());
+        }
+    }
+
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsWriteTimeout() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create();
+        NettyConfiguration nettyDefaultConfiguration =
+                TestUtil.getNettyConfiguration(httpDefaultClient);
+
+        assertEquals(
+                nettyDefaultConfiguration.writeTimeoutMillis(),
+                nettyConfiguration.writeTimeoutMillis());
+    }
+
+    /**
+     * Verifies that a client created by {@code createAsyncHttpClient(builder)} is configured with
+     * the {@code HTTP2} protocol.
+     */
+    @Test
+    public void testCreateNettyAsyncHttpClientWithDefaultsProtocol() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(
+                    HTTP2, nettyConfiguration.attribute(SdkHttpConfigurationOption.PROTOCOL));
+        }
+    }
+
+    /** Verifies that an explicitly configured read timeout is applied to the created client. */
+    @Test
+    public void testCreateNettyAsyncHttpClientReadTimeout() throws Exception {
+        Duration readTimeout = Duration.ofMillis(1234);
+
+        AttributeMap clientConfiguration =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.READ_TIMEOUT, readTimeout)
+                        .build();
+
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(readTimeout.toMillis(), nettyConfiguration.readTimeoutMillis());
+        }
+    }
+
+    /** Verifies that an explicitly enabled TCP keep-alive is applied to the created client. */
+    @Test
+    public void testCreateNettyAsyncHttpClientTcpKeepAlive() throws Exception {
+        boolean tcpKeepAlive = true;
+
+        AttributeMap clientConfiguration =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.TCP_KEEPALIVE, tcpKeepAlive)
+                        .build();
+
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(tcpKeepAlive, nettyConfiguration.tcpKeepAlive());
+        }
+    }
+
+    /** Verifies that an explicitly configured connection timeout is applied to the client. */
+    @Test
+    public void testCreateNettyAsyncHttpClientConnectionTimeout() throws Exception {
+        Duration connectionTimeout = Duration.ofMillis(1000);
+
+        AttributeMap clientConfiguration =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, connectionTimeout)
+                        .build();
+
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(
+                    connectionTimeout.toMillis(), nettyConfiguration.connectTimeoutMillis());
+        }
+    }
+
+    /** Verifies that an explicitly configured max-connections value is applied to the client. */
+    @Test
+    public void testCreateNettyAsyncHttpClientMaxConcurrency() throws Exception {
+        int maxConnections = 123;
+
+        AttributeMap clientConfiguration =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, maxConnections)
+                        .build();
+
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(maxConnections, nettyConfiguration.maxConnections());
+        }
+    }
+
+    /** Verifies that an explicitly configured write timeout is applied to the created client. */
+    @Test
+    public void testCreateNettyAsyncHttpClientWriteTimeout() throws Exception {
+        Duration writeTimeout = Duration.ofMillis(3000);
+
+        AttributeMap clientConfiguration =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.WRITE_TIMEOUT, writeTimeout)
+                        .build();
+
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(writeTimeout.toMillis(), nettyConfiguration.writeTimeoutMillis());
+        }
+    }
+
+    /** Verifies that an explicitly configured max idle time is applied to the created client. */
+    @Test
+    public void testCreateNettyAsyncHttpClientConnectionMaxIdleTime() throws Exception {
+        Duration maxIdleTime = Duration.ofMillis(2000);
+
+        AttributeMap clientConfiguration =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, maxIdleTime)
+                        .build();
+
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(maxIdleTime.toMillis(), nettyConfiguration.idleTimeoutMillis());
+        }
+    }
+
+    /** Verifies that explicitly disabling idle-connection reaping is applied to the client. */
+    @Test
+    public void testCreateNettyAsyncHttpClientIdleConnectionReaper() throws Exception {
+        boolean reapIdleConnections = false;
+
+        AttributeMap clientConfiguration =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.REAP_IDLE_CONNECTIONS, reapIdleConnections)
+                        .build();
+
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(reapIdleConnections, nettyConfiguration.reapIdleConnections());
+        }
+    }
+
+    /** Verifies that an explicitly configured connection TTL is applied to the created client. */
+    @Test
+    public void testCreateNettyAsyncHttpClientIdleConnectionTtl() throws Exception {
+        Duration connectionTtl = Duration.ofMillis(5000);
+
+        AttributeMap clientConfiguration =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, connectionTtl)
+                        .build();
+
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(connectionTtl.toMillis(), nettyConfiguration.connectionTtlMillis());
+        }
+    }
+
+    /** Verifies that explicitly enabling trust-all-certificates is applied to the client. */
+    @Test
+    public void testCreateNettyAsyncHttpClientTrustAllCertificates() throws Exception {
+        boolean trustAllCertificates = true;
+
+        AttributeMap clientConfiguration =
+                AttributeMap.builder()
+                        .put(
+                                SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
+                                trustAllCertificates)
+                        .build();
+
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(trustAllCertificates, nettyConfiguration.trustAllCertificates());
+        }
+    }
+
+    /** Verifies that an explicitly configured protocol is applied to the created client. */
+    @Test
+    public void testCreateNettyAsyncHttpClientHttpVersion() throws Exception {
+        Protocol httpVersion = HTTP1_1;
+
+        AttributeMap clientConfiguration =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.PROTOCOL, httpVersion)
+                        .build();
+
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+        // try-with-resources closes the client once the assertion has run.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertEquals(
+                    httpVersion,
+                    nettyConfiguration.attribute(SdkHttpConfigurationOption.PROTOCOL));
+        }
+    }
+
+    /** Checks that the configured region id {@code eu-west-2} is resolved to its constant. */
+    @Test
+    public void testGetRegion() {
+        Properties config = TestUtil.properties(AWS_REGION, "eu-west-2");
+
+        assertEquals(Region.EU_WEST_2, AWSGeneralUtil.getRegion(config));
+    }
+
+    @Test
+    public void testValidRegion() {
+        assertTrue(AWSGeneralUtil.isValidRegion(Region.of("us-east-1")));
+    }
+
+    @Test
+    public void testInvalidRegion() {
+        assertFalse(AWSGeneralUtil.isValidRegion(Region.of("ur-east-1")));
+    }
+
+    /** Checks that validation rejects an unknown region id with "Invalid AWS region". */
+    @Test
+    public void testUnrecognizableAwsRegionInConfig() {
+        Properties config = TestUtil.properties(AWSConfigConstants.AWS_REGION, "wrongRegionId");
+        config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+        config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+        // Expectations must be registered before the call that is expected to throw.
+        exception.expect(IllegalArgumentException.class);
+        exception.expectMessage("Invalid AWS region");
+
+        AWSGeneralUtil.validateAwsConfiguration(config);
+    }
+
+    /**
+     * Checks that validation rejects the {@code BASIC} provider type when neither an access key
+     * id nor a secret key is configured.
+     */
+    @Test
+    public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
+        Properties config = TestUtil.properties(AWSConfigConstants.AWS_REGION, "us-east-1");
+        config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+
+        String expectedMessage =
+                "Please set values for AWS Access Key ID ('"
+                        + AWSConfigConstants.AWS_ACCESS_KEY_ID
+                        + "') "
+                        + "and Secret Key ('"
+                        + AWSConfigConstants.AWS_SECRET_ACCESS_KEY
+                        + "') when using the BASIC AWS credential provider type.";
+
+        // Expectations must be registered before the call that is expected to throw.
+        exception.expect(IllegalArgumentException.class);
+        exception.expectMessage(expectedMessage);
+
+        AWSGeneralUtil.validateAwsConfiguration(config);
+    }
+
+    /** Checks that validation rejects an unknown credential provider type. */
+    @Test
+    public void testUnrecognizableCredentialProviderTypeInConfig() {
+        Properties config = TestUtil.getStandardProperties();
+        config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
+
+        // Expectations must be registered before the call that is expected to throw.
+        exception.expect(IllegalArgumentException.class);
+        exception.expectMessage("Invalid AWS Credential Provider Type");
+
+        AWSGeneralUtil.validateAwsConfiguration(config);
+    }
+
+    /**
+     * Creates a mocked provider builder whose {@code roleArn}, {@code roleSessionName} and {@code
+     * webIdentityTokenFile} setters return the mock itself, so calls can be chained.
+     */
+    private WebIdentityTokenFileCredentialsProvider.Builder
+            mockWebIdentityTokenFileCredentialsProviderBuilder() {
+        WebIdentityTokenFileCredentialsProvider.Builder mockBuilder =
+                mock(WebIdentityTokenFileCredentialsProvider.Builder.class);
+
+        when(mockBuilder.roleArn(any())).thenReturn(mockBuilder);
+        when(mockBuilder.roleSessionName(any())).thenReturn(mockBuilder);
+        when(mockBuilder.webIdentityTokenFile(any())).thenReturn(mockBuilder);
+        return mockBuilder;
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/TestUtil.java b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/TestUtil.java
new file mode 100644
index 0000000..cc52d17
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/TestUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+
+/** Utilities for tests in the package. */
+public class TestUtil {
+    public static Properties properties(final String key, final String value) {
+        Properties properties = new Properties();
+        properties.setProperty(key, value);
+        return properties;
+    }
+
+    public static Properties getStandardProperties() {
+        Properties config = properties(AWSConfigConstants.AWS_REGION, "us-east-1");
+        config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+        config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+        return config;
+    }
+
+    public static NettyConfiguration getNettyConfiguration(final SdkAsyncHttpClient httpClient)
+            throws Exception {
+        return getField("configuration", httpClient);
+    }
+
+    public static <T> T getField(String fieldName, Object obj) throws Exception {
+        Field field = obj.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return (T) field.get(obj);
+    }
+}
diff --git a/flink-connectors/flink-connector-aws-base/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-aws-base/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..c4fa187
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-base/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connectors/flink-connector-aws-base/src/test/resources/profile b/flink-connectors/flink-connector-aws-base/src/test/resources/profile
new file mode 100644
index 0000000..2573fd6
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-base/src/test/resources/profile
@@ -0,0 +1,7 @@
+[default]
+aws_access_key_id=11111111111111111111
+aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111
+
+[foo]
+aws_access_key_id=22222222222222222222
+aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222
diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml
index c5b1b83..0ac7b14 100644
--- a/flink-connectors/flink-connector-kinesis/pom.xml
+++ b/flink-connectors/flink-connector-kinesis/pom.xml
@@ -34,7 +34,7 @@ under the License.
 	<name>Flink : Connectors : Kinesis</name>
 	<properties>
 		<aws.sdk.version>1.12.7</aws.sdk.version>
-		<aws.sdkv2.version>2.16.86</aws.sdkv2.version>
+		<aws.sdkv2.version>2.17.52</aws.sdkv2.version>
 		<aws.kinesis-kcl.version>1.14.1</aws.kinesis-kcl.version>
 		<aws.kinesis-kpl.version>0.14.1</aws.kinesis-kpl.version>
 		<aws.dynamodbstreams-kinesis-adapter.version>1.5.3</aws.dynamodbstreams-kinesis-adapter.version>
@@ -140,6 +140,12 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-aws-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 		<!-- Flink Table API ecosystem -->
 		<!-- Projects depending on this project won't depend on flink-table-*. -->
 		<dependency>
@@ -193,6 +199,14 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-aws-base</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
 			<groupId>org.testcontainers</groupId>
 			<artifactId>testcontainers</artifactId>
 			<scope>test</scope>
@@ -288,6 +302,7 @@ under the License.
 							<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
 							<artifactSet combine.children="append">
 								<includes>
+									<include>org.apache.flink:flink-connector-aws-base:*</include>
 									<include>com.amazonaws:*</include>
 									<include>com.google.protobuf:*</include>
 									<include>org.apache.httpcomponents:*</include>
@@ -350,6 +365,12 @@ under the License.
 										<exclude>META-INF/services/**</exclude>
 									</excludes>
 								</filter>
+								<filter>
+									<artifact>org.apache.flink:flink-connector-aws-base:*</artifact>
+									<excludes>
+										<exclude>profile</exclude>
+									</excludes>
+								</filter>
 							</filters>
 						</configuration>
 					</execution>
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
index aa3298b..1ab8e71 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java
@@ -19,16 +19,14 @@ package org.apache.flink.streaming.connectors.kinesis.config;
 
 import org.apache.flink.annotation.PublicEvolving;
 
-import com.amazonaws.auth.AWSCredentialsProvider;
-
 /** Configuration keys for AWS service usage. */
 @PublicEvolving
-public class AWSConfigConstants {
-
+public class AWSConfigConstants extends org.apache.flink.connector.aws.config.AWSConfigConstants {
     /**
      * Possible configuration values for the type of credential provider to use when accessing AWS
-     * Kinesis. Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be
-     * used.
+     * Kinesis. Internally, a corresponding implementation of {@link AwsCredentialsProvider} will be
+     * used. Copy of org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider
+     * left for backwards compatibility.
      */
     public enum CredentialProvider {
 
@@ -71,94 +69,4 @@ public class AWSConfigConstants {
          */
         AUTO,
     }
-
-    /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set). */
-    public static final String AWS_REGION = "aws.region";
-
-    /**
-     * The credential provider type to use when AWS credentials are required (BASIC is used if not
-     * set).
-     */
-    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
-
-    /** The AWS access key ID to use when setting credentials provider type to BASIC. */
-    public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);
-
-    /** The AWS secret key to use when setting credentials provider type to BASIC. */
-    public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);
-
-    /** Optional configuration for profile path if credential provider type is set to be PROFILE. */
-    public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);
-
-    /** Optional configuration for profile name if credential provider type is set to be PROFILE. */
-    public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);
-
-    /**
-     * The role ARN to use when credential provider type is set to ASSUME_ROLE or
-     * WEB_IDENTITY_TOKEN.
-     */
-    public static final String AWS_ROLE_ARN = roleArn(AWS_CREDENTIALS_PROVIDER);
-
-    /**
-     * The role session name to use when credential provider type is set to ASSUME_ROLE or
-     * WEB_IDENTITY_TOKEN.
-     */
-    public static final String AWS_ROLE_SESSION_NAME = roleSessionName(AWS_CREDENTIALS_PROVIDER);
-
-    /** The external ID to use when credential provider type is set to ASSUME_ROLE. */
-    public static final String AWS_ROLE_EXTERNAL_ID = externalId(AWS_CREDENTIALS_PROVIDER);
-
-    /**
-     * The absolute path to the web identity token file that should be used if provider type is set
-     * to WEB_IDENTITY_TOKEN.
-     */
-    public static final String AWS_WEB_IDENTITY_TOKEN_FILE =
-            webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER);
-
-    /**
-     * The credentials provider that provides credentials for assuming the role when credential
-     * provider type is set to ASSUME_ROLE. Roles can be nested, so AWS_ROLE_CREDENTIALS_PROVIDER
-     * can again be set to "ASSUME_ROLE"
-     */
-    public static final String AWS_ROLE_CREDENTIALS_PROVIDER =
-            roleCredentialsProvider(AWS_CREDENTIALS_PROVIDER);
-
-    /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set). */
-    public static final String AWS_ENDPOINT = "aws.endpoint";
-
-    public static String accessKeyId(String prefix) {
-        return prefix + ".basic.accesskeyid";
-    }
-
-    public static String secretKey(String prefix) {
-        return prefix + ".basic.secretkey";
-    }
-
-    public static String profilePath(String prefix) {
-        return prefix + ".profile.path";
-    }
-
-    public static String profileName(String prefix) {
-        return prefix + ".profile.name";
-    }
-
-    public static String roleArn(String prefix) {
-        return prefix + ".role.arn";
-    }
-
-    public static String roleSessionName(String prefix) {
-        return prefix + ".role.sessionName";
-    }
-
-    public static String externalId(String prefix) {
-        return prefix + ".role.externalId";
-    }
-
-    public static String roleCredentialsProvider(String prefix) {
-        return prefix + ".role.provider";
-    }
-
-    public static String webIdentityTokenFile(String prefix) {
-        return prefix + ".webIdentityToken.file";
-    }
 }
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index c8e54c0..83cfccc 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -18,17 +18,16 @@
 package org.apache.flink.streaming.connectors.kinesis.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
 import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
 import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.EFORegistrationType;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.RecordPublisherType;
 import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants;
 
-import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 
 import java.text.ParseException;
@@ -513,51 +512,7 @@ public class KinesisConfigUtil {
 
     /** Validate configuration properties related to Amazon AWS service. */
     public static void validateAwsConfiguration(Properties config) {
-        if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
-            String credentialsProviderType =
-                    config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
-
-            // value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be
-            // recognizable
-            CredentialProvider providerType;
-            try {
-                providerType = CredentialProvider.valueOf(credentialsProviderType);
-            } catch (IllegalArgumentException e) {
-                StringBuilder sb = new StringBuilder();
-                for (CredentialProvider type : CredentialProvider.values()) {
-                    sb.append(type.toString()).append(", ");
-                }
-                throw new IllegalArgumentException(
-                        "Invalid AWS Credential Provider Type set in config. Valid values are: "
-                                + sb.toString());
-            }
-
-            // if BASIC type is used, also check that the Access Key ID and Secret Key is supplied
-            if (providerType == CredentialProvider.BASIC) {
-                if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
-                        || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
-                    throw new IllegalArgumentException(
-                            "Please set values for AWS Access Key ID ('"
-                                    + AWSConfigConstants.AWS_ACCESS_KEY_ID
-                                    + "') "
-                                    + "and Secret Key ('"
-                                    + AWSConfigConstants.AWS_SECRET_ACCESS_KEY
-                                    + "') when using the BASIC AWS credential provider type.");
-                }
-            }
-        }
-
-        if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
-            // specified AWS Region name must be recognizable
-            if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) {
-                StringBuilder sb = new StringBuilder();
-                for (Regions region : Regions.values()) {
-                    sb.append(region.getName()).append(", ");
-                }
-                throw new IllegalArgumentException(
-                        "Invalid AWS region set in config. Valid values are: " + sb.toString());
-            }
-        }
+        AWSGeneralUtil.validateAwsConfiguration(config);
     }
 
     /**
diff --git a/flink-connectors/flink-connector-kinesis/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
index 1a652f4..1b6b7ea 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
+++ b/flink-connectors/flink-connector-kinesis/src/main/resources/META-INF/NOTICE
@@ -20,34 +20,37 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.httpcomponents:httpclient:4.5.13
 - org.apache.httpcomponents:httpcore:4.4.14
 - software.amazon.ion:ion-java:1.0.2
-- software.amazon.awssdk:kinesis:2.16.86
-- software.amazon.awssdk:aws-cbor-protocol:2.16.86
-- software.amazon.awssdk:aws-json-protocol:2.16.86
-- software.amazon.awssdk:protocol-core:2.16.86
-- software.amazon.awssdk:profiles:2.16.86
-- software.amazon.awssdk:sdk-core:2.16.86
-- software.amazon.awssdk:auth:2.16.86
+- software.amazon.awssdk:kinesis:2.17.52
+- software.amazon.awssdk:aws-cbor-protocol:2.17.52
+- software.amazon.awssdk:aws-json-protocol:2.17.52
+- software.amazon.awssdk:protocol-core:2.17.52
+- software.amazon.awssdk:profiles:2.17.52
+- software.amazon.awssdk:sdk-core:2.17.52
+- software.amazon.awssdk:auth:2.17.52
 - software.amazon.eventstream:eventstream:1.0.1
-- software.amazon.awssdk:http-client-spi:2.16.86
-- software.amazon.awssdk:regions:2.16.86
-- software.amazon.awssdk:annotations:2.16.86
-- software.amazon.awssdk:utils:2.16.86
-- software.amazon.awssdk:aws-core:2.16.86
-- software.amazon.awssdk:metrics-spi:2.16.86
-- software.amazon.awssdk:apache-client:2.16.86
-- software.amazon.awssdk:netty-nio-client:2.16.86
-- software.amazon.awssdk:sts:2.16.86
-- software.amazon.awssdk:aws-query-protocol:2.16.86
-- io.netty:netty-codec-http:4.1.63.Final
-- io.netty:netty-codec-http2:4.1.63.Final
-- io.netty:netty-codec:4.1.63.Final
-- io.netty:netty-transport:4.1.63.Final
-- io.netty:netty-resolver:4.1.63.Final
-- io.netty:netty-common:4.1.63.Final
-- io.netty:netty-buffer:4.1.63.Final
-- io.netty:netty-handler:4.1.63.Final
-- io.netty:netty-transport-native-epoll:linux-x86_64:4.1.63.Final
-- io.netty:netty-transport-native-unix-common:4.1.63.Final
+- software.amazon.awssdk:http-client-spi:2.17.52
+- software.amazon.awssdk:regions:2.17.52
+- software.amazon.awssdk:annotations:2.17.52
+- software.amazon.awssdk:utils:2.17.52
+- software.amazon.awssdk:aws-core:2.17.52
+- software.amazon.awssdk:metrics-spi:2.17.52
+- software.amazon.awssdk:apache-client:2.17.52
+- software.amazon.awssdk:netty-nio-client:2.17.52
+- software.amazon.awssdk:sts:2.17.52
+- software.amazon.awssdk:aws-query-protocol:2.17.52
+- software.amazon.awssdk:json-utils:2.17.52
+- software.amazon.awssdk:third-party-jackson-core:2.17.52
+- software.amazon.awssdk:third-party-jackson-dataformat-cbor:2.17.52
+- io.netty:netty-codec-http:4.1.68.Final
+- io.netty:netty-codec-http2:4.1.68.Final
+- io.netty:netty-codec:4.1.68.Final
+- io.netty:netty-transport:4.1.68.Final
+- io.netty:netty-resolver:4.1.68.Final
+- io.netty:netty-common:4.1.68.Final
+- io.netty:netty-buffer:4.1.68.Final
+- io.netty:netty-handler:4.1.68.Final
+- io.netty:netty-transport-native-epoll:linux-x86_64:4.1.68.Final
+- io.netty:netty-transport-native-unix-common:4.1.68.Final
 - com.typesafe.netty:netty-reactive-streams-http:2.0.5
 - com.typesafe.netty:netty-reactive-streams:2.0.5
 
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
index 04a6692..2d53917 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerFanOutTest.java
@@ -71,7 +71,9 @@ public class ShardConsumerFanOutTest {
 
         StartingPosition actual = kinesis.getStartingPositionForSubscription(0);
         assertEquals(AT_TIMESTAMP, actual.type());
-        assertTrue(now.equals(actual.timestamp()) || now.isBefore(actual.timestamp()));
+
+        // Compare at millisecond precision so that now has the same precision as actual
+        assertTrue(now.toEpochMilli() <= actual.timestamp().toEpochMilli());
     }
 
     @Test
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index f5151c1..7107477 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -52,6 +52,7 @@ under the License.
 		<module>flink-connector-cassandra</module>
 		<module>flink-connector-kafka</module>
 		<module>flink-connector-gcp-pubsub</module>
+		<module>flink-connector-aws-base</module>
 		<module>flink-connector-kinesis</module>
 		<module>flink-connector-base</module>
 		<module>flink-file-sink-common</module>
diff --git a/pom.xml b/pom.xml
index 145c1dc..25c9d67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1445,6 +1445,7 @@ under the License.
 						<exclude>flink-connectors/flink-connector-hive/src/test/resources/**</exclude>
 						<exclude>flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/answer_set/*</exclude>
 						<exclude>flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/query/*</exclude>
+						<exclude>flink-connectors/flink-connector-aws-base/src/test/resources/profile</exclude>
 						<exclude>flink-connectors/flink-connector-kinesis/src/test/resources/profile</exclude>
 						<exclude>flink-table/flink-table-code-splitter/src/test/resources/**</exclude>
 						<exclude>flink-connectors/flink-connector-pulsar/src/test/resources/**</exclude>