You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/12/03 18:59:54 UTC

[flink-connector-aws] 02/08: [FLINK-29907][Connectors/AWS] Externalize AWS Base from Flink repo

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git

commit 89d4a559c3219b3c1b4e7d6bc2847c4f34c8105e
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Fri Dec 2 09:25:34 2022 +0000

    [FLINK-29907][Connectors/AWS] Externalize AWS Base from Flink repo
---
 .gitignore                                         |   3 +-
 .../33252236-fc9f-4f63-b537-39e2322f7ccd           |   0
 .../733a854b-2487-43da-a5fa-9b089af5fb4e           |   0
 .../archunit-violations/stored.rules               |   4 +
 flink-connector-aws-base/pom.xml                   | 110 +++
 .../connector/aws/config/AWSConfigConstants.java   | 176 +++++
 .../connector/aws/table/util/AWSOptionUtils.java   |  87 +++
 .../aws/table/util/AsyncClientOptionsUtils.java    | 109 +++
 .../flink/connector/aws/util/AWSAsyncSinkUtil.java | 164 +++++
 .../aws/util/AWSAuthenticationException.java       |  38 +
 .../AWSCredentialFatalExceptionClassifiers.java    |  43 ++
 .../flink/connector/aws/util/AWSGeneralUtil.java   | 396 +++++++++++
 .../src/main/resources/log4j2.properties           |  25 +
 .../architecture/TestCodeArchitectureTest.java     |  40 ++
 .../aws/table/util/AWSOptionsUtilTest.java         | 137 ++++
 .../table/util/AsyncClientOptionsUtilsTest.java    | 146 ++++
 .../aws/testutils/AWSServicesTestUtils.java        | 146 ++++
 .../aws/testutils/LocalstackContainer.java         |  85 +++
 .../connector/aws/util/AWSAsyncSinkUtilTest.java   | 253 +++++++
 .../connector/aws/util/AWSGeneralUtilTest.java     | 792 +++++++++++++++++++++
 .../apache/flink/connector/aws/util/TestUtil.java  |  54 ++
 .../org.junit.jupiter.api.extension.Extension      |  16 +
 .../src/test/resources/archunit.properties         |  31 +
 .../src/test/resources/log4j2-test.properties      |  28 +
 .../src/test/resources/profile                     |   7 +
 pom.xml                                            |  40 +-
 26 files changed, 2920 insertions(+), 10 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5f0068c..973e8d5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,4 +35,5 @@ out/
 tools/flink
 tools/flink-*
 tools/releasing/release
-tools/japicmp-output
\ No newline at end of file
+tools/japicmp-output
+*/.idea/
\ No newline at end of file
diff --git a/flink-connector-aws-base/archunit-violations/33252236-fc9f-4f63-b537-39e2322f7ccd b/flink-connector-aws-base/archunit-violations/33252236-fc9f-4f63-b537-39e2322f7ccd
new file mode 100644
index 0000000..e69de29
diff --git a/flink-connector-aws-base/archunit-violations/733a854b-2487-43da-a5fa-9b089af5fb4e b/flink-connector-aws-base/archunit-violations/733a854b-2487-43da-a5fa-9b089af5fb4e
new file mode 100644
index 0000000..e69de29
diff --git a/flink-connector-aws-base/archunit-violations/stored.rules b/flink-connector-aws-base/archunit-violations/stored.rules
new file mode 100644
index 0000000..ef2e628
--- /dev/null
+++ b/flink-connector-aws-base/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Tue Feb 22 12:16:40 CET 2022
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=733a854b-2487-43da-a5fa-9b089af5fb4e
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=33252236-fc9f-4f63-b537-39e2322f7ccd
diff --git a/flink-connector-aws-base/pom.xml b/flink-connector-aws-base/pom.xml
new file mode 100644
index 0000000..0249d61
--- /dev/null
+++ b/flink-connector-aws-base/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-connector-aws-base</artifactId>
+    <name>Flink : Connectors : AWS : Base</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>netty-nio-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sts</artifactId>
+        </dependency>
+
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>iam</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- ArchUnit test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-architecture-tests-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                        <configuration>
+                            <excludes>
+                                <!-- test-jar is still used by JUnit4 modules -->
+                                <exclude>META-INF/services/org.junit.jupiter.api.extension.Extension</exclude>
+                            </excludes>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java
new file mode 100644
index 0000000..b244b2c
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+
+/** Configuration keys for AWS service usage. */
+@PublicEvolving
+public class AWSConfigConstants {
+
+    /**
+     * Possible configuration values for the type of credential provider to use when accessing AWS.
+     * Internally, a corresponding implementation of {@link AwsCredentialsProvider} will be used.
+     */
+    public enum CredentialProvider {
+
+        /**
+         * Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create
+         * AWS credentials.
+         */
+        ENV_VAR,
+
+        /**
+         * Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS
+         * credentials.
+         */
+        SYS_PROP,
+
+        /** Use an AWS credentials profile file to create the AWS credentials. */
+        PROFILE,
+
+        /**
+         * Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in
+         * the configuration properties.
+         */
+        BASIC,
+
+        /**
+         * Create AWS credentials by assuming a role. The credentials for assuming the role must be
+         * supplied.
+         */
+        ASSUME_ROLE,
+
+        /**
+         * Use AWS WebIdentityToken in order to assume a role. A token file and role details can be
+         * supplied as configuration or environment variables.
+         */
+        WEB_IDENTITY_TOKEN,
+
+        /**
+         * A credentials provider chain will be used that searches for credentials in this order:
+         * ENV_VAR, SYS_PROP, WEB_IDENTITY_TOKEN, PROFILE in the AWS instance metadata.
+         */
+        AUTO,
+    }
+
+    /** The AWS region of the service ("us-east-1" is used if not set). */
+    public static final String AWS_REGION = "aws.region";
+
+    /**
+     * The credential provider type to use when AWS credentials are required (BASIC is used if not
+     * set).
+     */
+    public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+
+    /** The AWS access key ID to use when setting credentials provider type to BASIC. */
+    public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);
+
+    /** The AWS secret key to use when setting credentials provider type to BASIC. */
+    public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);
+
+    /** Optional configuration for profile path if credential provider type is set to be PROFILE. */
+    public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);
+
+    /** Optional configuration for profile name if credential provider type is set to be PROFILE. */
+    public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);
+
+    /**
+     * The role ARN to use when credential provider type is set to ASSUME_ROLE or
+     * WEB_IDENTITY_TOKEN.
+     */
+    public static final String AWS_ROLE_ARN = roleArn(AWS_CREDENTIALS_PROVIDER);
+
+    /**
+     * The role session name to use when credential provider type is set to ASSUME_ROLE or
+     * WEB_IDENTITY_TOKEN.
+     */
+    public static final String AWS_ROLE_SESSION_NAME = roleSessionName(AWS_CREDENTIALS_PROVIDER);
+
+    /** The external ID to use when credential provider type is set to ASSUME_ROLE. */
+    public static final String AWS_ROLE_EXTERNAL_ID = externalId(AWS_CREDENTIALS_PROVIDER);
+
+    /**
+     * The absolute path to the web identity token file that should be used if provider type is set
+     * to WEB_IDENTITY_TOKEN.
+     */
+    public static final String AWS_WEB_IDENTITY_TOKEN_FILE =
+            webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER);
+
+    /**
+     * The credentials provider that provides credentials for assuming the role when credential
+     * provider type is set to ASSUME_ROLE. Roles can be nested, so AWS_ROLE_CREDENTIALS_PROVIDER
+     * can again be set to "ASSUME_ROLE"
+     */
+    public static final String AWS_ROLE_CREDENTIALS_PROVIDER =
+            roleCredentialsProvider(AWS_CREDENTIALS_PROVIDER);
+
+    /** The AWS endpoint for the service (derived from the AWS region setting if not set). */
+    public static final String AWS_ENDPOINT = "aws.endpoint";
+
+    /** Whether to trust all SSL certificates. */
+    public static final String TRUST_ALL_CERTIFICATES = "aws.trust.all.certificates";
+
+    /** The HTTP protocol version to use. */
+    public static final String HTTP_PROTOCOL_VERSION = "aws.http.protocol.version";
+
+    /** Maximum request concurrency for {@link SdkAsyncHttpClient}. */
+    public static final String HTTP_CLIENT_MAX_CONCURRENCY = "aws.http-client.max-concurrency";
+
+    /** Read Request timeout for {@link SdkAsyncHttpClient}. */
+    public static final String HTTP_CLIENT_READ_TIMEOUT_MILLIS = "aws.http-client.read-timeout";
+
+    public static String accessKeyId(String prefix) {
+        return prefix + ".basic.accesskeyid";
+    }
+
+    public static String secretKey(String prefix) {
+        return prefix + ".basic.secretkey";
+    }
+
+    public static String profilePath(String prefix) {
+        return prefix + ".profile.path";
+    }
+
+    public static String profileName(String prefix) {
+        return prefix + ".profile.name";
+    }
+
+    public static String roleArn(String prefix) {
+        return prefix + ".role.arn";
+    }
+
+    public static String roleSessionName(String prefix) {
+        return prefix + ".role.sessionName";
+    }
+
+    public static String externalId(String prefix) {
+        return prefix + ".role.externalId";
+    }
+
+    public static String roleCredentialsProvider(String prefix) {
+        return prefix + ".role.provider";
+    }
+
+    public static String webIdentityTokenFile(String prefix) {
+        return prefix + ".webIdentityToken.file";
+    }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AWSOptionUtils.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AWSOptionUtils.java
new file mode 100644
index 0000000..c9a7f58
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AWSOptionUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.table.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.table.options.ConfigurationValidator;
+import org.apache.flink.connector.base.table.options.TableOptionsUtils;
+import org.apache.flink.connector.base.table.util.ConfigurationValidatorUtil;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/** Handler for AWS specific table options. */
+@PublicEvolving
+public class AWSOptionUtils implements TableOptionsUtils, ConfigurationValidator {
+    /** Prefix for properties defined in {@link AWSConfigConstants}. */
+    public static final String AWS_PROPERTIES_PREFIX = "aws.";
+
+    private final Map<String, String> resolvedOptions;
+
+    public AWSOptionUtils(Map<String, String> resolvedOptions) {
+        this.resolvedOptions = resolvedOptions;
+    }
+
+    @Override
+    public Map<String, String> getProcessedResolvedOptions() {
+        Map<String, String> mappedResolvedOptions = new HashMap<>();
+        for (String key : resolvedOptions.keySet()) {
+            if (key.startsWith(AWS_PROPERTIES_PREFIX)) {
+                mappedResolvedOptions.put(translateAwsKey(key), resolvedOptions.get(key));
+            }
+        }
+        return mappedResolvedOptions;
+    }
+
+    @Override
+    public List<String> getNonValidatedPrefixes() {
+        return Collections.singletonList(AWS_PROPERTIES_PREFIX);
+    }
+
+    @Override
+    public Properties getValidatedConfigurations() {
+        Properties awsConfigurations = new Properties();
+        Map<String, String> mappedProperties = getProcessedResolvedOptions();
+        for (Map.Entry<String, String> entry : mappedProperties.entrySet()) {
+            awsConfigurations.setProperty(entry.getKey(), entry.getValue());
+        }
+        AWSGeneralUtil.validateAwsConfiguration(awsConfigurations);
+        ConfigurationValidatorUtil.validateOptionalBooleanProperty(
+                awsConfigurations,
+                AWSConfigConstants.TRUST_ALL_CERTIFICATES,
+                String.format(
+                        "Invalid %s value, must be a boolean.",
+                        AWSConfigConstants.TRUST_ALL_CERTIFICATES));
+        return awsConfigurations;
+    }
+
+    /** Map {@code aws.credentials.foo} to {@code aws.credentials.provider.foo}. */
+    private static String translateAwsKey(String key) {
+        if (!key.endsWith("credentials.provider")) {
+            return key.replace("credentials.", "credentials.provider.");
+        } else {
+            return key;
+        }
+    }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtils.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtils.java
new file mode 100644
index 0000000..b347a01
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.table.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.base.table.util.ConfigurationValidatorUtil;
+
+import software.amazon.awssdk.http.Protocol;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/** Class for handling AWS async HTTP client specific options. */
+@PublicEvolving
+public class AsyncClientOptionsUtils extends AWSOptionUtils {
+    /** Prefix for the sink HTTP client table options handled by this class. */
+    public static final String SINK_CLIENT_PREFIX = "sink.http-client.";
+
+    private static final String CLIENT_MAX_CONCURRENCY_OPTION = "max-concurrency";
+    private static final String CLIENT_MAX_TIMEOUT_OPTION = "read-timeout";
+    private static final String CLIENT_HTTP_PROTOCOL_VERSION_OPTION = "protocol.version";
+
+    private final Map<String, String> resolvedOptions;
+
+    public AsyncClientOptionsUtils(Map<String, String> resolvedOptions) {
+        super(resolvedOptions);
+        this.resolvedOptions = resolvedOptions;
+    }
+
+    @Override
+    public Map<String, String> getProcessedResolvedOptions() {
+        Map<String, String> mappedResolvedOptions = super.getProcessedResolvedOptions();
+        for (String key : resolvedOptions.keySet()) {
+            if (key.startsWith(SINK_CLIENT_PREFIX)) {
+                mappedResolvedOptions.put(translateClientKeys(key), resolvedOptions.get(key));
+            }
+        }
+        return mappedResolvedOptions;
+    }
+
+    @Override
+    public List<String> getNonValidatedPrefixes() {
+        return Arrays.asList(AWS_PROPERTIES_PREFIX, SINK_CLIENT_PREFIX);
+    }
+
+    @Override
+    public Properties getValidatedConfigurations() {
+        Properties clientConfigurations = super.getValidatedConfigurations();
+        clientConfigurations.putAll(getProcessedResolvedOptions());
+        validatedConfigurations(clientConfigurations);
+        return clientConfigurations;
+    }
+
+    private static String translateClientKeys(String key) {
+        String truncatedKey = key.substring(SINK_CLIENT_PREFIX.length());
+        switch (truncatedKey) {
+            case CLIENT_MAX_CONCURRENCY_OPTION:
+                return AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY;
+            case CLIENT_MAX_TIMEOUT_OPTION:
+                return AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS;
+            case CLIENT_HTTP_PROTOCOL_VERSION_OPTION:
+                return AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+            default:
+                return truncatedKey;
+        }
+    }
+
+    private void validatedConfigurations(Properties config) {
+        ConfigurationValidatorUtil.validateOptionalPositiveIntProperty(
+                config,
+                AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY,
+                "Invalid value given for HTTP client max concurrency. Must be positive integer.");
+        ConfigurationValidatorUtil.validateOptionalPositiveIntProperty(
+                config,
+                AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS,
+                "Invalid value given for HTTP read timeout. Must be positive integer.");
+        validateOptionalHttpProtocolProperty(config);
+    }
+
+    private void validateOptionalHttpProtocolProperty(Properties config) {
+        if (config.containsKey(AWSConfigConstants.HTTP_PROTOCOL_VERSION)) {
+            try {
+                Protocol.valueOf(config.getProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION));
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException(
+                        "Invalid value given for HTTP protocol. Must be HTTP1_1 or HTTP2.", e);
+            }
+        }
+    }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java
new file mode 100644
index 0000000..1256142
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
+import software.amazon.awssdk.core.client.config.SdkClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.Properties;
+
+/** Some utilities specific to Amazon Web Service. */
+@Internal
+public class AWSAsyncSinkUtil extends AWSGeneralUtil {
+
+    /** V2 suffix to denote the unified sinks. V1 sinks are based on KPL etc. */
+    static final String V2_USER_AGENT_SUFFIX = " V2";
+
+    /**
+     * Creates a user agent prefix for Flink. This can be used by HTTP Clients.
+     *
+     * @param userAgentFormat flink user agent prefix format with placeholders for version and
+     *     commit id.
+     * @return a user agent prefix for Flink
+     */
+    public static String formatFlinkUserAgentPrefix(String userAgentFormat) {
+        return String.format(
+                userAgentFormat,
+                EnvironmentInformation.getVersion(),
+                EnvironmentInformation.getRevisionInformation().commitId);
+    }
+
+    /**
+     * @param configProps configuration properties
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @return a new AWS Client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final SdkAsyncHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
+        SdkClientConfiguration clientConfiguration = SdkClientConfiguration.builder().build();
+        return createAwsAsyncClient(
+                configProps,
+                clientConfiguration,
+                httpClient,
+                clientBuilder,
+                awsUserAgentPrefixFormat,
+                awsClientUserAgentPrefix);
+    }
+
+    /**
+     * @param configProps configuration properties
+     * @param clientConfiguration the AWS SDK v2 config to instantiate the client
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @return a new AWS Client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final SdkClientConfiguration clientConfiguration,
+                    final SdkAsyncHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
+        String flinkUserAgentPrefix =
+                Optional.ofNullable(configProps.getProperty(awsClientUserAgentPrefix))
+                        .orElse(
+                                formatFlinkUserAgentPrefix(
+                                        awsUserAgentPrefixFormat + V2_USER_AGENT_SUFFIX));
+
+        final ClientOverrideConfiguration overrideConfiguration =
+                createClientOverrideConfiguration(
+                        clientConfiguration,
+                        ClientOverrideConfiguration.builder(),
+                        flinkUserAgentPrefix);
+
+        return createAwsAsyncClient(configProps, clientBuilder, httpClient, overrideConfiguration);
+    }
+
+    @VisibleForTesting
+    static ClientOverrideConfiguration createClientOverrideConfiguration(
+            final SdkClientConfiguration config,
+            final ClientOverrideConfiguration.Builder overrideConfigurationBuilder,
+            String flinkUserAgentPrefix) {
+
+        overrideConfigurationBuilder
+                .putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, flinkUserAgentPrefix)
+                .putAdvancedOption(
+                        SdkAdvancedClientOption.USER_AGENT_SUFFIX,
+                        config.option(SdkAdvancedClientOption.USER_AGENT_SUFFIX));
+
+        Optional.ofNullable(config.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT))
+                .ifPresent(overrideConfigurationBuilder::apiCallAttemptTimeout);
+
+        Optional.ofNullable(config.option(SdkClientOption.API_CALL_TIMEOUT))
+                .ifPresent(overrideConfigurationBuilder::apiCallTimeout);
+
+        return overrideConfigurationBuilder.build();
+    }
+
+    @VisibleForTesting
+    static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final T clientBuilder,
+                    final SdkAsyncHttpClient httpClient,
+                    final ClientOverrideConfiguration overrideConfiguration) {
+
+        if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+            final URI endpointOverride =
+                    URI.create(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+            clientBuilder.endpointOverride(endpointOverride);
+        }
+
+        return clientBuilder
+                .httpClient(httpClient)
+                .overrideConfiguration(overrideConfiguration)
+                .credentialsProvider(getCredentialsProvider(configProps))
+                .region(getRegion(configProps))
+                .build();
+    }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAuthenticationException.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAuthenticationException.java
new file mode 100644
index 0000000..e5527b3
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAuthenticationException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Signals that authenticating with the configured AWS credentials failed, for example because of
+ * missing configuration, illegal access or an unreachable endpoint. Every {@code
+ * AWSAuthenticationException} should be treated as non-retryable.
+ */
+@Internal
+public class AWSAuthenticationException extends RuntimeException {
+
+    /**
+     * Creates the exception and records the failure that triggered it.
+     *
+     * @param message description of the authentication failure
+     * @param cause the underlying failure
+     */
+    public AWSAuthenticationException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates the exception without an underlying cause.
+     *
+     * @param message description of the authentication failure
+     */
+    public AWSAuthenticationException(final String message) {
+        super(message);
+    }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSCredentialFatalExceptionClassifiers.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSCredentialFatalExceptionClassifiers.java
new file mode 100644
index 0000000..713e11d
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSCredentialFatalExceptionClassifiers.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.sts.model.StsException;
+
+/** Factory methods for {@link FatalExceptionClassifier}s that target AWS credential failures. */
+@Internal
+public class AWSCredentialFatalExceptionClassifiers {
+
+    /**
+     * Creates a classifier keyed on a root cause of type {@link StsException}. The matched
+     * throwable is wrapped in an {@link AWSAuthenticationException}.
+     *
+     * @return classifier for failures caused by the provided credentials
+     */
+    public static FatalExceptionClassifier getInvalidCredentialsExceptionClassifier() {
+        return FatalExceptionClassifier.withRootCauseOfType(
+                StsException.class,
+                AWSCredentialFatalExceptionClassifiers::wrapAsAuthenticationFailure);
+    }
+
+    /**
+     * Creates a classifier keyed on a root cause of type {@link SdkClientException}. The matched
+     * throwable is passed through unchanged, cast to {@link Exception}.
+     *
+     * @return classifier for SDK client failures
+     */
+    public static FatalExceptionClassifier getSdkClientMisconfiguredExceptionClassifier() {
+        return FatalExceptionClassifier.withRootCauseOfType(
+                SdkClientException.class, rootCause -> (Exception) rootCause);
+    }
+
+    /** Wraps the given throwable in an {@link AWSAuthenticationException}. */
+    private static Exception wrapAsAuthenticationFailure(final Throwable rootCause) {
+        return new AWSAuthenticationException(
+                "Encountered non-recoverable exception relating to the provided credentials.",
+                rootCause);
+    }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
new file mode 100644
index 0000000..e1d7f98
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider;
+import org.apache.flink.util.ExceptionUtils;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.Http2Configuration;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.profiles.ProfileFile;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.awssdk.utils.SdkAutoCloseable;
+
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/** Some general utilities specific to Amazon Web Service. */
+@Internal
+public class AWSGeneralUtil {
+    // Settings applied directly to the Netty HTTP client builder in createAsyncHttpClient.
+    private static final Duration CONNECTION_ACQUISITION_TIMEOUT = Duration.ofSeconds(60);
+    private static final int INITIAL_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KB
+    private static final Duration HEALTH_CHECK_PING_PERIOD = Duration.ofSeconds(60);
+
+    // Individual values collected into HTTP_CLIENT_DEFAULTS below.
+    private static final int HTTP_CLIENT_MAX_CONCURRENCY = 10_000;
+    private static final Duration HTTP_CLIENT_READ_TIMEOUT = Duration.ofMinutes(6);
+    private static final Protocol HTTP_PROTOCOL = Protocol.HTTP2;
+    private static final boolean TRUST_ALL_CERTIFICATES = false;
+    // Merged with the caller-supplied AttributeMap in createAsyncHttpClient before the client is
+    // built. NOTE(review): presumably caller-supplied values take precedence over these defaults;
+    // confirm against AttributeMap#merge.
+    private static final AttributeMap HTTP_CLIENT_DEFAULTS =
+            AttributeMap.builder()
+                    .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, HTTP_CLIENT_MAX_CONCURRENCY)
+                    .put(SdkHttpConfigurationOption.READ_TIMEOUT, HTTP_CLIENT_READ_TIMEOUT)
+                    .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, TRUST_ALL_CERTIFICATES)
+                    .put(SdkHttpConfigurationOption.PROTOCOL, HTTP_PROTOCOL)
+                    .build();
+
+    /**
+     * Resolves the {@link CredentialProvider} type configured under {@code configPrefix}.
+     *
+     * <p>When no type is configured, {@link CredentialProvider#BASIC} is assumed if both the access
+     * key ID and the secret key derived from {@code configPrefix} are present, and {@link
+     * CredentialProvider#AUTO} otherwise.
+     *
+     * @param configProps the configuration properties
+     * @param configPrefix the property key holding the credential provider type
+     * @return the credential provider type
+     * @throws IllegalArgumentException if the configured value is not a known provider type
+     */
+    public static CredentialProvider getCredentialProviderType(
+            final Properties configProps, final String configPrefix) {
+        if (configProps.containsKey(configPrefix)) {
+            final String configuredType = configProps.getProperty(configPrefix);
+            try {
+                return CredentialProvider.valueOf(configuredType);
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException(
+                        String.format("Invalid AWS Credential Provider Type %s.", configuredType),
+                        e);
+            }
+        }
+
+        // No explicit type: fall back to BASIC only when both static keys are supplied.
+        final boolean hasStaticKeys =
+                configProps.containsKey(AWSConfigConstants.accessKeyId(configPrefix))
+                        && configProps.containsKey(AWSConfigConstants.secretKey(configPrefix));
+        return hasStaticKeys ? CredentialProvider.BASIC : CredentialProvider.AUTO;
+    }
+
+    /**
+     * Creates the {@link AwsCredentialsProvider} described by a configuration map. All entries
+     * are copied into a {@link Properties} object, which is then handed to {@link
+     * #getCredentialsProvider(Properties)}.
+     *
+     * @param configProps the configuration property map
+     * @return The corresponding AWS Credentials Provider instance
+     */
+    public static AwsCredentialsProvider getCredentialsProvider(final Map<String, ?> configProps) {
+        final Properties copiedProps = new Properties();
+        copiedProps.putAll(configProps);
+        return getCredentialsProvider(copiedProps);
+    }
+
+    /**
+     * Creates the {@link AwsCredentialsProvider} described by the configuration properties, using
+     * {@link AWSConfigConstants#AWS_CREDENTIALS_PROVIDER} as the key prefix.
+     *
+     * @param configProps the configuration properties
+     * @return The corresponding AWS Credentials Provider instance
+     */
+    public static AwsCredentialsProvider getCredentialsProvider(final Properties configProps) {
+        final String defaultPrefix = AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+        return getCredentialsProvider(configProps, defaultPrefix);
+    }
+
+    /**
+     * Return a {@link AwsCredentialsProvider} instance for the credential provider type resolved
+     * from {@code configProps} under {@code configPrefix}.
+     *
+     * @param configProps the configuration properties
+     * @param configPrefix the property key holding the credential provider type; the keys of the
+     *     provider-specific settings are derived from it
+     * @return The corresponding AWS Credentials Provider instance
+     * @throws IllegalArgumentException if the resolved provider type is not handled here
+     */
+    public static AwsCredentialsProvider getCredentialsProvider(
+            final Properties configProps, final String configPrefix) {
+        CredentialProvider credentialProviderType =
+                getCredentialProviderType(configProps, configPrefix);
+
+        switch (credentialProviderType) {
+            case ENV_VAR:
+                return EnvironmentVariableCredentialsProvider.create();
+
+            case SYS_PROP:
+                return SystemPropertyCredentialsProvider.create();
+
+            case PROFILE:
+                return getProfileCredentialProvider(configProps, configPrefix);
+
+            case BASIC:
+                // The key properties are read each time credentials are resolved, not when this
+                // provider is created.
+                return () ->
+                        AwsBasicCredentials.create(
+                                configProps.getProperty(
+                                        AWSConfigConstants.accessKeyId(configPrefix)),
+                                configProps.getProperty(
+                                        AWSConfigConstants.secretKey(configPrefix)));
+
+            case ASSUME_ROLE:
+                return getAssumeRoleCredentialProvider(configProps, configPrefix);
+
+            case WEB_IDENTITY_TOKEN:
+                return getWebIdentityTokenFileCredentialsProvider(
+                        WebIdentityTokenFileCredentialsProvider.builder(),
+                        configProps,
+                        configPrefix);
+
+            case AUTO:
+                return DefaultCredentialsProvider.create();
+
+            default:
+                throw new IllegalArgumentException(
+                        "Credential provider not supported: " + credentialProviderType);
+        }
+    }
+
+    /**
+     * Creates a {@link ProfileCredentialsProvider} from the profile settings found under {@code
+     * configPrefix}.
+     *
+     * <p>The configured profile name is always handed to the builder ({@code null} when it is not
+     * configured). A credentials profile file is only set when a profile path is configured.
+     *
+     * @param configProps the configuration properties
+     * @param configPrefix the key prefix from which the profile property keys are derived
+     * @return the profile based credentials provider
+     */
+    public static AwsCredentialsProvider getProfileCredentialProvider(
+            final Properties configProps, final String configPrefix) {
+        final ProfileCredentialsProvider.Builder builder =
+                ProfileCredentialsProvider.builder()
+                        .profileName(
+                                configProps.getProperty(
+                                        AWSConfigConstants.profileName(configPrefix)));
+
+        final String configuredPath =
+                configProps.getProperty(AWSConfigConstants.profilePath(configPrefix));
+        if (configuredPath != null) {
+            builder.profileFile(
+                    ProfileFile.builder()
+                            .type(ProfileFile.Type.CREDENTIALS)
+                            .content(Paths.get(configuredPath))
+                            .build());
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Creates a {@link StsAssumeRoleCredentialsProvider} from the role settings found under {@code
+     * configPrefix}. The STS client it uses is built with the credentials provider configured
+     * under the role credentials provider prefix and with the region from {@code configProps}.
+     */
+    private static AwsCredentialsProvider getAssumeRoleCredentialProvider(
+            final Properties configProps, final String configPrefix) {
+        final AssumeRoleRequest assumeRoleRequest =
+                AssumeRoleRequest.builder()
+                        .roleArn(
+                                configProps.getProperty(AWSConfigConstants.roleArn(configPrefix)))
+                        .roleSessionName(
+                                configProps.getProperty(
+                                        AWSConfigConstants.roleSessionName(configPrefix)))
+                        .externalId(
+                                configProps.getProperty(
+                                        AWSConfigConstants.externalId(configPrefix)))
+                        .build();
+
+        // Credentials the STS client itself uses when assuming the role.
+        final AwsCredentialsProvider stsCredentials =
+                getCredentialsProvider(
+                        configProps, AWSConfigConstants.roleCredentialsProvider(configPrefix));
+        final StsClient stsClient =
+                StsClient.builder()
+                        .credentialsProvider(stsCredentials)
+                        .region(getRegion(configProps))
+                        .build();
+
+        return StsAssumeRoleCredentialsProvider.builder()
+                .refreshRequest(assumeRoleRequest)
+                .stsClient(stsClient)
+                .build();
+    }
+
+    /**
+     * Applies the optional role ARN, role session name and web identity token file settings found
+     * under {@code configPrefix} to the given builder and builds the provider. Settings that are
+     * not configured are left untouched on the builder.
+     */
+    @VisibleForTesting
+    static AwsCredentialsProvider getWebIdentityTokenFileCredentialsProvider(
+            final WebIdentityTokenFileCredentialsProvider.Builder webIdentityBuilder,
+            final Properties configProps,
+            final String configPrefix) {
+
+        final String roleArn =
+                configProps.getProperty(AWSConfigConstants.roleArn(configPrefix));
+        if (roleArn != null) {
+            webIdentityBuilder.roleArn(roleArn);
+        }
+
+        final String sessionName =
+                configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix));
+        if (sessionName != null) {
+            webIdentityBuilder.roleSessionName(sessionName);
+        }
+
+        final String tokenFile =
+                configProps.getProperty(AWSConfigConstants.webIdentityTokenFile(configPrefix));
+        if (tokenFile != null) {
+            webIdentityBuilder.webIdentityTokenFile(Paths.get(tokenFile));
+        }
+
+        return webIdentityBuilder.build();
+    }
+
+    /**
+     * Creates an async HTTP client from the given properties using a fresh {@link
+     * NettyNioAsyncHttpClient} builder.
+     *
+     * @param configProperties properties holding the optional HTTP client settings
+     * @return the configured async HTTP client
+     */
+    public static SdkAsyncHttpClient createAsyncHttpClient(final Properties configProperties) {
+        final NettyNioAsyncHttpClient.Builder freshBuilder = NettyNioAsyncHttpClient.builder();
+        return createAsyncHttpClient(configProperties, freshBuilder);
+    }
+
+    /**
+     * Creates an async HTTP client from the given builder. TCP keep-alive is always enabled;
+     * max concurrency, read timeout (milliseconds), trust-all-certificates and protocol version
+     * are taken from {@code configProperties} when the respective key is set.
+     *
+     * @param configProperties properties holding the optional HTTP client settings
+     * @param httpClientBuilder the builder used to create the client
+     * @return the configured async HTTP client
+     */
+    public static SdkAsyncHttpClient createAsyncHttpClient(
+            final Properties configProperties,
+            final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+        final AttributeMap.Builder overrides =
+                AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);
+
+        final String maxConcurrency =
+                configProperties.getProperty(AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY);
+        if (maxConcurrency != null) {
+            overrides.put(
+                    SdkHttpConfigurationOption.MAX_CONNECTIONS, Integer.parseInt(maxConcurrency));
+        }
+
+        final String readTimeoutMillis =
+                configProperties.getProperty(AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS);
+        if (readTimeoutMillis != null) {
+            overrides.put(
+                    SdkHttpConfigurationOption.READ_TIMEOUT,
+                    Duration.ofMillis(Integer.parseInt(readTimeoutMillis)));
+        }
+
+        final String trustAllCertificates =
+                configProperties.getProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES);
+        if (trustAllCertificates != null) {
+            overrides.put(
+                    SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
+                    Boolean.parseBoolean(trustAllCertificates));
+        }
+
+        final String protocolVersion =
+                configProperties.getProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION);
+        if (protocolVersion != null) {
+            overrides.put(SdkHttpConfigurationOption.PROTOCOL, Protocol.valueOf(protocolVersion));
+        }
+
+        return createAsyncHttpClient(overrides.build(), httpClientBuilder);
+    }
+
+    /**
+     * Creates an async HTTP client from the given builder without any caller-supplied settings.
+     *
+     * @param httpClientBuilder the builder used to create the client
+     * @return the configured async HTTP client
+     */
+    public static SdkAsyncHttpClient createAsyncHttpClient(
+            final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+        final AttributeMap noOverrides = AttributeMap.empty();
+        return createAsyncHttpClient(noOverrides, httpClientBuilder);
+    }
+
+    /**
+     * Applies the connection acquisition timeout and the HTTP/2 settings (health check ping
+     * period, initial window size) to the builder, then builds the client from {@code config}
+     * merged with {@code HTTP_CLIENT_DEFAULTS}.
+     *
+     * @param config caller-supplied HTTP client settings
+     * @param httpClientBuilder the builder used to create the client
+     * @return the configured async HTTP client
+     */
+    public static SdkAsyncHttpClient createAsyncHttpClient(
+            final AttributeMap config, final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+        final Http2Configuration http2Settings =
+                Http2Configuration.builder()
+                        .healthCheckPingPeriod(HEALTH_CHECK_PING_PERIOD)
+                        .initialWindowSize(INITIAL_WINDOW_SIZE_BYTES)
+                        .build();
+        httpClientBuilder
+                .connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT)
+                .http2Configuration(http2Settings);
+
+        final AttributeMap mergedConfig = config.merge(HTTP_CLIENT_DEFAULTS);
+        return httpClientBuilder.buildWithDefaults(mergedConfig);
+    }
+
+    /**
+     * Builds a {@link Region} from the value stored under {@link AWSConfigConstants#AWS_REGION}.
+     *
+     * @param configProps the properties containing the region
+     * @return the region specified by the properties
+     */
+    public static Region getRegion(final Properties configProps) {
+        final String regionId = configProps.getProperty(AWSConfigConstants.AWS_REGION);
+        return Region.of(regionId);
+    }
+
+    /** Pre-compiled pattern describing the accepted shape of an AWS region id. */
+    private static final Pattern VALID_REGION_PATTERN =
+            Pattern.compile("^[a-z]+-([a-z]+[-]{0,1}[a-z]+-([0-9]|global)|global)$");
+
+    /**
+     * Checks whether or not a region is valid.
+     *
+     * <p>The check is purely syntactic: the region id must match the expected pattern in full. It
+     * is not looked up in the list of regions known to the SDK.
+     *
+     * @param region The AWS region to check
+     * @return true if the supplied region is valid, false otherwise
+     */
+    public static boolean isValidRegion(Region region) {
+        // The pattern is compiled once instead of on every call.
+        return VALID_REGION_PATTERN.matcher(region.id()).matches();
+    }
+
+    /**
+     * Validates configuration properties related to Amazon AWS service.
+     *
+     * <p>The credential provider is only checked when {@link
+     * AWSConfigConstants#AWS_CREDENTIALS_PROVIDER} is set, and the region only when {@link
+     * AWSConfigConstants#AWS_REGION} is set.
+     *
+     * @param config the properties to setup credentials and region
+     * @throws IllegalArgumentException if the credential provider type is invalid, if the BASIC
+     *     provider is selected without access key ID and secret key, or if the region is invalid
+     */
+    public static void validateAwsConfiguration(Properties config) {
+        if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
+
+            validateCredentialProvider(config);
+            // if BASIC type is used, also check that the Access Key ID and Secret Key is supplied
+            CredentialProvider credentialsProviderType =
+                    getCredentialProviderType(config, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+            if (credentialsProviderType == CredentialProvider.BASIC) {
+                if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+                        || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+                    throw new IllegalArgumentException(
+                            "Please set values for AWS Access Key ID ('"
+                                    + AWSConfigConstants.AWS_ACCESS_KEY_ID
+                                    + "') "
+                                    + "and Secret Key ('"
+                                    + AWSConfigConstants.AWS_SECRET_ACCESS_KEY
+                                    + "') when using the BASIC AWS credential provider type.");
+                }
+            }
+        }
+
+        if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
+            // specified AWS Region name must be recognizable
+            if (!isValidRegion(getRegion(config))) {
+                StringBuilder sb = new StringBuilder();
+                for (Region region : Region.regions()) {
+                    // separate entries only, so the list does not end with a dangling ", "
+                    if (sb.length() > 0) {
+                        sb.append(", ");
+                    }
+                    sb.append(region);
+                }
+                throw new IllegalArgumentException(
+                        "Invalid AWS region set in config. Valid values are: " + sb);
+            }
+        }
+    }
+
+    /**
+     * Closes every non-null resource. Each resource is attempted even when an earlier one fails;
+     * runtime exceptions are combined through {@link ExceptionUtils#firstOrSuppressed} and the
+     * result is thrown once all resources have been processed.
+     *
+     * @param resources the resources to close, {@code null} entries are skipped
+     */
+    public static void closeResources(SdkAutoCloseable... resources) {
+        RuntimeException failure = null;
+        for (SdkAutoCloseable closeable : resources) {
+            if (closeable == null) {
+                continue;
+            }
+            try {
+                closeable.close();
+            } catch (RuntimeException e) {
+                failure = ExceptionUtils.firstOrSuppressed(e, failure);
+            }
+        }
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Validates the AWS configuration and then resolves credentials from the configured
+     * credentials provider.
+     *
+     * @param config the properties to setup credentials and region
+     */
+    public static void validateAwsCredentials(Properties config) {
+        validateAwsConfiguration(config);
+        final AwsCredentialsProvider provider = getCredentialsProvider(config);
+        provider.resolveCredentials();
+    }
+
+    /**
+     * Verifies that a credentials provider can be created from the given configuration.
+     *
+     * @param config the properties holding the credential provider settings
+     * @throws IllegalArgumentException listing the valid provider types if creating the provider
+     *     fails with an {@link IllegalArgumentException}; the original failure is kept as cause
+     */
+    private static void validateCredentialProvider(Properties config) {
+        // value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be
+        // recognizable
+        try {
+            getCredentialsProvider(config);
+        } catch (IllegalArgumentException e) {
+            StringBuilder sb = new StringBuilder();
+            for (CredentialProvider type : CredentialProvider.values()) {
+                // separate entries only, so the list does not end with a dangling ", "
+                if (sb.length() > 0) {
+                    sb.append(", ");
+                }
+                sb.append(type.toString());
+            }
+            // chain the original exception so the underlying reason is not lost
+            throw new IllegalArgumentException(
+                    "Invalid AWS Credential Provider Type set in config. Valid values are: " + sb,
+                    e);
+        }
+    }
+}
diff --git a/flink-connector-aws-base/src/main/resources/log4j2.properties b/flink-connector-aws-base/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..c64a340
--- /dev/null
+++ b/flink-connector-aws-base/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# The root logger level is OFF; change it (e.g. to INFO) to get log output.
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+# Console appender referenced by the root logger above.
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
new file mode 100644
index 0000000..8d533a6
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+/**
+ * Architecture tests for test code.
+ *
+ * <p>Analyzes the classes in {@code org.apache.flink.connector.aws}, restricted to test classes
+ * and excluding Scala and shaded classes, against the rules defined in {@link
+ * TestCodeArchitectureTestBase}.
+ */
+@AnalyzeClasses(
+        packages = "org.apache.flink.connector.aws",
+        importOptions = {
+            ImportOption.OnlyIncludeTests.class,
+            ImportOptions.ExcludeScalaImportOption.class,
+            ImportOptions.ExcludeShadedImportOption.class
+        })
+public class TestCodeArchitectureTest {
+
+    // Pulls in the rule set declared in TestCodeArchitectureTestBase.
+    @ArchTest
+    public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AWSOptionsUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AWSOptionsUtilTest.java
new file mode 100644
index 0000000..6786317
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AWSOptionsUtilTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.table.util;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/** Unit tests for {@link AWSOptionUtils}. */
+class AWSOptionsUtilTest {
+
+    /** The default AWS table options are mapped onto the expected AWS configuration keys. */
+    @Test
+    void testAWSKeyMapper() {
+        final Map<String, String> expected = getDefaultExpectedAWSConfigurations();
+
+        // process default aws options.
+        final Map<String, String> actual =
+                new AWSOptionUtils(getDefaultAWSConfigurations()).getProcessedResolvedOptions();
+
+        Assertions.assertThat(actual).isEqualTo(expected);
+    }
+
+    /** Options that are not AWS options do not show up in the processed result. */
+    @Test
+    void testAWSKeySelectionAndMapping() {
+        final Map<String, String> tableOptions = getDefaultAWSConfigurations();
+        // adding irrelevant configurations
+        tableOptions.put("non.aws.key1", "value1");
+        tableOptions.put("non.aws.key2", "value2");
+        tableOptions.put("non.aws.key3", "value3");
+        tableOptions.put("non.aws.key4", "value4");
+
+        final Map<String, String> actual =
+                new AWSOptionUtils(tableOptions).getProcessedResolvedOptions();
+
+        Assertions.assertThat(actual).isEqualTo(getDefaultExpectedAWSConfigurations());
+    }
+
+    /** Valid AWS options are returned as validated {@link Properties} with the mapped keys. */
+    @Test
+    void testGoodAWSProperties() {
+        final Properties expected = new Properties();
+        expected.putAll(getDefaultExpectedAWSConfigurations());
+
+        // extract aws configuration from properties
+        final Properties actual =
+                new AWSOptionUtils(getDefaultAWSConfigurations()).getValidatedConfigurations();
+
+        Assertions.assertThat(actual).isEqualTo(expected);
+    }
+
+    /** An unrecognizable region is rejected during validation. */
+    @Test
+    void testBadAWSRegion() {
+        final Map<String, String> options = getDefaultAWSConfigurations();
+        options.put("aws.region", "invalid-aws-region");
+        final AWSOptionUtils optionUtils = new AWSOptionUtils(options);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> optionUtils.getValidatedConfigurations())
+                .withMessageContaining("Invalid AWS region set in config.");
+    }
+
+    /** The BASIC credential provider is rejected when the access key ID is missing. */
+    @Test
+    void testMissingAWSCredentials() {
+        final Map<String, String> options = getDefaultAWSConfigurations();
+        options.remove("aws.credentials.basic.accesskeyid");
+        final AWSOptionUtils optionUtils = new AWSOptionUtils(options);
+
+        final String expectedMessage =
+                String.format(
+                        "Please set values for AWS Access Key ID ('%s') "
+                                + "and Secret Key ('%s') when using the BASIC AWS credential provider type.",
+                        AWSConfigConstants.AWS_ACCESS_KEY_ID,
+                        AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> optionUtils.getValidatedConfigurations())
+                .withMessageContaining(expectedMessage);
+    }
+
+    /** A non-boolean value for the trust-all-certificates option is rejected. */
+    @Test
+    void testInvalidTrustAllCertificatesOption() {
+        final Map<String, String> options = getDefaultAWSConfigurations();
+        options.put("aws.trust.all.certificates", "invalid-boolean");
+        final AWSOptionUtils optionUtils = new AWSOptionUtils(options);
+
+        final String expectedMessage =
+                String.format(
+                        "Invalid %s value, must be a boolean.",
+                        AWSConfigConstants.TRUST_ALL_CERTIFICATES);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(() -> optionUtils.getValidatedConfigurations())
+                .withMessageContaining(expectedMessage);
+    }
+
+    /** Returns a fresh, mutable map of AWS table options as a user would supply them. */
+    private Map<String, String> getDefaultAWSConfigurations() {
+        final Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("aws.region", "us-west-2");
+        tableOptions.put("aws.credentials.provider", "BASIC");
+        tableOptions.put("aws.credentials.basic.accesskeyid", "ververicka");
+        tableOptions.put("aws.credentials.basic.secretkey", "SuperSecretSecretSquirrel");
+        tableOptions.put("aws.trust.all.certificates", "true");
+        return tableOptions;
+    }
+
+    /** Returns the configuration expected after processing the default AWS table options. */
+    private Map<String, String> getDefaultExpectedAWSConfigurations() {
+        final Map<String, String> expected = new HashMap<>();
+        expected.put("aws.region", "us-west-2");
+        expected.put("aws.credentials.provider", "BASIC");
+        expected.put("aws.credentials.provider.basic.accesskeyid", "ververicka");
+        expected.put("aws.credentials.provider.basic.secretkey", "SuperSecretSecretSquirrel");
+        expected.put("aws.trust.all.certificates", "true");
+        return expected;
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtilsTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtilsTest.java
new file mode 100644
index 0000000..afc35e2
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtilsTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.table.util;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/** Unit tests for {@link AsyncClientOptionsUtils}. */
+class AsyncClientOptionsUtilsTest {
+
+    /** Valid options resolve to a map keyed by the {@link AWSConfigConstants} names. */
+    @Test
+    void testGoodAsyncClientOptionsMapping() {
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(getDefaultClientOptions());
+
+        Map<String, String> expectedConfigurations = getDefaultExpectedClientOptions();
+        Map<String, String> actualConfigurations =
+                asyncClientOptionsUtils.getProcessedResolvedOptions();
+
+        Assertions.assertThat(actualConfigurations).isEqualTo(expectedConfigurations);
+    }
+
+    /** An option outside the {@code sink.http-client.} prefix must not appear in the result. */
+    @Test
+    void testAsyncClientOptionsUtilsFilteringNonPrefixedOptions() {
+        Map<String, String> clientOptions = getDefaultClientOptions();
+        clientOptions.put("sink.not.http-client.some.option", "someValue");
+
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(clientOptions);
+
+        Map<String, String> expectedConfigurations = getDefaultExpectedClientOptions();
+        Map<String, String> actualConfigurations =
+                asyncClientOptionsUtils.getProcessedResolvedOptions();
+
+        Assertions.assertThat(actualConfigurations).isEqualTo(expectedConfigurations);
+    }
+
+    /** Valid options are returned as {@link Properties} with the same keys and values. */
+    @Test
+    void testAsyncClientOptionsUtilsExtractingCorrectConfiguration() {
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(getDefaultClientOptions());
+
+        Properties expectedConfigurations = getDefaultExpectedClientConfigs();
+        Properties actualConfigurations = asyncClientOptionsUtils.getValidatedConfigurations();
+
+        Assertions.assertThat(actualConfigurations).isEqualTo(expectedConfigurations);
+    }
+
+    /** A non-numeric max concurrency is rejected during validation. */
+    @Test
+    void testAsyncClientOptionsUtilsFailOnInvalidMaxConcurrency() {
+        Map<String, String> clientOptions = getDefaultClientOptions();
+        clientOptions.put("sink.http-client.max-concurrency", "invalid-integer");
+
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(clientOptions);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(asyncClientOptionsUtils::getValidatedConfigurations)
+                .withMessageContaining(
+                        "Invalid value given for HTTP client max concurrency. Must be positive integer.");
+    }
+
+    /** A non-numeric read timeout is rejected during validation. */
+    @Test
+    void testAsyncClientOptionsUtilsFailOnInvalidReadTimeout() {
+        Map<String, String> clientOptions = getDefaultClientOptions();
+        clientOptions.put("sink.http-client.read-timeout", "invalid-integer");
+
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(clientOptions);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(asyncClientOptionsUtils::getValidatedConfigurations)
+                .withMessageContaining(
+                        "Invalid value given for HTTP read timeout. Must be positive integer.");
+    }
+
+    /** An unknown HTTP protocol version is rejected during validation. */
+    @Test
+    void testAsyncClientOptionsUtilsFailOnInvalidHttpProtocol() {
+        Map<String, String> clientOptions = getDefaultClientOptions();
+        clientOptions.put("sink.http-client.protocol.version", "invalid-http-protocol");
+
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(clientOptions);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(asyncClientOptionsUtils::getValidatedConfigurations)
+                .withMessageContaining(
+                        "Invalid value given for HTTP protocol. Must be HTTP1_1 or HTTP2.");
+    }
+
+    /** Returns a fresh, mutable map of valid client options that each test may modify. */
+    private static Map<String, String> getDefaultClientOptions() {
+        Map<String, String> clientOptions = new HashMap<>();
+        clientOptions.put("aws.region", "us-east-1");
+        clientOptions.put("sink.http-client.max-concurrency", "10000");
+        clientOptions.put("sink.http-client.read-timeout", "360000");
+        clientOptions.put("sink.http-client.protocol.version", "HTTP2");
+        return clientOptions;
+    }
+
+    /** Returns the resolved options expected for {@link #getDefaultClientOptions()}. */
+    private static Map<String, String> getDefaultExpectedClientOptions() {
+        Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+        expectedOptions.put(AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY, "10000");
+        expectedOptions.put(AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS, "360000");
+        expectedOptions.put(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP2");
+        return expectedOptions;
+    }
+
+    /**
+     * Returns the expected options as {@link Properties}. They are derived from
+     * {@link #getDefaultExpectedClientOptions()} so both expectations always stay in sync.
+     */
+    private static Properties getDefaultExpectedClientConfigs() {
+        Properties expectedConfigs = new Properties();
+        expectedConfigs.putAll(getDefaultExpectedClientOptions());
+        return expectedConfigs;
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
new file mode 100644
index 0000000..7d54c39
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.testutils;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.iam.IamClient;
+import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.waiters.S3Waiter;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the Localstack container.
+ */
+public class AWSServicesTestUtils {
+
+    private static final String ACCESS_KEY_ID = "accessKeyId";
+    private static final String SECRET_ACCESS_KEY = "secretAccessKey";
+
+    /** Creates an S3 client that talks to {@code endpoint} through {@code httpClient}. */
+    public static S3Client createS3Client(String endpoint, SdkHttpClient httpClient) {
+        return createAwsSyncClient(endpoint, httpClient, S3Client.builder());
+    }
+
+    /** Creates an IAM client that talks to {@code endpoint} through {@code httpClient}. */
+    public static IamClient createIamClient(String endpoint, SdkHttpClient httpClient) {
+        return createAwsSyncClient(endpoint, httpClient, IamClient.builder());
+    }
+
+    /**
+     * Builds a synchronous AWS client from {@code clientBuilder}.
+     *
+     * <p>The endpoint is overridden with {@code endpoint}; the credentials provider and region
+     * are resolved from {@link #createConfig(String)}.
+     *
+     * @param endpoint endpoint URI the client should call
+     * @param httpClient HTTP client to use for requests
+     * @param clientBuilder service-specific builder to configure
+     * @return the built client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsSyncClientBuilder<? extends T, S> & AwsClientBuilder<? extends T, S>>
+            S createAwsSyncClient(String endpoint, SdkHttpClient httpClient, T clientBuilder) {
+        Properties config = createConfig(endpoint);
+        return clientBuilder
+                .httpClient(httpClient)
+                .endpointOverride(URI.create(endpoint))
+                .credentialsProvider(AWSGeneralUtil.getCredentialsProvider(config))
+                .region(AWSGeneralUtil.getRegion(config))
+                .build();
+    }
+
+    /**
+     * Creates the connector configuration used by the test clients: a fixed region, the given
+     * endpoint, fixed placeholder credentials and trust-all-certificates enabled.
+     */
+    public static Properties createConfig(String endpoint) {
+        Properties config = new Properties();
+        config.setProperty(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
+        config.setProperty(AWS_ENDPOINT, endpoint);
+        config.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), ACCESS_KEY_ID);
+        config.setProperty(
+                AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), SECRET_ACCESS_KEY);
+        config.setProperty(TRUST_ALL_CERTIFICATES, "true");
+        return config;
+    }
+
+    /**
+     * Creates an Apache-based HTTP client that trusts all certificates and uses HTTP/1.1. The
+     * caller owns the returned client and is responsible for closing it.
+     */
+    public static SdkHttpClient createHttpClient() {
+        AttributeMap.Builder attributeMapBuilder = AttributeMap.builder();
+        attributeMapBuilder.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true);
+        attributeMapBuilder.put(SdkHttpConfigurationOption.PROTOCOL, Protocol.HTTP1_1);
+        return ApacheHttpClient.builder().buildWithDefaults(attributeMapBuilder.build());
+    }
+
+    /** Creates the bucket and blocks until the S3 waiter reports that it exists. */
+    public static void createBucket(S3Client s3Client, String bucketName) {
+        CreateBucketRequest bucketRequest =
+                CreateBucketRequest.builder().bucket(bucketName).build();
+        s3Client.createBucket(bucketRequest);
+
+        HeadBucketRequest bucketRequestWait =
+                HeadBucketRequest.builder().bucket(bucketName).build();
+
+        try (final S3Waiter waiter = s3Client.waiter()) {
+            waiter.waitUntilBucketExists(bucketRequestWait);
+        }
+    }
+
+    /** Creates an IAM role with the given name and no further attributes. */
+    public static void createIAMRole(IamClient iam, String roleName) {
+        CreateRoleRequest request = CreateRoleRequest.builder().roleName(roleName).build();
+
+        iam.createRole(request);
+    }
+
+    /**
+     * Lists all objects in the bucket.
+     *
+     * <p>A single {@code ListObjectsV2} response only carries one page of keys, so the paginator
+     * is used to follow continuation tokens and collect the objects of every page.
+     *
+     * @param s3 client used to list the bucket
+     * @param bucketName bucket to list
+     * @return the objects of all result pages, in the order the service returned them
+     */
+    public static List<S3Object> listBucketObjects(S3Client s3, String bucketName) {
+        ListObjectsV2Request listObjects =
+                ListObjectsV2Request.builder().bucket(bucketName).build();
+        return s3.listObjectsV2Paginator(listObjects).stream()
+                .map(ListObjectsV2Response::contents)
+                .flatMap(List::stream)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Downloads every listed object from the bucket and converts its bytes with
+     * {@code deserializer}.
+     *
+     * @param s3Client client used to fetch the objects
+     * @param objects objects to read; only their keys are used
+     * @param bucketName bucket that holds the objects
+     * @param deserializer converts the raw response bytes of one object into a {@code T}
+     * @return the deserialized objects, in the order of {@code objects}
+     */
+    public static <T> List<T> readObjectsFromS3Bucket(
+            S3Client s3Client,
+            List<S3Object> objects,
+            String bucketName,
+            Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+        return objects.stream()
+                .map(
+                        object ->
+                                GetObjectRequest.builder()
+                                        .bucket(bucketName)
+                                        .key(object.key())
+                                        .build())
+                .map(s3Client::getObjectAsBytes)
+                .map(deserializer)
+                .collect(Collectors.toList());
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java
new file mode 100644
index 0000000..e11ec27
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.testutils;
+
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.util.List;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A class wrapping the Localstack container that provides mock implementations of many common AWS
+ * services.
+ */
+public class LocalstackContainer extends GenericContainer<LocalstackContainer> {
+
+    private static final int CONTAINER_PORT = 4566;
+
+    /** Fixed delay applied before the first readiness probe is attempted. */
+    private static final long STARTUP_DELAY_MILLIS = 30_000;
+
+    /**
+     * Creates the container, exposes the Localstack port and installs a wait strategy that
+     * probes S3 until it responds.
+     *
+     * @param imageName Localstack docker image to run
+     */
+    public LocalstackContainer(DockerImageName imageName) {
+        super(imageName);
+        withExposedPorts(CONTAINER_PORT);
+        waitingFor(new ListBucketObjectsWaitStrategy());
+    }
+
+    /** Returns the HTTPS endpoint built from the container host and the mapped port. */
+    public String getEndpoint() {
+        return String.format("https://%s:%s", getHost(), getMappedPort(CONTAINER_PORT));
+    }
+
+    /**
+     * Wait strategy that considers the container ready once a bucket can be created and listed
+     * through S3. Probes are rate limited and retried until the startup timeout elapses.
+     */
+    private class ListBucketObjectsWaitStrategy extends AbstractWaitStrategy {
+        private static final int TRANSACTIONS_PER_SECOND = 1;
+
+        private final RateLimiter rateLimiter =
+                RateLimiterBuilder.newBuilder()
+                        .withRate(TRANSACTIONS_PER_SECOND, SECONDS)
+                        .withConstantThroughput()
+                        .build();
+
+        @Override
+        protected void waitUntilReady() {
+            try {
+                // NOTE(review): presumably gives Localstack time to boot before probing - confirm.
+                Thread.sleep(STARTUP_DELAY_MILLIS);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so code further up the stack can still observe it,
+                // and keep the original exception as the cause instead of printing it.
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException(
+                        "Localstack Container startup was interrupted", e);
+            }
+            Unreliables.retryUntilSuccess(
+                    (int) startupTimeout.getSeconds(),
+                    SECONDS,
+                    () -> rateLimiter.getWhenReady(this::list));
+        }
+
+        /** Probe: creates a throwaway bucket and lists it, closing both clients afterwards. */
+        private List<S3Object> list() {
+            final String bucketName = "bucket-name-not-to-be-used";
+            try (final SdkHttpClient httpClient = AWSServicesTestUtils.createHttpClient();
+                    final S3Client client =
+                            AWSServicesTestUtils.createS3Client(getEndpoint(), httpClient)) {
+                AWSServicesTestUtils.createBucket(client, bucketName);
+                return AWSServicesTestUtils.listBucketObjects(client, bucketName);
+            }
+        }
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java
new file mode 100644
index 0000000..cfc12bf
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
+import software.amazon.awssdk.core.client.config.SdkClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.util.AWSAsyncSinkUtil.formatFlinkUserAgentPrefix;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link AWSAsyncSinkUtil}. */
+class AWSAsyncSinkUtilTest {
+
+    private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT =
+            "Apache Flink %s (%s) *Destination* Connector";
+    private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT_V2 =
+            "Apache Flink %s (%s) *Destination* Connector V2";
+
+    /** Without an endpoint property the builder gets region, credentials and HTTP client only. */
+    @Test
+    void testCreateKinesisAsyncClient() {
+        Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
+        MockAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+        ClientOverrideConfiguration clientOverrideConfiguration =
+                ClientOverrideConfiguration.builder().build();
+
+        // A real HTTP client is created here; close it so the test does not leak its resources.
+        try (SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build()) {
+            AWSAsyncSinkUtil.createAwsAsyncClient(
+                    properties, builder, httpClient, clientOverrideConfiguration);
+
+            verify(builder).overrideConfiguration(clientOverrideConfiguration);
+            verify(builder).httpClient(httpClient);
+            verify(builder).region(Region.of("eu-west-2"));
+            verify(builder)
+                    .credentialsProvider(argThat(cp -> cp instanceof DefaultCredentialsProvider));
+            verify(builder, never()).endpointOverride(any());
+        }
+    }
+
+    /** An endpoint property is forwarded to the builder as an endpoint override. */
+    @Test
+    void testCreateKinesisAsyncClientWithEndpointOverride() {
+        Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
+        properties.setProperty(AWS_ENDPOINT, "https://localhost");
+
+        MockAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+        ClientOverrideConfiguration clientOverrideConfiguration =
+                ClientOverrideConfiguration.builder().build();
+
+        // A real HTTP client is created here; close it so the test does not leak its resources.
+        try (SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build()) {
+            AWSAsyncSinkUtil.createAwsAsyncClient(
+                    properties, builder, httpClient, clientOverrideConfiguration);
+
+            verify(builder).endpointOverride(URI.create("https://localhost"));
+        }
+    }
+
+    /** With an empty client configuration only the user agent options are applied. */
+    @Test
+    void testClientOverrideConfigurationWithDefaults() {
+        SdkClientConfiguration clientConfiguration = SdkClientConfiguration.builder().build();
+
+        ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
+
+        verify(builder).build();
+        verify(builder)
+                .putAdvancedOption(
+                        SdkAdvancedClientOption.USER_AGENT_PREFIX,
+                        formatFlinkUserAgentPrefix(DEFAULT_USER_AGENT_PREFIX_FORMAT_V2));
+        verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, null);
+        verify(builder, never()).apiCallAttemptTimeout(any());
+        verify(builder, never()).apiCallTimeout(any());
+    }
+
+    /** A user agent suffix in the client configuration is copied to the override builder. */
+    @Test
+    void testClientOverrideConfigurationUserAgentSuffix() {
+        SdkClientConfiguration clientConfiguration =
+                SdkClientConfiguration.builder()
+                        .option(SdkAdvancedClientOption.USER_AGENT_SUFFIX, "suffix")
+                        .build();
+
+        ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
+
+        verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, "suffix");
+    }
+
+    /** An API call attempt timeout in the client configuration is copied to the builder. */
+    @Test
+    void testClientOverrideConfigurationApiCallAttemptTimeout() {
+        SdkClientConfiguration clientConfiguration =
+                SdkClientConfiguration.builder()
+                        .option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT, Duration.ofMillis(500))
+                        .build();
+
+        ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+        // Base format plus the V2 suffix, as in the other tests; the V2 format constant already
+        // ends with that suffix and would duplicate it.
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
+
+        verify(builder).apiCallAttemptTimeout(Duration.ofMillis(500));
+    }
+
+    /** An API call timeout in the client configuration is copied to the builder. */
+    @Test
+    void testClientOverrideConfigurationApiCallTimeout() {
+        SdkClientConfiguration clientConfiguration =
+                SdkClientConfiguration.builder()
+                        .option(SdkClientOption.API_CALL_TIMEOUT, Duration.ofMillis(600))
+                        .build();
+
+        ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+        // Base format plus the V2 suffix, as in the other tests; the V2 format constant already
+        // ends with that suffix and would duplicate it.
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration,
+                builder,
+                formatFlinkUserAgentPrefix(
+                        DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX));
+
+        verify(builder).apiCallTimeout(Duration.ofMillis(600));
+    }
+
+    /** Returns a Mockito mock whose fluent setters used by the tests return the mock itself. */
+    private MockAsyncClientBuilder mockKinesisAsyncClientBuilder() {
+        MockAsyncClientBuilder builder = mock(MockAsyncClientBuilder.class);
+        when(builder.overrideConfiguration(any(ClientOverrideConfiguration.class)))
+                .thenReturn(builder);
+        when(builder.httpClient(any())).thenReturn(builder);
+        when(builder.credentialsProvider(any())).thenReturn(builder);
+        when(builder.region(any())).thenReturn(builder);
+
+        return builder;
+    }
+
+    /** Returns a mocked override-configuration builder whose fluent setters return the mock. */
+    private ClientOverrideConfiguration.Builder mockClientOverrideConfigurationBuilder() {
+        ClientOverrideConfiguration.Builder builder =
+                mock(ClientOverrideConfiguration.Builder.class);
+        when(builder.putAdvancedOption(any(), any())).thenReturn(builder);
+        when(builder.apiCallAttemptTimeout(any())).thenReturn(builder);
+        when(builder.apiCallTimeout(any())).thenReturn(builder);
+
+        return builder;
+    }
+
+    /**
+     * Concrete type combining both builder interfaces so that a single Mockito mock can be
+     * created for them. In this test class it is only used through {@code mock(...)}, so the
+     * {@code null}-returning bodies below are placeholders.
+     */
+    private static class MockAsyncClientBuilder
+            implements AwsAsyncClientBuilder<MockAsyncClientBuilder, SdkClient>,
+                    AwsClientBuilder<MockAsyncClientBuilder, SdkClient> {
+
+        @Override
+        public MockAsyncClientBuilder asyncConfiguration(
+                ClientAsyncConfiguration clientAsyncConfiguration) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder httpClient(SdkAsyncHttpClient sdkAsyncHttpClient) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder httpClientBuilder(SdkAsyncHttpClient.Builder builder) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder credentialsProvider(
+                AwsCredentialsProvider awsCredentialsProvider) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder region(Region region) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder dualstackEnabled(Boolean aBoolean) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder fipsEnabled(Boolean aBoolean) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder overrideConfiguration(
+                ClientOverrideConfiguration clientOverrideConfiguration) {
+            return null;
+        }
+
+        @Override
+        public ClientOverrideConfiguration overrideConfiguration() {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder endpointOverride(URI uri) {
+            return null;
+        }
+
+        @Override
+        public SdkClient build() {
+            return null;
+        }
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSGeneralUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSGeneralUtilTest.java
new file mode 100644
index 0000000..ca5613a
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSGeneralUtilTest.java
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.ASSUME_ROLE;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.AUTO;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.BASIC;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+import static software.amazon.awssdk.http.Protocol.HTTP2;
+
+/** Tests for {@link AWSGeneralUtil}. */
+class AWSGeneralUtilTest {
+
+    /** An empty configuration must resolve to the {@code AUTO} provider type. */
+    @Test
+    void testGetCredentialsProviderTypeDefaultsAuto() {
+        Properties emptyConfig = new Properties();
+
+        CredentialProvider type =
+                AWSGeneralUtil.getCredentialProviderType(emptyConfig, AWS_CREDENTIALS_PROVIDER);
+
+        assertThat(type).isEqualTo(AUTO);
+    }
+
+    /** Access key ID and secret key properties must resolve to {@code BASIC}. */
+    @Test
+    void testGetCredentialsProviderTypeBasic() {
+        Properties config =
+                TestUtil.properties(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+        config.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+        CredentialProvider type =
+                AWSGeneralUtil.getCredentialProviderType(config, AWS_CREDENTIALS_PROVIDER);
+
+        assertThat(type).isEqualTo(BASIC);
+    }
+
+    /** The {@code WEB_IDENTITY_TOKEN} setting must map to the matching enum constant. */
+    @Test
+    void testGetCredentialsProviderTypeWebIdentityToken() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+        assertThat(AWSGeneralUtil.getCredentialProviderType(config, AWS_CREDENTIALS_PROVIDER))
+                .isEqualTo(WEB_IDENTITY_TOKEN);
+    }
+
+    /** The {@code ASSUME_ROLE} setting must map to the matching enum constant. */
+    @Test
+    void testGetCredentialsProviderTypeAssumeRole() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
+
+        assertThat(AWSGeneralUtil.getCredentialProviderType(config, AWS_CREDENTIALS_PROVIDER))
+                .isEqualTo(ASSUME_ROLE);
+    }
+
+    /** {@code ENV_VAR} must produce an {@link EnvironmentVariableCredentialsProvider}. */
+    @Test
+    void testGetCredentialsProviderEnvironmentVariables() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(config);
+
+        assertThat(provider).isInstanceOf(EnvironmentVariableCredentialsProvider.class);
+    }
+
+    /** {@code SYS_PROP} must produce a {@link SystemPropertyCredentialsProvider}. */
+    @Test
+    void testGetCredentialsProviderSystemProperties() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(config);
+
+        assertThat(provider).isInstanceOf(SystemPropertyCredentialsProvider.class);
+    }
+
+    /**
+     * {@code WEB_IDENTITY_TOKEN} must produce a {@link WebIdentityTokenFileCredentialsProvider}.
+     */
+    @Test
+    void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(config);
+
+        assertThat(provider).isInstanceOf(WebIdentityTokenFileCredentialsProvider.class);
+    }
+
+    /**
+     * The configured role ARN and role session name must be forwarded to the builder, while the
+     * token file setter must not be invoked when no token file is configured.
+     */
+    @Test
+    void testGetWebIdentityTokenFileCredentialsProvider() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+        config.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+        config.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+        WebIdentityTokenFileCredentialsProvider.Builder providerBuilder =
+                mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+        AWSGeneralUtil.getWebIdentityTokenFileCredentialsProvider(
+                providerBuilder, config, AWS_CREDENTIALS_PROVIDER);
+
+        verify(providerBuilder, never()).webIdentityTokenFile(any());
+        verify(providerBuilder).roleSessionName("roleSessionName");
+        verify(providerBuilder).roleArn("roleArn");
+    }
+
+    /** A configured token file must be handed to the builder as a path. */
+    @Test
+    void testGetWebIdentityTokenFileCredentialsProviderWithWebIdentityFile() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+        config.setProperty(webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER), "webIdentityTokenFile");
+        WebIdentityTokenFileCredentialsProvider.Builder providerBuilder =
+                mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+        AWSGeneralUtil.getWebIdentityTokenFileCredentialsProvider(
+                providerBuilder, config, AWS_CREDENTIALS_PROVIDER);
+
+        verify(providerBuilder).webIdentityTokenFile(Paths.get("webIdentityTokenFile"));
+    }
+
+    /** {@code AUTO} must produce a {@link DefaultCredentialsProvider}. */
+    @Test
+    void testGetCredentialsProviderAuto() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(config);
+
+        assertThat(provider).isInstanceOf(DefaultCredentialsProvider.class);
+    }
+
+    /** A {@code Map} configuration with {@code AUTO} must yield the default provider. */
+    @Test
+    void testGetCredentialsProviderFromMap() {
+        Map<String, Object> mapConfig = ImmutableMap.of(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+        assertThat(AWSGeneralUtil.getCredentialsProvider(mapConfig))
+                .isInstanceOf(DefaultCredentialsProvider.class);
+    }
+
+    /**
+     * {@code ASSUME_ROLE} must yield an STS assume-role provider; the role ARN, role session
+     * name, external ID and region must each be looked up through {@code getProperty}.
+     */
+    @Test
+    void testGetCredentialsProviderAssumeRole() {
+        // A spy lets the test observe which keys are read from the configuration.
+        Properties config = spy(TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"));
+        config.setProperty(AWS_REGION, "eu-west-2");
+
+        assertThat(AWSGeneralUtil.getCredentialsProvider(config))
+                .isInstanceOf(StsAssumeRoleCredentialsProvider.class);
+
+        verify(config).getProperty(AWS_REGION);
+        verify(config).getProperty(roleArn(AWS_CREDENTIALS_PROVIDER));
+        verify(config).getProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER));
+        verify(config).getProperty(AWSConfigConstants.externalId(AWS_CREDENTIALS_PROVIDER));
+    }
+
+    /** {@code BASIC} must resolve credentials equal to the configured key pair. */
+    @Test
+    void testGetCredentialsProviderBasic() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "BASIC");
+        config.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+        config.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(config);
+        AwsCredentials resolved = provider.resolveCredentials();
+
+        assertThat(resolved.accessKeyId()).isEqualTo("ak");
+        assertThat(resolved.secretAccessKey()).isEqualTo("sk");
+    }
+
+    /**
+     * {@code PROFILE} with the {@code default} profile name must yield a {@link
+     * ProfileCredentialsProvider} that resolves the key pair expected for that profile in the
+     * {@code src/test/resources/profile} file.
+     */
+    @Test
+    void testGetCredentialsProviderProfile() {
+        Properties properties = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+        // Use setProperty rather than the inherited Hashtable#put: it is the String-typed API
+        // and matches how every other test in this class populates its configuration.
+        properties.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "default");
+        properties.setProperty(
+                AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER),
+                "src/test/resources/profile");
+
+        AwsCredentialsProvider credentialsProvider =
+                AWSGeneralUtil.getCredentialsProvider(properties);
+
+        assertThat(credentialsProvider).isInstanceOf(ProfileCredentialsProvider.class);
+
+        AwsCredentials credentials = credentialsProvider.resolveCredentials();
+        assertThat(credentials.accessKeyId()).isEqualTo("11111111111111111111");
+        assertThat(credentials.secretAccessKey())
+                .isEqualTo("wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111");
+    }
+
+    /**
+     * {@code PROFILE} with the profile name {@code foo} must yield a {@link
+     * ProfileCredentialsProvider} that resolves the key pair expected for that profile.
+     */
+    @Test
+    void testGetCredentialsProviderNamedProfile() {
+        Properties config = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+        config.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "foo");
+        config.setProperty(
+                AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER),
+                "src/test/resources/profile");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(config);
+        assertThat(provider).isInstanceOf(ProfileCredentialsProvider.class);
+
+        AwsCredentials resolved = provider.resolveCredentials();
+        assertThat(resolved.accessKeyId()).isEqualTo("22222222222222222222");
+        assertThat(resolved.secretAccessKey())
+                .isEqualTo("wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222");
+    }
+
+    /** A client built from empty properties is expected to have TCP keep-alive enabled. */
+    @Test
+    void testCreateNettyAsyncHttpClientWithPropertyTcpKeepAlive() throws Exception {
+        Properties config = new Properties();
+
+        SdkAsyncHttpClient client = AWSGeneralUtil.createAsyncHttpClient(config);
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.tcpKeepAlive()).isTrue();
+    }
+
+    @Test
+    void testCreateNettyAsyncHttpClientWithPropertyMaxConcurrency() throws Exception {
+        int maxConnections = 45678;
+        Properties properties = new Properties();
+        properties.setProperty(
+                AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY, String.valueOf(maxConnections));
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        assertThat(nettyConfiguration.maxConnections()).isEqualTo(maxConnections);
+    }
+
+    @Test
+    void testCreateNettyAsyncHttpClientWithPropertyReadTimeout() throws Exception {
+        int readTimeoutMillis = 45678;
+        Properties properties = new Properties();
+        properties.setProperty(
+                AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS,
+                String.valueOf(readTimeoutMillis));
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        assertThat(nettyConfiguration.readTimeoutMillis()).isEqualTo(readTimeoutMillis);
+    }
+
+    @Test
+    void testCreateNettyAsyncHttpClientWithPropertyTrustAllCertificates() throws Exception {
+        boolean trustAllCerts = true;
+        Properties properties = new Properties();
+        properties.setProperty(
+                AWSConfigConstants.TRUST_ALL_CERTIFICATES, String.valueOf(trustAllCerts));
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        assertThat(nettyConfiguration.trustAllCertificates()).isEqualTo(trustAllCerts);
+    }
+
+    @Test
+    void testCreateNettyAsyncHttpClientWithPropertyProtocol() throws Exception {
+        Protocol httpVersion = HTTP1_1;
+        Properties properties = new Properties();
+        properties.setProperty(
+                AWSConfigConstants.HTTP_PROTOCOL_VERSION, String.valueOf(httpVersion));
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        assertThat(nettyConfiguration.attribute(SdkHttpConfigurationOption.PROTOCOL))
+                .isEqualTo(httpVersion);
+    }
+
+    /** An unconfigured builder must yield a connection acquire timeout of 60,000 ms. */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsConnectionAcquireTimeout() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.connectionAcquireTimeoutMillis()).isEqualTo(60_000);
+    }
+
+    /**
+     * The connection TTL of a client built from an unconfigured builder must match that of a
+     * client obtained from {@link NettyNioAsyncHttpClient#create()}.
+     */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsConnectionTtl() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        NettyConfiguration expected =
+                TestUtil.getNettyConfiguration(NettyNioAsyncHttpClient.create());
+
+        assertThat(actual.connectionTtlMillis()).isEqualTo(expected.connectionTtlMillis());
+    }
+
+    /**
+     * The connect timeout of a client built from an unconfigured builder must match that of a
+     * client obtained from {@link NettyNioAsyncHttpClient#create()}.
+     */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsConnectionTimeout() throws Exception {
+        NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+        SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+        NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+        SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create();
+        NettyConfiguration nettyDefaultConfiguration =
+                TestUtil.getNettyConfiguration(httpDefaultClient);
+
+        // Value under test goes into assertThat(), the reference value into isEqualTo(), so a
+        // failure message reports "expected"/"actual" the right way round.
+        assertThat(nettyConfiguration.connectTimeoutMillis())
+                .isEqualTo(nettyDefaultConfiguration.connectTimeoutMillis());
+    }
+
+    /**
+     * The idle timeout of a client built from an unconfigured builder must match that of a
+     * client obtained from {@link NettyNioAsyncHttpClient#create()}.
+     */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsIdleTimeout() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        NettyConfiguration expected =
+                TestUtil.getNettyConfiguration(NettyNioAsyncHttpClient.create());
+
+        assertThat(actual.idleTimeoutMillis()).isEqualTo(expected.idleTimeoutMillis());
+    }
+
+    /** An unconfigured builder must yield a max connections value of 10,000. */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsMaxConnections() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.maxConnections()).isEqualTo(10_000);
+    }
+
+    /**
+     * The max pending connection acquires of a client built from an unconfigured builder must
+     * match that of a client obtained from {@link NettyNioAsyncHttpClient#create()}.
+     */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsMaxPendingConnectionAcquires() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        NettyConfiguration expected =
+                TestUtil.getNettyConfiguration(NettyNioAsyncHttpClient.create());
+
+        assertThat(actual.maxPendingConnectionAcquires())
+                .isEqualTo(expected.maxPendingConnectionAcquires());
+    }
+
+    /** An unconfigured builder must yield a read timeout of 360,000 ms. */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsReadTimeout() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.readTimeoutMillis()).isEqualTo(360_000);
+    }
+
+    /**
+     * The reap-idle-connections flag of a client built from an unconfigured builder must match
+     * that of a client obtained from {@link NettyNioAsyncHttpClient#create()}.
+     */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsReapIdleConnections() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        NettyConfiguration expected =
+                TestUtil.getNettyConfiguration(NettyNioAsyncHttpClient.create());
+
+        assertThat(actual.reapIdleConnections()).isEqualTo(expected.reapIdleConnections());
+    }
+
+    /**
+     * The TCP keep-alive flag of a client built from an unconfigured builder must match that of
+     * a client obtained from {@link NettyNioAsyncHttpClient#create()}.
+     */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsTcpKeepAlive() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        NettyConfiguration expected =
+                TestUtil.getNettyConfiguration(NettyNioAsyncHttpClient.create());
+
+        assertThat(actual.tcpKeepAlive()).isEqualTo(expected.tcpKeepAlive());
+    }
+
+    /**
+     * The TLS key managers provider of a client built from an unconfigured builder must equal
+     * that of a client obtained from {@link NettyNioAsyncHttpClient#create()}.
+     */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsTlsKeyManagersProvider() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        NettyConfiguration expected =
+                TestUtil.getNettyConfiguration(NettyNioAsyncHttpClient.create());
+
+        assertThat(actual.tlsKeyManagersProvider())
+                .isEqualTo(expected.tlsKeyManagersProvider());
+    }
+
+    /**
+     * The TLS trust managers provider of a client built from an unconfigured builder must equal
+     * that of a client obtained from {@link NettyNioAsyncHttpClient#create()}.
+     */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsTlsTrustManagersProvider() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        NettyConfiguration expected =
+                TestUtil.getNettyConfiguration(NettyNioAsyncHttpClient.create());
+
+        assertThat(actual.tlsTrustManagersProvider())
+                .isEqualTo(expected.tlsTrustManagersProvider());
+    }
+
+    /** An unconfigured builder must yield a client that does not trust all certificates. */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsTrustAllCertificates() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.trustAllCertificates()).isFalse();
+    }
+
+    /**
+     * The write timeout of a client built from an unconfigured builder must match that of a
+     * client obtained from {@link NettyNioAsyncHttpClient#create()}.
+     */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsWriteTimeout() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        NettyConfiguration expected =
+                TestUtil.getNettyConfiguration(NettyNioAsyncHttpClient.create());
+
+        assertThat(actual.writeTimeoutMillis()).isEqualTo(expected.writeTimeoutMillis());
+    }
+
+    /** An unconfigured builder must yield a client whose protocol attribute is HTTP/2. */
+    @Test
+    void testCreateNettyAsyncHttpClientWithDefaultsProtocol() throws Exception {
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        Protocol protocol = actual.attribute(SdkHttpConfigurationOption.PROTOCOL);
+        assertThat(protocol).isEqualTo(HTTP2);
+    }
+
+    /** A {@code READ_TIMEOUT} attribute must be reflected in the Netty read timeout. */
+    @Test
+    void testCreateNettyAsyncHttpClientReadTimeout() throws Exception {
+        Duration expected = Duration.ofMillis(1234);
+        AttributeMap attrs =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.READ_TIMEOUT, expected)
+                        .build();
+
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(attrs, NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.readTimeoutMillis()).isEqualTo(expected.toMillis());
+    }
+
+    /** A {@code TCP_KEEPALIVE} attribute must be reflected in the Netty configuration. */
+    @Test
+    void testCreateNettyAsyncHttpClientTcpKeepAlive() throws Exception {
+        boolean expected = true;
+        AttributeMap attrs =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.TCP_KEEPALIVE, expected)
+                        .build();
+
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(attrs, NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.tcpKeepAlive()).isEqualTo(expected);
+    }
+
+    /** A {@code CONNECTION_TIMEOUT} attribute must be reflected in the connect timeout. */
+    @Test
+    void testCreateNettyAsyncHttpClientConnectionTimeout() throws Exception {
+        Duration expected = Duration.ofMillis(1000);
+        AttributeMap attrs =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, expected)
+                        .build();
+
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(attrs, NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.connectTimeoutMillis()).isEqualTo(expected.toMillis());
+    }
+
+    /** A {@code MAX_CONNECTIONS} attribute must be reflected in the Netty max connections. */
+    @Test
+    void testCreateNettyAsyncHttpClientMaxConcurrency() throws Exception {
+        int expected = 123;
+        AttributeMap attrs =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, expected)
+                        .build();
+
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(attrs, NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.maxConnections()).isEqualTo(expected);
+    }
+
+    /** A {@code WRITE_TIMEOUT} attribute must be reflected in the Netty write timeout. */
+    @Test
+    void testCreateNettyAsyncHttpClientWriteTimeout() throws Exception {
+        Duration expected = Duration.ofMillis(3000);
+        AttributeMap attrs =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.WRITE_TIMEOUT, expected)
+                        .build();
+
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(attrs, NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.writeTimeoutMillis()).isEqualTo(expected.toMillis());
+    }
+
+    /** A {@code CONNECTION_MAX_IDLE_TIMEOUT} attribute must be reflected in the idle timeout. */
+    @Test
+    void testCreateNettyAsyncHttpClientConnectionMaxIdleTime() throws Exception {
+        Duration expected = Duration.ofMillis(2000);
+        AttributeMap attrs =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, expected)
+                        .build();
+
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(attrs, NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.idleTimeoutMillis()).isEqualTo(expected.toMillis());
+    }
+
+    /** A {@code REAP_IDLE_CONNECTIONS=false} attribute must be reflected in the configuration. */
+    @Test
+    void testCreateNettyAsyncHttpClientIdleConnectionReaper() throws Exception {
+        boolean expected = false;
+        AttributeMap attrs =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.REAP_IDLE_CONNECTIONS, expected)
+                        .build();
+
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(attrs, NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.reapIdleConnections()).isEqualTo(expected);
+    }
+
+    /** A {@code CONNECTION_TIME_TO_LIVE} attribute must be reflected in the connection TTL. */
+    @Test
+    void testCreateNettyAsyncHttpClientIdleConnectionTtl() throws Exception {
+        Duration expected = Duration.ofMillis(5000);
+        AttributeMap attrs =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, expected)
+                        .build();
+
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(attrs, NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.connectionTtlMillis()).isEqualTo(expected.toMillis());
+    }
+
+    /** A {@code TRUST_ALL_CERTIFICATES=true} attribute must be reflected in the configuration. */
+    @Test
+    void testCreateNettyAsyncHttpClientTrustAllCertificates() throws Exception {
+        boolean expected = true;
+        AttributeMap attrs =
+                AttributeMap.builder()
+                        .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, expected)
+                        .build();
+
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(attrs, NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.trustAllCertificates()).isEqualTo(expected);
+    }
+
+    /** A {@code PROTOCOL} attribute of HTTP/1.1 must be reflected in the client's protocol. */
+    @Test
+    void testCreateNettyAsyncHttpClientHttpVersion() throws Exception {
+        Protocol expected = HTTP1_1;
+        AttributeMap attrs =
+                AttributeMap.builder().put(SdkHttpConfigurationOption.PROTOCOL, expected).build();
+
+        SdkAsyncHttpClient client =
+                AWSGeneralUtil.createAsyncHttpClient(attrs, NettyNioAsyncHttpClient.builder());
+        NettyConfiguration actual = TestUtil.getNettyConfiguration(client);
+
+        assertThat(actual.attribute(SdkHttpConfigurationOption.PROTOCOL)).isEqualTo(expected);
+    }
+
+    /** The {@code AWS_REGION} property must be turned into the matching {@link Region}. */
+    @Test
+    void testGetRegion() {
+        Properties config = TestUtil.properties(AWS_REGION, "eu-west-2");
+
+        Region resolved = AWSGeneralUtil.getRegion(config);
+
+        assertThat(resolved).isEqualTo(Region.EU_WEST_2);
+    }
+
+    /** Each of the listed region identifiers must be accepted as valid. */
+    @Test
+    void testValidRegion() {
+        String[] regionIds = {
+            "us-east-1",
+            "us-gov-west-1",
+            "us-isob-east-1",
+            "aws-global",
+            "aws-iso-global",
+            "aws-iso-b-global"
+        };
+
+        // Checked in the listed order; the first rejected identifier fails the test.
+        for (String regionId : regionIds) {
+            assertThat(AWSGeneralUtil.isValidRegion(Region.of(regionId))).isTrue();
+        }
+    }
+
+    @Test
+    void testInvalidRegion() {
+        // An arbitrary string that is not a region id must be rejected.
+        Region unstructured = Region.of("unstructured-string");
+
+        assertThat(AWSGeneralUtil.isValidRegion(unstructured)).isFalse();
+    }
+
+    @Test
+    void testUnrecognizableAwsRegionInConfig() {
+        // Access key id and secret key are set; the region id is the value under test.
+        Properties config = new Properties();
+        config.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
+        config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+        config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+        assertThatThrownBy(() -> AWSGeneralUtil.validateAwsConfiguration(config))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Invalid AWS region");
+    }
+
+    @Test
+    void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
+        // BASIC provider is selected, but neither access key id nor secret key is configured.
+        Properties config = TestUtil.properties(AWSConfigConstants.AWS_REGION, "us-east-1");
+        config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+
+        String expectedMessage =
+                "Please set values for AWS Access Key ID ('"
+                        + AWSConfigConstants.AWS_ACCESS_KEY_ID
+                        + "') "
+                        + "and Secret Key ('"
+                        + AWSConfigConstants.AWS_SECRET_ACCESS_KEY
+                        + "') when using the BASIC AWS credential provider type.";
+
+        assertThatThrownBy(() -> AWSGeneralUtil.validateAwsConfiguration(config))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(expectedMessage);
+    }
+
+    @Test
+    void testUnrecognizableCredentialProviderTypeInConfig() {
+        // Start from the standard test properties and swap in an unknown provider type.
+        Properties config = TestUtil.getStandardProperties();
+        config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
+
+        assertThatThrownBy(() -> AWSGeneralUtil.validateAwsConfiguration(config))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Invalid AWS Credential Provider Type");
+    }
+
+    @Test
+    void testMissingWebIdentityTokenFileInCredentials() {
+        // Select the WEB_IDENTITY_TOKEN provider on top of the standard test properties.
+        Properties config = TestUtil.getStandardProperties();
+        config.setProperty(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+        String expectedMessage =
+                "Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set.";
+
+        assertThatThrownBy(() -> AWSGeneralUtil.validateAwsCredentials(config))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage(expectedMessage);
+    }
+
+    @Test
+    void testMissingEnvironmentVariableCredentials() {
+        // Select the ENV_VAR provider on top of the standard test properties.
+        Properties config = TestUtil.getStandardProperties();
+        config.setProperty(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+        String expectedFragment = "Access key must be specified either via environment variable";
+
+        assertThatThrownBy(() -> AWSGeneralUtil.validateAwsCredentials(config))
+                .isInstanceOf(SdkClientException.class)
+                .hasMessageContaining(expectedFragment);
+    }
+
+    @Test
+    void testFailedSystemPropertiesCredentialsValidationsOnMissingAccessKey() {
+        // Select the SYS_PROP provider; the test expects validation to report a missing access key.
+        Properties config = TestUtil.getStandardProperties();
+        config.setProperty(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+        String expectedFragment =
+                "Access key must be specified either via environment variable (AWS_ACCESS_KEY_ID) or system property (aws.accessKeyId)";
+
+        assertThatThrownBy(() -> AWSGeneralUtil.validateAwsCredentials(config))
+                .isInstanceOf(SdkClientException.class)
+                .hasMessageContaining(expectedFragment);
+    }
+
+    @Test
+    void testFailedSystemPropertiesCredentialsValidationsOnMissingSecretKey() {
+        System.setProperty("aws.accessKeyId", "accesKeyId");
+        Properties properties = TestUtil.getStandardProperties();
+        properties.setProperty(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+        assertThatThrownBy(() -> AWSGeneralUtil.validateAwsCredentials(properties))
+                .isInstanceOf(SdkClientException.class)
+                .hasMessageContaining(
+                        "Secret key must be specified either via environment variable (AWS_SECRET_ACCESS_KEY) or system property (aws.secretAccessKey)");
+    }
+
+    private WebIdentityTokenFileCredentialsProvider.Builder
+            mockWebIdentityTokenFileCredentialsProviderBuilder() {
+        WebIdentityTokenFileCredentialsProvider.Builder builder =
+                mock(WebIdentityTokenFileCredentialsProvider.Builder.class);
+        when(builder.roleArn(any())).thenReturn(builder);
+        when(builder.roleSessionName(any())).thenReturn(builder);
+        when(builder.webIdentityTokenFile(any())).thenReturn(builder);
+
+        return builder;
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/TestUtil.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/TestUtil.java
new file mode 100644
index 0000000..cc52d17
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/TestUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+
+/** Utilities for tests in the package. */
+public class TestUtil {
+    public static Properties properties(final String key, final String value) {
+        Properties properties = new Properties();
+        properties.setProperty(key, value);
+        return properties;
+    }
+
+    public static Properties getStandardProperties() {
+        Properties config = properties(AWSConfigConstants.AWS_REGION, "us-east-1");
+        config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+        config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+        return config;
+    }
+
+    public static NettyConfiguration getNettyConfiguration(final SdkAsyncHttpClient httpClient)
+            throws Exception {
+        return getField("configuration", httpClient);
+    }
+
+    public static <T> T getField(String fieldName, Object obj) throws Exception {
+        Field field = obj.getClass().getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return (T) field.get(obj);
+    }
+}
diff --git a/flink-connector-aws-base/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-connector-aws-base/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 0000000..2899913
--- /dev/null
+++ b/flink-connector-aws-base/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file
diff --git a/flink-connector-aws-base/src/test/resources/archunit.properties b/flink-connector-aws-base/src/test/resources/archunit.properties
new file mode 100644
index 0000000..15be88c
--- /dev/null
+++ b/flink-connector-aws-base/src/test/resources/archunit.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# By default we allow removing existing violations, but fail when new violations are added.
+freeze.store.default.allowStoreUpdate=true
+
+# Enable this if a new (frozen) rule has been added in order to create the initial store and record the existing violations.
+#freeze.store.default.allowStoreCreation=true
+
+# Enable this to allow new violations to be recorded.
+# NOTE: Adding new violations should be avoided when possible. If the rule was correct to flag a new
+#       violation, please try to avoid creating the violation. If the violation was created due to a
+#       shortcoming of the rule, file a JIRA issue so the rule can be improved.
+#freeze.refreeze=true
+
+freeze.store.default.path=archunit-violations
diff --git a/flink-connector-aws-base/src/test/resources/log4j2-test.properties b/flink-connector-aws-base/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..c4fa187
--- /dev/null
+++ b/flink-connector-aws-base/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connector-aws-base/src/test/resources/profile b/flink-connector-aws-base/src/test/resources/profile
new file mode 100644
index 0000000..2573fd6
--- /dev/null
+++ b/flink-connector-aws-base/src/test/resources/profile
@@ -0,0 +1,7 @@
+[default]
+aws_access_key_id=11111111111111111111
+aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111
+
+[foo]
+aws_access_key_id=22222222222222222222
+aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222
diff --git a/pom.xml b/pom.xml
index fbf9545..0dc0342 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,17 +62,13 @@ under the License.
         <assertj.version>3.21.0</assertj.version>
         <archunit.version>0.22.0</archunit.version>
         <testcontainers.version>1.17.2</testcontainers.version>
-        <mockito.version>2.21.0</mockito.version>
-
-        <!-- For dependency convergence -->
-        <kryo.version>2.24.0</kryo.version>
-        <slf4j.version>1.7.36</slf4j.version>
-        <snappy.version>1.1.8.3</snappy.version>
+        <mockito.version>3.4.6</mockito.version>
 
         <flink.parent.artifactId>flink-connector-aws-parent</flink.parent.artifactId>
     </properties>
 
     <modules>
+        <module>flink-connector-aws-base</module>
         <module>flink-connector-dynamodb</module>
         <module>flink-sql-connector-dynamodb</module>
     </modules>
@@ -102,6 +98,20 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <type>jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils</artifactId>
@@ -141,6 +151,13 @@ under the License.
                 <scope>test</scope>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-architecture-tests-test</artifactId>
+                <version>${flink.version}</version>
+                <scope>test</scope>
+            </dependency>
+
             <dependency>
                 <groupId>org.junit</groupId>
                 <artifactId>junit-bom</artifactId>
@@ -168,23 +185,28 @@ under the License.
             <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
-                <version>${slf4j.version}</version>
+                <version>1.7.36</version>
             </dependency>
             <dependency>
                 <groupId>com.esotericsoftware.kryo</groupId>
                 <artifactId>kryo</artifactId>
-                <version>${kryo.version}</version>
+                <version>2.24.0</version>
             </dependency>
             <dependency>
                 <groupId>org.xerial.snappy</groupId>
                 <artifactId>snappy-java</artifactId>
-                <version>${snappy.version}</version>
+                <version>1.1.8.3</version>
             </dependency>
             <dependency>
                 <groupId>com.google.code.findbugs</groupId>
                 <artifactId>jsr305</artifactId>
                 <version>1.3.9</version>
             </dependency>
+            <dependency>
+                <groupId>org.objenesis</groupId>
+                <artifactId>objenesis</artifactId>
+                <version>2.1</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>