You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/12/03 18:59:54 UTC
[flink-connector-aws] 02/08: [FLINK-29907][Connectors/AWS] Externalize AWS Base from Flink repo
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
commit 89d4a559c3219b3c1b4e7d6bc2847c4f34c8105e
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Fri Dec 2 09:25:34 2022 +0000
[FLINK-29907][Connectors/AWS] Externalize AWS Base from Flink repo
---
.gitignore | 3 +-
.../33252236-fc9f-4f63-b537-39e2322f7ccd | 0
.../733a854b-2487-43da-a5fa-9b089af5fb4e | 0
.../archunit-violations/stored.rules | 4 +
flink-connector-aws-base/pom.xml | 110 +++
.../connector/aws/config/AWSConfigConstants.java | 176 +++++
.../connector/aws/table/util/AWSOptionUtils.java | 87 +++
.../aws/table/util/AsyncClientOptionsUtils.java | 109 +++
.../flink/connector/aws/util/AWSAsyncSinkUtil.java | 164 +++++
.../aws/util/AWSAuthenticationException.java | 38 +
.../AWSCredentialFatalExceptionClassifiers.java | 43 ++
.../flink/connector/aws/util/AWSGeneralUtil.java | 396 +++++++++++
.../src/main/resources/log4j2.properties | 25 +
.../architecture/TestCodeArchitectureTest.java | 40 ++
.../aws/table/util/AWSOptionsUtilTest.java | 137 ++++
.../table/util/AsyncClientOptionsUtilsTest.java | 146 ++++
.../aws/testutils/AWSServicesTestUtils.java | 146 ++++
.../aws/testutils/LocalstackContainer.java | 85 +++
.../connector/aws/util/AWSAsyncSinkUtilTest.java | 253 +++++++
.../connector/aws/util/AWSGeneralUtilTest.java | 792 +++++++++++++++++++++
.../apache/flink/connector/aws/util/TestUtil.java | 54 ++
.../org.junit.jupiter.api.extension.Extension | 16 +
.../src/test/resources/archunit.properties | 31 +
.../src/test/resources/log4j2-test.properties | 28 +
.../src/test/resources/profile | 7 +
pom.xml | 40 +-
26 files changed, 2920 insertions(+), 10 deletions(-)
diff --git a/.gitignore b/.gitignore
index 5f0068c..973e8d5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,4 +35,5 @@ out/
tools/flink
tools/flink-*
tools/releasing/release
-tools/japicmp-output
\ No newline at end of file
+tools/japicmp-output
+*/.idea/
\ No newline at end of file
diff --git a/flink-connector-aws-base/archunit-violations/33252236-fc9f-4f63-b537-39e2322f7ccd b/flink-connector-aws-base/archunit-violations/33252236-fc9f-4f63-b537-39e2322f7ccd
new file mode 100644
index 0000000..e69de29
diff --git a/flink-connector-aws-base/archunit-violations/733a854b-2487-43da-a5fa-9b089af5fb4e b/flink-connector-aws-base/archunit-violations/733a854b-2487-43da-a5fa-9b089af5fb4e
new file mode 100644
index 0000000..e69de29
diff --git a/flink-connector-aws-base/archunit-violations/stored.rules b/flink-connector-aws-base/archunit-violations/stored.rules
new file mode 100644
index 0000000..ef2e628
--- /dev/null
+++ b/flink-connector-aws-base/archunit-violations/stored.rules
@@ -0,0 +1,4 @@
+#
+#Tue Feb 22 12:16:40 CET 2022
+Tests\ inheriting\ from\ AbstractTestBase\ should\ have\ name\ ending\ with\ ITCase=733a854b-2487-43da-a5fa-9b089af5fb4e
+ITCASE\ tests\ should\ use\ a\ MiniCluster\ resource\ or\ extension=33252236-fc9f-4f63-b537-39e2322f7ccd
diff --git a/flink-connector-aws-base/pom.xml b/flink-connector-aws-base/pom.xml
new file mode 100644
index 0000000..0249d61
--- /dev/null
+++ b/flink-connector-aws-base/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-parent</artifactId>
+ <version>4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-connector-aws-base</artifactId>
+ <name>Flink : Connectors : AWS : Base</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>netty-nio-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sts</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>iam</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- ArchUnit test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-architecture-tests-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <configuration>
+ <excludes>
+ <!-- test-jar is still used by JUnit4 modules -->
+ <exclude>META-INF/services/org.junit.jupiter.api.extension.Extension</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java
new file mode 100644
index 0000000..b244b2c
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+
+/** Configuration keys (and their key-building helpers) for AWS service usage. */
+@PublicEvolving
+public class AWSConfigConstants {
+
+ /**
+ * Possible configuration values for the type of credential provider to use when accessing AWS.
+ * Internally, a corresponding implementation of {@link AwsCredentialsProvider} will be used.
+ */
+ public enum CredentialProvider {
+
+ /**
+ * Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create
+ * AWS credentials.
+ */
+ ENV_VAR,
+
+ /**
+ * Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS
+ * credentials.
+ */
+ SYS_PROP,
+
+ /** Use an AWS credentials profile file to create the AWS credentials. */
+ PROFILE,
+
+ /**
+ * Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in
+ * the configuration properties.
+ */
+ BASIC,
+
+ /**
+ * Create AWS credentials by assuming a role. The credentials for assuming the role must be
+ * supplied.
+ */
+ ASSUME_ROLE,
+
+ /**
+ * Use AWS WebIdentityToken in order to assume a role. A token file and role details can be
+ * supplied as configuration or environment variables.
+ */
+ WEB_IDENTITY_TOKEN,
+
+ /**
+ * A credentials provider chain will be used that searches for credentials in this order:
+ * ENV_VAR, SYS_PROP, WEB_IDENTITY_TOKEN, PROFILE in the AWS instance metadata.
+ */
+ AUTO,
+ }
+
+ /** The AWS region of the service ("us-east-1" is used if not set). */
+ public static final String AWS_REGION = "aws.region";
+
+ /**
+ * The credential provider type to use when AWS credentials are required (BASIC is used if not
+ * set).
+ */
+ public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider";
+
+ /** The AWS access key ID to use when setting credentials provider type to BASIC. */
+ public static final String AWS_ACCESS_KEY_ID = accessKeyId(AWS_CREDENTIALS_PROVIDER);
+
+ /** The AWS secret key to use when setting credentials provider type to BASIC. */
+ public static final String AWS_SECRET_ACCESS_KEY = secretKey(AWS_CREDENTIALS_PROVIDER);
+
+ /** Optional configuration for profile path if credential provider type is set to be PROFILE. */
+ public static final String AWS_PROFILE_PATH = profilePath(AWS_CREDENTIALS_PROVIDER);
+
+ /** Optional configuration for profile name if credential provider type is set to be PROFILE. */
+ public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);
+
+ /**
+ * The role ARN to use when credential provider type is set to ASSUME_ROLE or
+ * WEB_IDENTITY_TOKEN.
+ */
+ public static final String AWS_ROLE_ARN = roleArn(AWS_CREDENTIALS_PROVIDER);
+
+ /**
+ * The role session name to use when credential provider type is set to ASSUME_ROLE or
+ * WEB_IDENTITY_TOKEN.
+ */
+ public static final String AWS_ROLE_SESSION_NAME = roleSessionName(AWS_CREDENTIALS_PROVIDER);
+
+ /** The external ID to use when credential provider type is set to ASSUME_ROLE. */
+ public static final String AWS_ROLE_EXTERNAL_ID = externalId(AWS_CREDENTIALS_PROVIDER);
+
+ /**
+ * The absolute path to the web identity token file that should be used if provider type is set
+ * to WEB_IDENTITY_TOKEN.
+ */
+ public static final String AWS_WEB_IDENTITY_TOKEN_FILE =
+ webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER);
+
+ /**
+ * The credentials provider that provides credentials for assuming the role when credential
+ * provider type is set to ASSUME_ROLE. Roles can be nested, so AWS_ROLE_CREDENTIALS_PROVIDER
+ * can again be set to "ASSUME_ROLE".
+ */
+ public static final String AWS_ROLE_CREDENTIALS_PROVIDER =
+ roleCredentialsProvider(AWS_CREDENTIALS_PROVIDER);
+
+ /** The AWS endpoint for the service (derived from the AWS region setting if not set). */
+ public static final String AWS_ENDPOINT = "aws.endpoint";
+
+ /** Whether to trust all SSL certificates. */
+ public static final String TRUST_ALL_CERTIFICATES = "aws.trust.all.certificates";
+
+ /** The HTTP protocol version to use. */
+ public static final String HTTP_PROTOCOL_VERSION = "aws.http.protocol.version";
+
+ /** Maximum request concurrency for {@link SdkAsyncHttpClient}. */
+ public static final String HTTP_CLIENT_MAX_CONCURRENCY = "aws.http-client.max-concurrency";
+
+ /** Read request timeout for {@link SdkAsyncHttpClient} (milliseconds, judging by the constant name). */
+ public static final String HTTP_CLIENT_READ_TIMEOUT_MILLIS = "aws.http-client.read-timeout";
+ /** Builds the access key ID property key: {@code prefix + ".basic.accesskeyid"}. */
+ public static String accessKeyId(String prefix) {
+ return prefix + ".basic.accesskeyid";
+ }
+ /** Builds the secret key property key: {@code prefix + ".basic.secretkey"}. */
+ public static String secretKey(String prefix) {
+ return prefix + ".basic.secretkey";
+ }
+ /** Builds the profile path property key: {@code prefix + ".profile.path"}. */
+ public static String profilePath(String prefix) {
+ return prefix + ".profile.path";
+ }
+ /** Builds the profile name property key: {@code prefix + ".profile.name"}. */
+ public static String profileName(String prefix) {
+ return prefix + ".profile.name";
+ }
+ /** Builds the role ARN property key: {@code prefix + ".role.arn"}. */
+ public static String roleArn(String prefix) {
+ return prefix + ".role.arn";
+ }
+ /** Builds the role session name property key: {@code prefix + ".role.sessionName"}. */
+ public static String roleSessionName(String prefix) {
+ return prefix + ".role.sessionName";
+ }
+ /** Builds the role external ID property key: {@code prefix + ".role.externalId"}. */
+ public static String externalId(String prefix) {
+ return prefix + ".role.externalId";
+ }
+ /** Builds the nested role credentials provider property key: {@code prefix + ".role.provider"}. */
+ public static String roleCredentialsProvider(String prefix) {
+ return prefix + ".role.provider";
+ }
+ /** Builds the web identity token file property key: {@code prefix + ".webIdentityToken.file"}. */
+ public static String webIdentityTokenFile(String prefix) {
+ return prefix + ".webIdentityToken.file";
+ }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AWSOptionUtils.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AWSOptionUtils.java
new file mode 100644
index 0000000..c9a7f58
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AWSOptionUtils.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.table.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.table.options.ConfigurationValidator;
+import org.apache.flink.connector.base.table.options.TableOptionsUtils;
+import org.apache.flink.connector.base.table.util.ConfigurationValidatorUtil;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Handler for AWS specific table options: picks the {@code aws.*} entries out of the resolved
+ * table options, translates their keys to the property names used by {@link AWSConfigConstants}
+ * and validates the result.
+ */
+@PublicEvolving
+public class AWSOptionUtils implements TableOptionsUtils, ConfigurationValidator {
+    /** Prefix for properties defined in {@link AWSConfigConstants}. */
+    public static final String AWS_PROPERTIES_PREFIX = "aws.";
+
+    private final Map<String, String> resolvedOptions;
+
+    public AWSOptionUtils(Map<String, String> resolvedOptions) {
+        this.resolvedOptions = resolvedOptions;
+    }
+
+    /**
+     * Returns a new map holding every option whose key starts with {@link
+     * #AWS_PROPERTIES_PREFIX}, stored under its translated key. Other options are ignored.
+     */
+    @Override
+    public Map<String, String> getProcessedResolvedOptions() {
+        final Map<String, String> translated = new HashMap<>();
+        for (Map.Entry<String, String> option : resolvedOptions.entrySet()) {
+            final String optionKey = option.getKey();
+            if (!optionKey.startsWith(AWS_PROPERTIES_PREFIX)) {
+                continue;
+            }
+            translated.put(translateAwsKey(optionKey), option.getValue());
+        }
+        return translated;
+    }
+
+    /** Returns the single prefix handled by this class, {@link #AWS_PROPERTIES_PREFIX}. */
+    @Override
+    public List<String> getNonValidatedPrefixes() {
+        return Collections.singletonList(AWS_PROPERTIES_PREFIX);
+    }
+
+    /**
+     * Copies the processed options into a {@link Properties} object and validates it, first with
+     * {@link AWSGeneralUtil#validateAwsConfiguration} and then by checking that {@link
+     * AWSConfigConstants#TRUST_ALL_CERTIFICATES}, if present, is a boolean.
+     *
+     * @return the validated AWS properties
+     */
+    @Override
+    public Properties getValidatedConfigurations() {
+        final Properties awsConfigurations = new Properties();
+        getProcessedResolvedOptions().forEach(awsConfigurations::setProperty);
+
+        AWSGeneralUtil.validateAwsConfiguration(awsConfigurations);
+
+        final String invalidTrustAllCertificatesMessage =
+                String.format(
+                        "Invalid %s value, must be a boolean.",
+                        AWSConfigConstants.TRUST_ALL_CERTIFICATES);
+        ConfigurationValidatorUtil.validateOptionalBooleanProperty(
+                awsConfigurations,
+                AWSConfigConstants.TRUST_ALL_CERTIFICATES,
+                invalidTrustAllCertificatesMessage);
+        return awsConfigurations;
+    }
+
+    /**
+     * Inserts a {@code provider.} segment after every {@code credentials.} in the key, e.g.
+     * {@code aws.credentials.basic.accesskeyid} becomes {@code
+     * aws.credentials.provider.basic.accesskeyid}. Keys ending in {@code credentials.provider} are
+     * returned unchanged, as are keys that do not contain {@code credentials.}.
+     */
+    private static String translateAwsKey(String key) {
+        if (key.endsWith("credentials.provider")) {
+            return key;
+        }
+        return key.replace("credentials.", "credentials.provider.");
+    }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtils.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtils.java
new file mode 100644
index 0000000..b347a01
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.table.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.base.table.util.ConfigurationValidatorUtil;
+
+import software.amazon.awssdk.http.Protocol;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Handler for the async HTTP client table options ({@code sink.http-client.*}) of AWS sinks, on
+ * top of the {@code aws.*} options handled by {@link AWSOptionUtils}.
+ */
+@PublicEvolving
+public class AsyncClientOptionsUtils extends AWSOptionUtils {
+    /** Prefix of the table options that configure the async HTTP client of a sink. */
+    public static final String SINK_CLIENT_PREFIX = "sink.http-client.";
+
+    private static final String CLIENT_MAX_CONCURRENCY_OPTION = "max-concurrency";
+    private static final String CLIENT_MAX_TIMEOUT_OPTION = "read-timeout";
+    private static final String CLIENT_HTTP_PROTOCOL_VERSION_OPTION = "protocol.version";
+
+    private final Map<String, String> resolvedOptions;
+
+    public AsyncClientOptionsUtils(Map<String, String> resolvedOptions) {
+        super(resolvedOptions);
+        this.resolvedOptions = resolvedOptions;
+    }
+
+    /**
+     * Returns the options produced by the parent class plus every option whose key starts with
+     * {@link #SINK_CLIENT_PREFIX}, stored under its translated key.
+     */
+    @Override
+    public Map<String, String> getProcessedResolvedOptions() {
+        Map<String, String> mappedResolvedOptions = super.getProcessedResolvedOptions();
+        for (Map.Entry<String, String> option : resolvedOptions.entrySet()) {
+            if (option.getKey().startsWith(SINK_CLIENT_PREFIX)) {
+                mappedResolvedOptions.put(
+                        translateClientKeys(option.getKey()), option.getValue());
+            }
+        }
+        return mappedResolvedOptions;
+    }
+
+    /** Returns both handled prefixes: the AWS properties prefix and the sink client prefix. */
+    @Override
+    public List<String> getNonValidatedPrefixes() {
+        return Arrays.asList(AWS_PROPERTIES_PREFIX, SINK_CLIENT_PREFIX);
+    }
+
+    /**
+     * Returns the parent's validated properties merged with the processed client options, after
+     * validating the client specific entries.
+     *
+     * @return the validated AWS and HTTP client properties
+     * @throws IllegalArgumentException if the HTTP protocol version is not a valid {@link
+     *     Protocol} name
+     */
+    @Override
+    public Properties getValidatedConfigurations() {
+        Properties clientConfigurations = super.getValidatedConfigurations();
+        clientConfigurations.putAll(getProcessedResolvedOptions());
+        validatedConfigurations(clientConfigurations);
+        return clientConfigurations;
+    }
+
+    /**
+     * Maps a {@code sink.http-client.*} key to the matching {@link AWSConfigConstants} key. Keys
+     * with an unknown suffix are returned with the prefix stripped.
+     */
+    private static String translateClientKeys(String key) {
+        String truncatedKey = key.substring(SINK_CLIENT_PREFIX.length());
+        switch (truncatedKey) {
+            case CLIENT_MAX_CONCURRENCY_OPTION:
+                return AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY;
+            case CLIENT_MAX_TIMEOUT_OPTION:
+                return AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS;
+            case CLIENT_HTTP_PROTOCOL_VERSION_OPTION:
+                return AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+            default:
+                return truncatedKey;
+        }
+    }
+
+    /** Validates the max concurrency, read timeout and HTTP protocol entries when present. */
+    private void validatedConfigurations(Properties config) {
+        ConfigurationValidatorUtil.validateOptionalPositiveIntProperty(
+                config,
+                AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY,
+                "Invalid value given for HTTP client max concurrency. Must be positive integer.");
+        ConfigurationValidatorUtil.validateOptionalPositiveIntProperty(
+                config,
+                AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS,
+                "Invalid value given for HTTP read timeout. Must be positive integer.");
+        validateOptionalHttpProtocolProperty(config);
+    }
+
+    /**
+     * Checks that the HTTP protocol version, if configured, names a {@link Protocol} constant.
+     *
+     * @throws IllegalArgumentException if the configured value is not a {@link Protocol} name
+     */
+    private void validateOptionalHttpProtocolProperty(Properties config) {
+        if (config.containsKey(AWSConfigConstants.HTTP_PROTOCOL_VERSION)) {
+            try {
+                Protocol.valueOf(config.getProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION));
+            } catch (IllegalArgumentException e) {
+                // Keep the original exception as the cause so the rejected value stays visible.
+                throw new IllegalArgumentException(
+                        "Invalid value given for HTTP protocol. Must be HTTP1_1 or HTTP2.", e);
+            }
+        }
+    }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java
new file mode 100644
index 0000000..1256142
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtil.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
+import software.amazon.awssdk.core.client.config.SdkClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.Properties;
+
+/** Utilities for creating AWS SDK async clients that carry a Flink user agent prefix. */
+@Internal
+public class AWSAsyncSinkUtil extends AWSGeneralUtil {
+
+    /** V2 suffix to denote the unified sinks. V1 sinks are based on KPL etc. */
+    static final String V2_USER_AGENT_SUFFIX = " V2";
+
+    /**
+     * Creates a user agent prefix for Flink. This can be used by HTTP Clients.
+     *
+     * @param userAgentFormat flink user agent prefix format with placeholders for version and
+     *     commit id.
+     * @return a user agent prefix for Flink
+     */
+    public static String formatFlinkUserAgentPrefix(String userAgentFormat) {
+        return String.format(
+                userAgentFormat,
+                EnvironmentInformation.getVersion(),
+                EnvironmentInformation.getRevisionInformation().commitId);
+    }
+
+    /**
+     * Creates an AWS async client using an empty {@link SdkClientConfiguration}.
+     *
+     * @param configProps configuration properties
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @param clientBuilder the builder of the client to create
+     * @param awsUserAgentPrefixFormat user agent prefix format used when no prefix is configured
+     * @param awsClientUserAgentPrefix name of the property holding a configured user agent prefix
+     * @return a new AWS Client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final SdkAsyncHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
+        SdkClientConfiguration clientConfiguration = SdkClientConfiguration.builder().build();
+        return createAwsAsyncClient(
+                configProps,
+                clientConfiguration,
+                httpClient,
+                clientBuilder,
+                awsUserAgentPrefixFormat,
+                awsClientUserAgentPrefix);
+    }
+
+    /**
+     * Creates an AWS async client. The user agent prefix is read from the {@code
+     * awsClientUserAgentPrefix} property; when that property is absent it is built from {@code
+     * awsUserAgentPrefixFormat} with {@link #V2_USER_AGENT_SUFFIX} appended.
+     *
+     * @param configProps configuration properties
+     * @param clientConfiguration the AWS SDK v2 config to instantiate the client
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @param clientBuilder the builder of the client to create
+     * @param awsUserAgentPrefixFormat user agent prefix format used when no prefix is configured
+     * @param awsClientUserAgentPrefix name of the property holding a configured user agent prefix
+     * @return a new AWS Client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final SdkClientConfiguration clientConfiguration,
+                    final SdkAsyncHttpClient httpClient,
+                    final T clientBuilder,
+                    final String awsUserAgentPrefixFormat,
+                    final String awsClientUserAgentPrefix) {
+        // orElseGet keeps the default lazy: the Flink version and commit id are only looked up
+        // (and the format string only evaluated) when no prefix was configured explicitly.
+        final String flinkUserAgentPrefix =
+                Optional.ofNullable(configProps.getProperty(awsClientUserAgentPrefix))
+                        .orElseGet(
+                                () ->
+                                        formatFlinkUserAgentPrefix(
+                                                awsUserAgentPrefixFormat + V2_USER_AGENT_SUFFIX));
+
+        final ClientOverrideConfiguration overrideConfiguration =
+                createClientOverrideConfiguration(
+                        clientConfiguration,
+                        ClientOverrideConfiguration.builder(),
+                        flinkUserAgentPrefix);
+
+        return createAwsAsyncClient(configProps, clientBuilder, httpClient, overrideConfiguration);
+    }
+
+    /**
+     * Builds the client override configuration: sets the given user agent prefix, copies the user
+     * agent suffix from {@code config}, and copies the API call (attempt) timeouts from {@code
+     * config} when they are set.
+     *
+     * @param config the SDK client configuration to read options from
+     * @param overrideConfigurationBuilder the builder to populate
+     * @param flinkUserAgentPrefix the user agent prefix to set
+     * @return the built override configuration
+     */
+    @VisibleForTesting
+    static ClientOverrideConfiguration createClientOverrideConfiguration(
+            final SdkClientConfiguration config,
+            final ClientOverrideConfiguration.Builder overrideConfigurationBuilder,
+            String flinkUserAgentPrefix) {
+
+        overrideConfigurationBuilder
+                .putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, flinkUserAgentPrefix)
+                .putAdvancedOption(
+                        SdkAdvancedClientOption.USER_AGENT_SUFFIX,
+                        config.option(SdkAdvancedClientOption.USER_AGENT_SUFFIX));
+
+        Optional.ofNullable(config.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT))
+                .ifPresent(overrideConfigurationBuilder::apiCallAttemptTimeout);
+
+        Optional.ofNullable(config.option(SdkClientOption.API_CALL_TIMEOUT))
+                .ifPresent(overrideConfigurationBuilder::apiCallTimeout);
+
+        return overrideConfigurationBuilder.build();
+    }
+
+    /**
+     * Builds the client from the given builder. The endpoint is overridden only when {@link
+     * AWSConfigConstants#AWS_ENDPOINT} is configured; credentials provider and region are derived
+     * from {@code configProps}.
+     *
+     * @param configProps configuration properties
+     * @param clientBuilder the builder of the client to create
+     * @param httpClient the underlying HTTP client used to talk to AWS
+     * @param overrideConfiguration the override configuration to apply to the client
+     * @return a new AWS Client
+     */
+    @VisibleForTesting
+    static <
+                    S extends SdkClient,
+                    T extends
+                            AwsAsyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsAsyncClient(
+                    final Properties configProps,
+                    final T clientBuilder,
+                    final SdkAsyncHttpClient httpClient,
+                    final ClientOverrideConfiguration overrideConfiguration) {
+
+        if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+            final URI endpointOverride =
+                    URI.create(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+            clientBuilder.endpointOverride(endpointOverride);
+        }
+
+        return clientBuilder
+                .httpClient(httpClient)
+                .overrideConfiguration(overrideConfiguration)
+                .credentialsProvider(getCredentialsProvider(configProps))
+                .region(getRegion(configProps))
+                .build();
+    }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAuthenticationException.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAuthenticationException.java
new file mode 100644
index 0000000..e5527b3
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSAuthenticationException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Unchecked exception thrown when authentication with AWS credentials fails; this includes missing
+ * configuration, illegal access and unreachable endpoints. All {@code AWSAuthenticationException}s
+ * should be non-retryable.
+ */
+@Internal
+public class AWSAuthenticationException extends RuntimeException {
+ /** Creates the exception with the given detail message and no cause. */
+ public AWSAuthenticationException(final String message) {
+ super(message);
+ }
+ /** Creates the exception with the given detail message and underlying cause. */
+ public AWSAuthenticationException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSCredentialFatalExceptionClassifiers.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSCredentialFatalExceptionClassifiers.java
new file mode 100644
index 0000000..713e11d
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSCredentialFatalExceptionClassifiers.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.sts.model.StsException;
+
+/** Factory methods for {@link FatalExceptionClassifier}s that detect AWS credential failures. */
+@Internal
+public class AWSCredentialFatalExceptionClassifiers {
+
+    /**
+     * Returns a classifier for failures whose root cause is an {@link StsException}; the failure
+     * is wrapped in an {@link AWSAuthenticationException}.
+     */
+    public static FatalExceptionClassifier getInvalidCredentialsExceptionClassifier() {
+        final String message =
+                "Encountered non-recoverable exception relating to the provided credentials.";
+        return FatalExceptionClassifier.withRootCauseOfType(
+                StsException.class, cause -> new AWSAuthenticationException(message, cause));
+    }
+
+    /**
+     * Returns a classifier for failures whose root cause is an {@link SdkClientException}; the
+     * failure is passed on as is, cast to {@link Exception}.
+     */
+    public static FatalExceptionClassifier getSdkClientMisconfiguredExceptionClassifier() {
+        return FatalExceptionClassifier.withRootCauseOfType(
+                SdkClientException.class, cause -> (Exception) cause);
+    }
+}
diff --git a/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
new file mode 100644
index 0000000..e1d7f98
--- /dev/null
+++ b/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider;
+import org.apache.flink.util.ExceptionUtils;
+
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.Http2Configuration;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.profiles.ProfileFile;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.awssdk.utils.SdkAutoCloseable;
+
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+/** Some general utilities specific to Amazon Web Service. */
+@Internal
+public class AWSGeneralUtil {
+ private static final Duration CONNECTION_ACQUISITION_TIMEOUT = Duration.ofSeconds(60);
+ private static final int INITIAL_WINDOW_SIZE_BYTES = 512 * 1024; // 512 KB
+ private static final Duration HEALTH_CHECK_PING_PERIOD = Duration.ofSeconds(60);
+
+ private static final int HTTP_CLIENT_MAX_CONCURRENCY = 10_000;
+ private static final Duration HTTP_CLIENT_READ_TIMEOUT = Duration.ofMinutes(6);
+ private static final Protocol HTTP_PROTOCOL = Protocol.HTTP2;
+ private static final boolean TRUST_ALL_CERTIFICATES = false;
+ private static final AttributeMap HTTP_CLIENT_DEFAULTS =
+ AttributeMap.builder()
+ .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, HTTP_CLIENT_MAX_CONCURRENCY)
+ .put(SdkHttpConfigurationOption.READ_TIMEOUT, HTTP_CLIENT_READ_TIMEOUT)
+ .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, TRUST_ALL_CERTIFICATES)
+ .put(SdkHttpConfigurationOption.PROTOCOL, HTTP_PROTOCOL)
+ .build();
+
+ /**
+ * Determines and returns the credential provider type from the given properties.
+ *
+ * @return the credential provider type
+ */
+ public static CredentialProvider getCredentialProviderType(
+ final Properties configProps, final String configPrefix) {
+ if (!configProps.containsKey(configPrefix)) {
+ if (configProps.containsKey(AWSConfigConstants.accessKeyId(configPrefix))
+ && configProps.containsKey(AWSConfigConstants.secretKey(configPrefix))) {
+ // if the credential provider type is not specified, but the Access Key ID and
+ // Secret Key are given, it will default to BASIC
+ return CredentialProvider.BASIC;
+ } else {
+ // if the credential provider type is not specified, it will default to AUTO
+ return CredentialProvider.AUTO;
+ }
+ } else {
+ try {
+ return CredentialProvider.valueOf(configProps.getProperty(configPrefix));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Invalid AWS Credential Provider Type %s.",
+ configProps.getProperty(configPrefix)),
+ e);
+ }
+ }
+ }
+
+    /**
+     * Copies the given map into a {@link Properties} object and resolves the {@link
+     * AwsCredentialsProvider} it describes.
+     *
+     * @param configProps the configuration property map
+     * @return the corresponding AWS credentials provider instance
+     */
+    public static AwsCredentialsProvider getCredentialsProvider(final Map<String, ?> configProps) {
+        final Properties asProperties = new Properties();
+        asProperties.putAll(configProps);
+        return getCredentialsProvider(asProperties);
+    }
+
+    /**
+     * Resolves the {@link AwsCredentialsProvider} described by the given properties, reading the
+     * provider type from {@link AWSConfigConstants#AWS_CREDENTIALS_PROVIDER}.
+     *
+     * @param configProps the configuration properties
+     * @return the corresponding AWS credentials provider instance
+     */
+    public static AwsCredentialsProvider getCredentialsProvider(final Properties configProps) {
+        final String providerTypeKey = AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+        return getCredentialsProvider(configProps, providerTypeKey);
+    }
+
+    /**
+     * Resolves the {@link AwsCredentialsProvider} for the provider type configured under {@code
+     * configPrefix}.
+     *
+     * @param configProps the configuration properties
+     * @param configPrefix the property key that holds the credential provider type; it is also
+     *     used to derive the keys of provider-specific settings
+     * @return the corresponding AWS credentials provider instance
+     * @throws IllegalArgumentException if the provider type is invalid or not supported
+     */
+    public static AwsCredentialsProvider getCredentialsProvider(
+            final Properties configProps, final String configPrefix) {
+        final CredentialProvider providerType =
+                getCredentialProviderType(configProps, configPrefix);
+
+        switch (providerType) {
+            case AUTO:
+                return DefaultCredentialsProvider.create();
+
+            case BASIC:
+                // The keys are read from configProps each time credentials are resolved, not
+                // captured when the provider is created.
+                return () ->
+                        AwsBasicCredentials.create(
+                                configProps.getProperty(
+                                        AWSConfigConstants.accessKeyId(configPrefix)),
+                                configProps.getProperty(
+                                        AWSConfigConstants.secretKey(configPrefix)));
+
+            case ENV_VAR:
+                return EnvironmentVariableCredentialsProvider.create();
+
+            case SYS_PROP:
+                return SystemPropertyCredentialsProvider.create();
+
+            case PROFILE:
+                return getProfileCredentialProvider(configProps, configPrefix);
+
+            case ASSUME_ROLE:
+                return getAssumeRoleCredentialProvider(configProps, configPrefix);
+
+            case WEB_IDENTITY_TOKEN:
+                return getWebIdentityTokenFileCredentialsProvider(
+                        WebIdentityTokenFileCredentialsProvider.builder(),
+                        configProps,
+                        configPrefix);
+
+            default:
+                throw new IllegalArgumentException(
+                        "Credential provider not supported: " + providerType);
+        }
+    }
+
+    /**
+     * Builds a {@link ProfileCredentialsProvider} from the profile name and, when configured, the
+     * credentials file path found under {@code configPrefix}.
+     *
+     * @param configProps the configuration properties
+     * @param configPrefix prefix used to derive the profile name and profile path keys
+     * @return the profile based credentials provider
+     */
+    public static AwsCredentialsProvider getProfileCredentialProvider(
+            final Properties configProps, final String configPrefix) {
+        // A missing profile name is passed to the builder as null.
+        final String profileName =
+                configProps.getProperty(AWSConfigConstants.profileName(configPrefix));
+        final ProfileCredentialsProvider.Builder builder =
+                ProfileCredentialsProvider.builder().profileName(profileName);
+
+        final String profilePath =
+                configProps.getProperty(AWSConfigConstants.profilePath(configPrefix));
+        if (profilePath != null) {
+            // Only override the profile file when a path was explicitly configured.
+            builder.profileFile(
+                    ProfileFile.builder()
+                            .type(ProfileFile.Type.CREDENTIALS)
+                            .content(Paths.get(profilePath))
+                            .build());
+        }
+
+        return builder.build();
+    }
+
+ private static AwsCredentialsProvider getAssumeRoleCredentialProvider(
+ final Properties configProps, final String configPrefix) {
+ return StsAssumeRoleCredentialsProvider.builder()
+ .refreshRequest(
+ AssumeRoleRequest.builder()
+ .roleArn(
+ configProps.getProperty(
+ AWSConfigConstants.roleArn(configPrefix)))
+ .roleSessionName(
+ configProps.getProperty(
+ AWSConfigConstants.roleSessionName(configPrefix)))
+ .externalId(
+ configProps.getProperty(
+ AWSConfigConstants.externalId(configPrefix)))
+ .build())
+ .stsClient(
+ StsClient.builder()
+ .credentialsProvider(
+ getCredentialsProvider(
+ configProps,
+ AWSConfigConstants.roleCredentialsProvider(
+ configPrefix)))
+ .region(getRegion(configProps))
+ .build())
+ .build();
+ }
+
+ @VisibleForTesting
+ static AwsCredentialsProvider getWebIdentityTokenFileCredentialsProvider(
+ final WebIdentityTokenFileCredentialsProvider.Builder webIdentityBuilder,
+ final Properties configProps,
+ final String configPrefix) {
+
+ Optional.ofNullable(configProps.getProperty(AWSConfigConstants.roleArn(configPrefix)))
+ .ifPresent(webIdentityBuilder::roleArn);
+
+ Optional.ofNullable(
+ configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix)))
+ .ifPresent(webIdentityBuilder::roleSessionName);
+
+ Optional.ofNullable(
+ configProps.getProperty(
+ AWSConfigConstants.webIdentityTokenFile(configPrefix)))
+ .map(Paths::get)
+ .ifPresent(webIdentityBuilder::webIdentityTokenFile);
+
+ return webIdentityBuilder.build();
+ }
+
+    /**
+     * Creates an async HTTP client from the given properties using a fresh {@link
+     * NettyNioAsyncHttpClient} builder.
+     *
+     * @param configProperties the HTTP client configuration properties
+     * @return the configured async HTTP client
+     */
+    public static SdkAsyncHttpClient createAsyncHttpClient(final Properties configProperties) {
+        final NettyNioAsyncHttpClient.Builder nettyBuilder = NettyNioAsyncHttpClient.builder();
+        return createAsyncHttpClient(configProperties, nettyBuilder);
+    }
+
+ public static SdkAsyncHttpClient createAsyncHttpClient(
+ final Properties configProperties,
+ final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+ final AttributeMap.Builder clientConfiguration =
+ AttributeMap.builder().put(SdkHttpConfigurationOption.TCP_KEEPALIVE, true);
+
+ Optional.ofNullable(
+ configProperties.getProperty(
+ AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY))
+ .map(Integer::parseInt)
+ .ifPresent(
+ integer ->
+ clientConfiguration.put(
+ SdkHttpConfigurationOption.MAX_CONNECTIONS, integer));
+
+ Optional.ofNullable(
+ configProperties.getProperty(
+ AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS))
+ .map(Integer::parseInt)
+ .map(Duration::ofMillis)
+ .ifPresent(
+ timeout ->
+ clientConfiguration.put(
+ SdkHttpConfigurationOption.READ_TIMEOUT, timeout));
+
+ Optional.ofNullable(configProperties.getProperty(AWSConfigConstants.TRUST_ALL_CERTIFICATES))
+ .map(Boolean::parseBoolean)
+ .ifPresent(
+ bool ->
+ clientConfiguration.put(
+ SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, bool));
+
+ Optional.ofNullable(configProperties.getProperty(AWSConfigConstants.HTTP_PROTOCOL_VERSION))
+ .map(Protocol::valueOf)
+ .ifPresent(
+ protocol ->
+ clientConfiguration.put(
+ SdkHttpConfigurationOption.PROTOCOL, protocol));
+ return createAsyncHttpClient(clientConfiguration.build(), httpClientBuilder);
+ }
+
+    /**
+     * Creates an async HTTP client from the given builder without any caller-supplied overrides.
+     *
+     * @param httpClientBuilder the builder used to create the client
+     * @return the configured async HTTP client
+     */
+    public static SdkAsyncHttpClient createAsyncHttpClient(
+            final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+        final AttributeMap noOverrides = AttributeMap.empty();
+        return createAsyncHttpClient(noOverrides, httpClientBuilder);
+    }
+
+ public static SdkAsyncHttpClient createAsyncHttpClient(
+ final AttributeMap config, final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+ httpClientBuilder
+ .connectionAcquisitionTimeout(CONNECTION_ACQUISITION_TIMEOUT)
+ .http2Configuration(
+ Http2Configuration.builder()
+ .healthCheckPingPeriod(HEALTH_CHECK_PING_PERIOD)
+ .initialWindowSize(INITIAL_WINDOW_SIZE_BYTES)
+ .build());
+ return httpClientBuilder.buildWithDefaults(config.merge(HTTP_CLIENT_DEFAULTS));
+ }
+
+    /**
+     * Creates a {@link Region} from the value stored under {@link AWSConfigConstants#AWS_REGION}.
+     * The property value is handed to {@link Region#of(String)} as-is; no presence check is done
+     * here.
+     *
+     * @param configProps the properties containing the region
+     * @return the region specified by the properties
+     */
+    public static Region getRegion(final Properties configProps) {
+        final String regionId = configProps.getProperty(AWSConfigConstants.AWS_REGION);
+        return Region.of(regionId);
+    }
+
+    /**
+     * Checks whether the ID of the given region has the expected shape. The check is purely
+     * syntactic (a regular expression over {@code region.id()}); the region is not looked up in
+     * any list of known regions.
+     *
+     * @param region The AWS region to check
+     * @return true if the supplied region is valid, false otherwise
+     */
+    public static boolean isValidRegion(Region region) {
+        final Pattern regionIdShape =
+                Pattern.compile("^[a-z]+-([a-z]+[-]{0,1}[a-z]+-([0-9]|global)|global)$");
+        return regionIdShape.matcher(region.id()).matches();
+    }
+
+    /**
+     * Validates the credential and region related entries of the given configuration.
+     *
+     * <p>Credentials are only checked when a provider type is explicitly configured: the type
+     * must be recognisable and, for BASIC, both the access key ID and the secret key must be
+     * present. The region is only checked when it is configured.
+     *
+     * @param config the properties to setup credentials and region
+     * @throws IllegalArgumentException if any of the checks fails
+     */
+    public static void validateAwsConfiguration(Properties config) {
+        if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
+            validateCredentialProvider(config);
+
+            final CredentialProvider providerType =
+                    getCredentialProviderType(config, AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
+            // BASIC credentials are unusable unless both halves of the key pair are supplied.
+            final boolean keyPairComplete =
+                    config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+                            && config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
+            if (providerType == CredentialProvider.BASIC && !keyPairComplete) {
+                throw new IllegalArgumentException(
+                        "Please set values for AWS Access Key ID ('"
+                                + AWSConfigConstants.AWS_ACCESS_KEY_ID
+                                + "') "
+                                + "and Secret Key ('"
+                                + AWSConfigConstants.AWS_SECRET_ACCESS_KEY
+                                + "') when using the BASIC AWS credential provider type.");
+            }
+        }
+
+        // A configured region name must be recognisable.
+        if (config.containsKey(AWSConfigConstants.AWS_REGION)
+                && !isValidRegion(getRegion(config))) {
+            final StringBuilder knownRegions = new StringBuilder();
+            Region.regions().forEach(known -> knownRegions.append(known).append(", "));
+            throw new IllegalArgumentException(
+                    "Invalid AWS region set in config. Valid values are: " + knownRegions);
+        }
+    }
+
+    /**
+     * Closes every non-null resource, attempting all of them even if some fail. If any {@code
+     * close()} call throws a {@link RuntimeException}, the failures are combined through {@link
+     * ExceptionUtils#firstOrSuppressed} and rethrown once all resources have been processed.
+     *
+     * @param resources the resources to close; null entries are skipped
+     */
+    public static void closeResources(SdkAutoCloseable... resources) {
+        RuntimeException collectedFailure = null;
+        for (SdkAutoCloseable closeable : resources) {
+            if (closeable == null) {
+                continue;
+            }
+            try {
+                closeable.close();
+            } catch (RuntimeException e) {
+                collectedFailure = ExceptionUtils.firstOrSuppressed(e, collectedFailure);
+            }
+        }
+
+        if (collectedFailure != null) {
+            throw collectedFailure;
+        }
+    }
+
+    /**
+     * Validates the configuration via {@link #validateAwsConfiguration(Properties)} and then
+     * resolves credentials once from the configured provider, so that any resolution failure
+     * propagates to the caller. The resolved credentials themselves are discarded.
+     *
+     * @param config the properties to setup credentials and region
+     */
+    public static void validateAwsCredentials(Properties config) {
+        validateAwsConfiguration(config);
+
+        final AwsCredentialsProvider provider = getCredentialsProvider(config);
+        provider.resolveCredentials();
+    }
+
+    /**
+     * Verifies that a credentials provider can be created from the given configuration, i.e. that
+     * the value of {@link AWSConfigConstants#AWS_CREDENTIALS_PROVIDER} is recognisable.
+     *
+     * @param config the properties to setup credentials
+     * @throws IllegalArgumentException listing the valid provider types if provider creation
+     *     fails with an {@link IllegalArgumentException}; the original failure is kept as the
+     *     cause
+     */
+    private static void validateCredentialProvider(Properties config) {
+        try {
+            getCredentialsProvider(config);
+        } catch (IllegalArgumentException e) {
+            StringBuilder sb = new StringBuilder();
+            for (CredentialProvider type : CredentialProvider.values()) {
+                sb.append(type.toString()).append(", ");
+            }
+            // Chain the original exception so the offending value is not lost from the trace.
+            throw new IllegalArgumentException(
+                    "Invalid AWS Credential Provider Type set in config. Valid values are: "
+                            + sb.toString(),
+                    e);
+        }
+    }
+}
diff --git a/flink-connector-aws-base/src/main/resources/log4j2.properties b/flink-connector-aws-base/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..c64a340
--- /dev/null
+++ b/flink-connector-aws-base/src/main/resources/log4j2.properties
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# The root logger level is OFF, so no log events are emitted unless this level is changed.
+rootLogger.level = OFF
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+# Console appender referenced by the root logger above.
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+# Layout: time of day with milliseconds, level, logger name padded to 60 chars, NDC, message.
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
new file mode 100644
index 0000000..8d533a6
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.architecture;
+
+import org.apache.flink.architecture.common.ImportOptions;
+
+import com.tngtech.archunit.core.importer.ImportOption;
+import com.tngtech.archunit.junit.AnalyzeClasses;
+import com.tngtech.archunit.junit.ArchTest;
+import com.tngtech.archunit.junit.ArchTests;
+
+/**
+ * Architecture tests for test code.
+ *
+ * <p>Imports classes from the {@code org.apache.flink.connector.aws} package using the {@code
+ * OnlyIncludeTests}, {@code ExcludeScalaImportOption} and {@code ExcludeShadedImportOption} import
+ * options, and applies the rules declared in {@link TestCodeArchitectureTestBase}.
+ */
+@AnalyzeClasses(
+ packages = "org.apache.flink.connector.aws",
+ importOptions = {
+ ImportOption.OnlyIncludeTests.class,
+ ImportOptions.ExcludeScalaImportOption.class,
+ ImportOptions.ExcludeShadedImportOption.class
+ })
+public class TestCodeArchitectureTest {
+
+ // Pulls in the shared rule set instead of declaring rules in this class.
+ @ArchTest
+ public static final ArchTests COMMON_TESTS = ArchTests.in(TestCodeArchitectureTestBase.class);
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AWSOptionsUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AWSOptionsUtilTest.java
new file mode 100644
index 0000000..6786317
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AWSOptionsUtilTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.table.util;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/** Unit tests for {@link AWSOptionUtils}. */
+class AWSOptionsUtilTest {
+
+    /** Table-style AWS option keys are mapped onto the connector's property keys. */
+    @Test
+    void testAWSKeyMapper() {
+        Map<String, String> expected = getDefaultExpectedAWSConfigurations();
+
+        // process default aws options.
+        Map<String, String> actual =
+                new AWSOptionUtils(getDefaultAWSConfigurations()).getProcessedResolvedOptions();
+
+        Assertions.assertThat(actual).isEqualTo(expected);
+    }
+
+    /** Options that are not AWS related are dropped while AWS options are still mapped. */
+    @Test
+    void testAWSKeySelectionAndMapping() {
+        Map<String, String> tableOptions = getDefaultAWSConfigurations();
+        // adding irrelevant configurations
+        tableOptions.put("non.aws.key1", "value1");
+        tableOptions.put("non.aws.key2", "value2");
+        tableOptions.put("non.aws.key3", "value3");
+        tableOptions.put("non.aws.key4", "value4");
+
+        Map<String, String> actual =
+                new AWSOptionUtils(tableOptions).getProcessedResolvedOptions();
+
+        Assertions.assertThat(actual).isEqualTo(getDefaultExpectedAWSConfigurations());
+    }
+
+    /** A valid option set passes validation and is returned as {@link Properties}. */
+    @Test
+    void testGoodAWSProperties() {
+        Properties expected = new Properties();
+        expected.putAll(getDefaultExpectedAWSConfigurations());
+
+        // extract aws configuration from properties
+        Properties actual =
+                new AWSOptionUtils(getDefaultAWSConfigurations()).getValidatedConfigurations();
+
+        Assertions.assertThat(actual).isEqualTo(expected);
+    }
+
+    /** An unrecognisable region is rejected during validation. */
+    @Test
+    void testBadAWSRegion() {
+        Map<String, String> tableOptions = getDefaultAWSConfigurations();
+        tableOptions.put("aws.region", "invalid-aws-region");
+
+        AWSOptionUtils optionUtils = new AWSOptionUtils(tableOptions);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(optionUtils::getValidatedConfigurations)
+                .withMessageContaining("Invalid AWS region set in config.");
+    }
+
+    /** BASIC credentials without an access key ID are rejected during validation. */
+    @Test
+    void testMissingAWSCredentials() {
+        Map<String, String> tableOptions = getDefaultAWSConfigurations();
+        tableOptions.remove("aws.credentials.basic.accesskeyid");
+
+        AWSOptionUtils optionUtils = new AWSOptionUtils(tableOptions);
+        String expectedMessage =
+                String.format(
+                        "Please set values for AWS Access Key ID ('%s') "
+                                + "and Secret Key ('%s') when using the BASIC AWS credential provider type.",
+                        AWSConfigConstants.AWS_ACCESS_KEY_ID,
+                        AWSConfigConstants.AWS_SECRET_ACCESS_KEY);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(optionUtils::getValidatedConfigurations)
+                .withMessageContaining(expectedMessage);
+    }
+
+    /** A non-boolean trust-all-certificates value is rejected during validation. */
+    @Test
+    void testInvalidTrustAllCertificatesOption() {
+        Map<String, String> tableOptions = getDefaultAWSConfigurations();
+        tableOptions.put("aws.trust.all.certificates", "invalid-boolean");
+
+        AWSOptionUtils optionUtils = new AWSOptionUtils(tableOptions);
+        String expectedMessage =
+                String.format(
+                        "Invalid %s value, must be a boolean.",
+                        AWSConfigConstants.TRUST_ALL_CERTIFICATES);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(optionUtils::getValidatedConfigurations)
+                .withMessageContaining(expectedMessage);
+    }
+
+    /** Returns a fresh, mutable set of valid table options. */
+    private Map<String, String> getDefaultAWSConfigurations() {
+        Map<String, String> options = new HashMap<>();
+        options.put("aws.region", "us-west-2");
+        options.put("aws.credentials.provider", "BASIC");
+        options.put("aws.credentials.basic.accesskeyid", "ververicka");
+        options.put("aws.credentials.basic.secretkey", "SuperSecretSecretSquirrel");
+        options.put("aws.trust.all.certificates", "true");
+        return options;
+    }
+
+    /** Returns the properties expected after mapping the default table options. */
+    private Map<String, String> getDefaultExpectedAWSConfigurations() {
+        Map<String, String> expected = new HashMap<>();
+        expected.put("aws.region", "us-west-2");
+        expected.put("aws.credentials.provider", "BASIC");
+        expected.put("aws.credentials.provider.basic.accesskeyid", "ververicka");
+        expected.put("aws.credentials.provider.basic.secretkey", "SuperSecretSecretSquirrel");
+        expected.put("aws.trust.all.certificates", "true");
+        return expected;
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtilsTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtilsTest.java
new file mode 100644
index 0000000..afc35e2
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/table/util/AsyncClientOptionsUtilsTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.table.util;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/** Unit tests for {@link AsyncClientOptionsUtils}. */
+class AsyncClientOptionsUtilsTest {
+
+    /** Table-style HTTP client option keys are mapped onto the connector's property keys. */
+    @Test
+    void testGoodAsyncClientOptionsMapping() {
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(getDefaultClientOptions());
+
+        Map<String, String> expectedConfigurations = getDefaultExpectedClientOptions();
+        Map<String, String> actualConfigurations =
+                asyncClientOptionsUtils.getProcessedResolvedOptions();
+
+        Assertions.assertThat(actualConfigurations).isEqualTo(expectedConfigurations);
+    }
+
+    /** Options outside the HTTP client prefix are dropped from the processed options. */
+    @Test
+    void testAsyncClientOptionsUtilsFilteringNonPrefixedOptions() {
+        Map<String, String> defaultClientOptions = getDefaultClientOptions();
+        defaultClientOptions.put("sink.not.http-client.some.option", "someValue");
+
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(defaultClientOptions);
+
+        Map<String, String> expectedConfigurations = getDefaultExpectedClientOptions();
+        Map<String, String> actualConfigurations =
+                asyncClientOptionsUtils.getProcessedResolvedOptions();
+
+        Assertions.assertThat(actualConfigurations).isEqualTo(expectedConfigurations);
+    }
+
+    /** A valid option set passes validation and is returned as {@link Properties}. */
+    @Test
+    void testAsyncClientOptionsUtilsExtractingCorrectConfiguration() {
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(getDefaultClientOptions());
+
+        Properties expectedConfigurations = getDefaultExpectedClientConfigs();
+        Properties actualConfigurations = asyncClientOptionsUtils.getValidatedConfigurations();
+
+        Assertions.assertThat(actualConfigurations).isEqualTo(expectedConfigurations);
+    }
+
+    /** A non-integer max concurrency is rejected during validation. */
+    @Test
+    void testAsyncClientOptionsUtilsFailOnInvalidMaxConcurrency() {
+        Map<String, String> defaultClientOptions = getDefaultClientOptions();
+        defaultClientOptions.put("sink.http-client.max-concurrency", "invalid-integer");
+
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(defaultClientOptions);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(asyncClientOptionsUtils::getValidatedConfigurations)
+                .withMessageContaining(
+                        "Invalid value given for HTTP client max concurrency. Must be positive integer.");
+    }
+
+    /** A non-integer read timeout is rejected during validation. */
+    @Test
+    void testAsyncClientOptionsUtilsFailOnInvalidReadTimeout() {
+        Map<String, String> defaultClientOptions = getDefaultClientOptions();
+        defaultClientOptions.put("sink.http-client.read-timeout", "invalid-integer");
+
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(defaultClientOptions);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(asyncClientOptionsUtils::getValidatedConfigurations)
+                .withMessageContaining(
+                        "Invalid value given for HTTP read timeout. Must be positive integer.");
+    }
+
+    /** An unknown HTTP protocol version is rejected during validation. */
+    @Test
+    void testAsyncClientOptionsUtilsFailOnInvalidHttpProtocol() {
+        Map<String, String> defaultClientOptions = getDefaultClientOptions();
+        defaultClientOptions.put("sink.http-client.protocol.version", "invalid-http-protocol");
+
+        AsyncClientOptionsUtils asyncClientOptionsUtils =
+                new AsyncClientOptionsUtils(defaultClientOptions);
+
+        Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(asyncClientOptionsUtils::getValidatedConfigurations)
+                .withMessageContaining(
+                        "Invalid value given for HTTP protocol. Must be HTTP1_1 or HTTP2.");
+    }
+
+    /** Returns a fresh, mutable set of valid table options for the async HTTP client. */
+    private static Map<String, String> getDefaultClientOptions() {
+        Map<String, String> defaultClientOptions = new HashMap<>();
+        defaultClientOptions.put("aws.region", "us-east-1");
+        defaultClientOptions.put("sink.http-client.max-concurrency", "10000");
+        defaultClientOptions.put("sink.http-client.read-timeout", "360000");
+        defaultClientOptions.put("sink.http-client.protocol.version", "HTTP2");
+        return defaultClientOptions;
+    }
+
+    /** Returns the properties expected after mapping the default table options. */
+    private static Map<String, String> getDefaultExpectedClientOptions() {
+        Map<String, String> expectedClientOptions = new HashMap<>();
+        expectedClientOptions.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+        expectedClientOptions.put(AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY, "10000");
+        expectedClientOptions.put(AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS, "360000");
+        expectedClientOptions.put(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP2");
+        return expectedClientOptions;
+    }
+
+    /** Returns the expected options as {@link Properties}, derived from the expected map. */
+    private static Properties getDefaultExpectedClientConfigs() {
+        Properties expectedClientConfigs = new Properties();
+        // Single source of truth: the same entries as getDefaultExpectedClientOptions().
+        expectedClientConfigs.putAll(getDefaultExpectedClientOptions());
+        return expectedClientConfigs;
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
new file mode 100644
index 0000000..7d54c39
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.testutils;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.iam.IamClient;
+import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.waiters.S3Waiter;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the Localstack container.
+ */
+public class AWSServicesTestUtils {
+
+    // Static credentials written into the configuration returned by createConfig(String).
+    private static final String ACCESS_KEY_ID = "accessKeyId";
+    private static final String SECRET_ACCESS_KEY = "secretAccessKey";
+
+    /** Creates an {@link S3Client} that sends its requests to {@code endpoint}. */
+    public static S3Client createS3Client(String endpoint, SdkHttpClient httpClient) {
+        return createAwsSyncClient(endpoint, httpClient, S3Client.builder());
+    }
+
+    /** Creates an {@link IamClient} that sends its requests to {@code endpoint}. */
+    public static IamClient createIamClient(String endpoint, SdkHttpClient httpClient) {
+        return createAwsSyncClient(endpoint, httpClient, IamClient.builder());
+    }
+
+    /**
+     * Configures the given synchronous client builder with the HTTP client, an endpoint override,
+     * and the credentials provider and region derived from {@link #createConfig(String)}, then
+     * builds the client.
+     *
+     * @param endpoint endpoint URI the client is pointed at
+     * @param httpClient HTTP client the built SDK client will use
+     * @param clientBuilder service specific builder to configure
+     * @return the built client
+     */
+    public static <
+                    S extends SdkClient,
+                    T extends
+                            AwsSyncClientBuilder<? extends T, S>
+                                    & AwsClientBuilder<? extends T, S>>
+            S createAwsSyncClient(String endpoint, SdkHttpClient httpClient, T clientBuilder) {
+        Properties config = createConfig(endpoint);
+        return clientBuilder
+                .httpClient(httpClient)
+                .endpointOverride(URI.create(endpoint))
+                .credentialsProvider(AWSGeneralUtil.getCredentialsProvider(config))
+                .region(AWSGeneralUtil.getRegion(config))
+                .build();
+    }
+
+    /**
+     * Builds the connector configuration used by these utilities: region {@code ap-southeast-1},
+     * the given endpoint, fixed access key / secret key values and trust-all-certificates enabled.
+     */
+    public static Properties createConfig(String endpoint) {
+        Properties config = new Properties();
+        config.setProperty(AWS_REGION, Region.AP_SOUTHEAST_1.toString());
+        config.setProperty(AWS_ENDPOINT, endpoint);
+        config.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), ACCESS_KEY_ID);
+        config.setProperty(
+                AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), SECRET_ACCESS_KEY);
+        config.setProperty(TRUST_ALL_CERTIFICATES, "true");
+        return config;
+    }
+
+    /**
+     * Creates an Apache based synchronous HTTP client that trusts all certificates and uses
+     * HTTP/1.1. The caller owns the returned client.
+     */
+    public static SdkHttpClient createHttpClient() {
+        AttributeMap.Builder attributeMapBuilder = AttributeMap.builder();
+        attributeMapBuilder.put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true);
+        attributeMapBuilder.put(SdkHttpConfigurationOption.PROTOCOL, Protocol.HTTP1_1);
+        return ApacheHttpClient.builder().buildWithDefaults(attributeMapBuilder.build());
+    }
+
+    /** Creates the bucket and blocks until the S3 waiter reports that it exists. */
+    public static void createBucket(S3Client s3Client, String bucketName) {
+        CreateBucketRequest bucketRequest =
+                CreateBucketRequest.builder().bucket(bucketName).build();
+        s3Client.createBucket(bucketRequest);
+
+        HeadBucketRequest bucketRequestWait =
+                HeadBucketRequest.builder().bucket(bucketName).build();
+
+        try (final S3Waiter waiter = s3Client.waiter()) {
+            waiter.waitUntilBucketExists(bucketRequestWait);
+        }
+    }
+
+    /** Creates an IAM role with the given name; no other role attributes are set. */
+    public static void createIAMRole(IamClient iam, String roleName) {
+        CreateRoleRequest request = CreateRoleRequest.builder().roleName(roleName).build();
+
+        iam.createRole(request);
+    }
+
+    /**
+     * Lists all objects in the bucket.
+     *
+     * <p>The listing follows pagination, so buckets whose listing spans several response pages are
+     * returned completely instead of being cut off after the first page.
+     *
+     * @param s3 client used for the listing
+     * @param bucketName bucket to list
+     * @return the objects of every response page, in listing order
+     */
+    public static List<S3Object> listBucketObjects(S3Client s3, String bucketName) {
+        ListObjectsV2Request listObjects =
+                ListObjectsV2Request.builder().bucket(bucketName).build();
+        return s3.listObjectsV2Paginator(listObjects).stream()
+                .map(ListObjectsV2Response::contents)
+                .flatMap(List::stream)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Downloads each of the given objects from the bucket as bytes and converts it with {@code
+     * deserializer}.
+     *
+     * @param s3Client client used for the downloads
+     * @param objects objects whose keys are fetched
+     * @param bucketName bucket the objects live in
+     * @param deserializer converts the downloaded bytes into the result type
+     * @return the deserialized objects, in the order of {@code objects}
+     */
+    public static <T> List<T> readObjectsFromS3Bucket(
+            S3Client s3Client,
+            List<S3Object> objects,
+            String bucketName,
+            Function<ResponseBytes<GetObjectResponse>, T> deserializer) {
+        return objects.stream()
+                .map(
+                        object ->
+                                GetObjectRequest.builder()
+                                        .bucket(bucketName)
+                                        .key(object.key())
+                                        .build())
+                .map(s3Client::getObjectAsBytes)
+                .map(deserializer)
+                .collect(Collectors.toList());
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java
new file mode 100644
index 0000000..e11ec27
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/LocalstackContainer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.testutils;
+
+import org.rnorth.ducttape.ratelimits.RateLimiter;
+import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
+import org.rnorth.ducttape.unreliables.Unreliables;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.AbstractWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import java.util.List;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A class wrapping the Localstack container that provides mock implementations of many common AWS
+ * services.
+ */
+public class LocalstackContainer extends GenericContainer<LocalstackContainer> {
+
+    private static final int CONTAINER_PORT = 4566;
+
+    /**
+     * Creates the container from the given image, exposes {@link #CONTAINER_PORT} and installs a
+     * wait strategy that polls S3 until a bucket can be created and listed.
+     */
+    public LocalstackContainer(DockerImageName imageName) {
+        super(imageName);
+        withExposedPorts(CONTAINER_PORT);
+        waitingFor(new ListBucketObjectsWaitStrategy());
+    }
+
+    /** Returns the {@code https} endpoint built from the container host and the mapped port. */
+    public String getEndpoint() {
+        return String.format("https://%s:%s", getHost(), getMappedPort(CONTAINER_PORT));
+    }
+
+    /**
+     * Wait strategy that considers the container ready once a bucket can be created and its
+     * objects listed through the container endpoint.
+     */
+    private class ListBucketObjectsWaitStrategy extends AbstractWaitStrategy {
+        private static final int TRANSACTIONS_PER_SECOND = 1;
+
+        // Throttles the readiness probe to TRANSACTIONS_PER_SECOND attempts per second.
+        private final RateLimiter rateLimiter =
+                RateLimiterBuilder.newBuilder()
+                        .withRate(TRANSACTIONS_PER_SECOND, SECONDS)
+                        .withConstantThroughput()
+                        .build();
+
+        @Override
+        protected void waitUntilReady() {
+            try {
+                // Fixed initial delay before the first readiness probe is attempted.
+                Thread.sleep(30_000);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers further up can still observe the
+                // interruption, and keep the original exception as the cause.
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException(
+                        "Localstack Container startup was interrupted", e);
+            }
+            Unreliables.retryUntilSuccess(
+                    (int) startupTimeout.getSeconds(),
+                    SECONDS,
+                    () -> rateLimiter.getWhenReady(this::list));
+        }
+
+        /** Readiness probe: creates a throwaway bucket and lists its objects. */
+        private List<S3Object> list() {
+            final String bucketName = "bucket-name-not-to-be-used";
+            try (final SdkHttpClient httpClient = AWSServicesTestUtils.createHttpClient();
+                    final S3Client client =
+                            AWSServicesTestUtils.createS3Client(getEndpoint(), httpClient)) {
+                AWSServicesTestUtils.createBucket(client, bucketName);
+                return AWSServicesTestUtils.listBucketObjects(client, bucketName);
+            }
+        }
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java
new file mode 100644
index 0000000..cfc12bf
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSAsyncSinkUtilTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
+import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.core.client.config.ClientAsyncConfiguration;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
+import software.amazon.awssdk.core.client.config.SdkClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.util.AWSAsyncSinkUtil.formatFlinkUserAgentPrefix;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests for {@link AWSAsyncSinkUtil}. */
+class AWSAsyncSinkUtilTest {
+
+    private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT =
+            "Apache Flink %s (%s) *Destination* Connector";
+    private static final String DEFAULT_USER_AGENT_PREFIX_FORMAT_V2 =
+            "Apache Flink %s (%s) *Destination* Connector V2";
+
+    /** The builder receives override config, HTTP client, region and default credentials. */
+    @Test
+    void testCreateKinesisAsyncClient() {
+        Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
+        MockAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+        ClientOverrideConfiguration clientOverrideConfiguration =
+                ClientOverrideConfiguration.builder().build();
+
+        // A real Netty client is created here; close it so the test does not leak it.
+        try (SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build()) {
+            AWSAsyncSinkUtil.createAwsAsyncClient(
+                    properties, builder, httpClient, clientOverrideConfiguration);
+
+            verify(builder).overrideConfiguration(clientOverrideConfiguration);
+            verify(builder).httpClient(httpClient);
+            verify(builder).region(Region.of("eu-west-2"));
+            verify(builder)
+                    .credentialsProvider(argThat(cp -> cp instanceof DefaultCredentialsProvider));
+            verify(builder, never()).endpointOverride(any());
+        }
+    }
+
+    /** A configured endpoint is forwarded to the builder as an endpoint override. */
+    @Test
+    void testCreateKinesisAsyncClientWithEndpointOverride() {
+        Properties properties = TestUtil.properties(AWS_REGION, "eu-west-2");
+        properties.setProperty(AWS_ENDPOINT, "https://localhost");
+
+        MockAsyncClientBuilder builder = mockKinesisAsyncClientBuilder();
+        ClientOverrideConfiguration clientOverrideConfiguration =
+                ClientOverrideConfiguration.builder().build();
+
+        // A real Netty client is created here; close it so the test does not leak it.
+        try (SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder().build()) {
+            AWSAsyncSinkUtil.createAwsAsyncClient(
+                    properties, builder, httpClient, clientOverrideConfiguration);
+
+            verify(builder).endpointOverride(URI.create("https://localhost"));
+        }
+    }
+
+    /** With an empty client configuration only the user agent options are applied. */
+    @Test
+    void testClientOverrideConfigurationWithDefaults() {
+        SdkClientConfiguration clientConfiguration = SdkClientConfiguration.builder().build();
+
+        ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration, builder, v2UserAgentPrefix());
+
+        verify(builder).build();
+        verify(builder)
+                .putAdvancedOption(
+                        SdkAdvancedClientOption.USER_AGENT_PREFIX,
+                        formatFlinkUserAgentPrefix(DEFAULT_USER_AGENT_PREFIX_FORMAT_V2));
+        verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, null);
+        verify(builder, never()).apiCallAttemptTimeout(any());
+        verify(builder, never()).apiCallTimeout(any());
+    }
+
+    /** A user agent suffix present in the client configuration is copied to the builder. */
+    @Test
+    void testClientOverrideConfigurationUserAgentSuffix() {
+        SdkClientConfiguration clientConfiguration =
+                SdkClientConfiguration.builder()
+                        .option(SdkAdvancedClientOption.USER_AGENT_SUFFIX, "suffix")
+                        .build();
+
+        ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration, builder, v2UserAgentPrefix());
+
+        verify(builder).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, "suffix");
+    }
+
+    /** An API call attempt timeout in the client configuration is copied to the builder. */
+    @Test
+    void testClientOverrideConfigurationApiCallAttemptTimeout() {
+        SdkClientConfiguration clientConfiguration =
+                SdkClientConfiguration.builder()
+                        .option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT, Duration.ofMillis(500))
+                        .build();
+
+        ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration, builder, v2UserAgentPrefix());
+
+        verify(builder).apiCallAttemptTimeout(Duration.ofMillis(500));
+    }
+
+    /** An API call timeout in the client configuration is copied to the builder. */
+    @Test
+    void testClientOverrideConfigurationApiCallTimeout() {
+        SdkClientConfiguration clientConfiguration =
+                SdkClientConfiguration.builder()
+                        .option(SdkClientOption.API_CALL_TIMEOUT, Duration.ofMillis(600))
+                        .build();
+
+        ClientOverrideConfiguration.Builder builder = mockClientOverrideConfigurationBuilder();
+
+        AWSAsyncSinkUtil.createClientOverrideConfiguration(
+                clientConfiguration, builder, v2UserAgentPrefix());
+
+        verify(builder).apiCallTimeout(Duration.ofMillis(600));
+    }
+
+    /**
+     * Builds the user agent prefix handed to the code under test: the base format plus the V2
+     * suffix, appended exactly once. The base format is used on purpose; appending the suffix to
+     * the format that already ends in "V2" would duplicate it.
+     */
+    private static String v2UserAgentPrefix() {
+        return formatFlinkUserAgentPrefix(
+                DEFAULT_USER_AGENT_PREFIX_FORMAT + AWSAsyncSinkUtil.V2_USER_AGENT_SUFFIX);
+    }
+
+    /** Creates a mocked client builder whose fluent setters stubbed here return the mock. */
+    private MockAsyncClientBuilder mockKinesisAsyncClientBuilder() {
+        MockAsyncClientBuilder builder = mock(MockAsyncClientBuilder.class);
+        when(builder.overrideConfiguration(any(ClientOverrideConfiguration.class)))
+                .thenReturn(builder);
+        when(builder.httpClient(any())).thenReturn(builder);
+        when(builder.credentialsProvider(any())).thenReturn(builder);
+        when(builder.region(any())).thenReturn(builder);
+
+        return builder;
+    }
+
+    /** Creates a mocked override configuration builder whose stubbed setters return the mock. */
+    private ClientOverrideConfiguration.Builder mockClientOverrideConfigurationBuilder() {
+        ClientOverrideConfiguration.Builder builder =
+                mock(ClientOverrideConfiguration.Builder.class);
+        when(builder.putAdvancedOption(any(), any())).thenReturn(builder);
+        when(builder.apiCallAttemptTimeout(any())).thenReturn(builder);
+        when(builder.apiCallTimeout(any())).thenReturn(builder);
+
+        return builder;
+    }
+
+    /**
+     * Concrete builder type used as the Mockito mock target. Every method returns {@code null};
+     * the tests stub the calls they rely on.
+     */
+    private static class MockAsyncClientBuilder
+            implements AwsAsyncClientBuilder<MockAsyncClientBuilder, SdkClient>,
+                    AwsClientBuilder<MockAsyncClientBuilder, SdkClient> {
+
+        @Override
+        public MockAsyncClientBuilder asyncConfiguration(
+                ClientAsyncConfiguration clientAsyncConfiguration) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder httpClient(SdkAsyncHttpClient sdkAsyncHttpClient) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder httpClientBuilder(SdkAsyncHttpClient.Builder builder) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder credentialsProvider(
+                AwsCredentialsProvider awsCredentialsProvider) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder region(Region region) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder dualstackEnabled(Boolean aBoolean) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder fipsEnabled(Boolean aBoolean) {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder overrideConfiguration(
+                ClientOverrideConfiguration clientOverrideConfiguration) {
+            return null;
+        }
+
+        @Override
+        public ClientOverrideConfiguration overrideConfiguration() {
+            return null;
+        }
+
+        @Override
+        public MockAsyncClientBuilder endpointOverride(URI uri) {
+            return null;
+        }
+
+        @Override
+        public SdkClient build() {
+            return null;
+        }
+    }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSGeneralUtilTest.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSGeneralUtilTest.java
new file mode 100644
index 0000000..ca5613a
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/AWSGeneralUtilTest.java
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.utils.AttributeMap;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.ASSUME_ROLE;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.AUTO;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.BASIC;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+import static software.amazon.awssdk.http.Protocol.HTTP2;
+
+/** Tests for {@link AWSGeneralUtil}. */
+class AWSGeneralUtilTest {
+
+    /** With no properties set, the resolved credential provider type is {@code AUTO}. */
+    @Test
+    void testGetCredentialsProviderTypeDefaultsAuto() {
+        Properties emptyConfig = new Properties();
+
+        CredentialProvider type =
+                AWSGeneralUtil.getCredentialProviderType(emptyConfig, AWS_CREDENTIALS_PROVIDER);
+
+        assertThat(type).isEqualTo(AUTO);
+    }
+
+    /** Setting only an access key id and a secret key resolves to the {@code BASIC} type. */
+    @Test
+    void testGetCredentialsProviderTypeBasic() {
+        String accessKeyIdKey = AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER);
+        String secretKeyKey = AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER);
+        Properties basicConfig = TestUtil.properties(accessKeyIdKey, "ak");
+        basicConfig.setProperty(secretKeyKey, "sk");
+
+        CredentialProvider type =
+                AWSGeneralUtil.getCredentialProviderType(basicConfig, AWS_CREDENTIALS_PROVIDER);
+
+        assertThat(type).isEqualTo(BASIC);
+    }
+
+    /** The value {@code WEB_IDENTITY_TOKEN} resolves to the matching provider type. */
+    @Test
+    void testGetCredentialsProviderTypeWebIdentityToken() {
+        Properties webIdentityConfig =
+                TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+        assertThat(
+                        AWSGeneralUtil.getCredentialProviderType(
+                                webIdentityConfig, AWS_CREDENTIALS_PROVIDER))
+                .isEqualTo(WEB_IDENTITY_TOKEN);
+    }
+
+    /** The value {@code ASSUME_ROLE} resolves to the matching provider type. */
+    @Test
+    void testGetCredentialsProviderTypeAssumeRole() {
+        Properties assumeRoleConfig = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
+
+        assertThat(
+                        AWSGeneralUtil.getCredentialProviderType(
+                                assumeRoleConfig, AWS_CREDENTIALS_PROVIDER))
+                .isEqualTo(ASSUME_ROLE);
+    }
+
+    /** {@code ENV_VAR} yields an {@link EnvironmentVariableCredentialsProvider}. */
+    @Test
+    void testGetCredentialsProviderEnvironmentVariables() {
+        Properties envVarConfig = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+        assertThat(AWSGeneralUtil.getCredentialsProvider(envVarConfig))
+                .isInstanceOf(EnvironmentVariableCredentialsProvider.class);
+    }
+
+    /** {@code SYS_PROP} yields a {@link SystemPropertyCredentialsProvider}. */
+    @Test
+    void testGetCredentialsProviderSystemProperties() {
+        Properties sysPropConfig = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+        assertThat(AWSGeneralUtil.getCredentialsProvider(sysPropConfig))
+                .isInstanceOf(SystemPropertyCredentialsProvider.class);
+    }
+
+    /** {@code WEB_IDENTITY_TOKEN} yields a {@link WebIdentityTokenFileCredentialsProvider}. */
+    @Test
+    void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+        Properties webIdentityConfig =
+                TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+        assertThat(AWSGeneralUtil.getCredentialsProvider(webIdentityConfig))
+                .isInstanceOf(WebIdentityTokenFileCredentialsProvider.class);
+    }
+
+    /**
+     * Role ARN and role session name are passed to the builder; without a configured token file
+     * the builder's {@code webIdentityTokenFile} is never called.
+     */
+    @Test
+    void testGetWebIdentityTokenFileCredentialsProvider() {
+        Properties webIdentityConfig =
+                TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+        webIdentityConfig.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+        webIdentityConfig.setProperty(
+                roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+        WebIdentityTokenFileCredentialsProvider.Builder providerBuilder =
+                mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+        AWSGeneralUtil.getWebIdentityTokenFileCredentialsProvider(
+                providerBuilder, webIdentityConfig, AWS_CREDENTIALS_PROVIDER);
+
+        verify(providerBuilder).roleArn("roleArn");
+        verify(providerBuilder).roleSessionName("roleSessionName");
+        verify(providerBuilder, never()).webIdentityTokenFile(any());
+    }
+
+    /** A configured web identity token file is passed to the builder as a {@code Path}. */
+    @Test
+    void testGetWebIdentityTokenFileCredentialsProviderWithWebIdentityFile() {
+        Properties webIdentityConfig =
+                TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+        webIdentityConfig.setProperty(
+                webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER), "webIdentityTokenFile");
+        WebIdentityTokenFileCredentialsProvider.Builder providerBuilder =
+                mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+        AWSGeneralUtil.getWebIdentityTokenFileCredentialsProvider(
+                providerBuilder, webIdentityConfig, AWS_CREDENTIALS_PROVIDER);
+
+        verify(providerBuilder).webIdentityTokenFile(Paths.get("webIdentityTokenFile"));
+    }
+
+    /** {@code AUTO} yields a {@link DefaultCredentialsProvider}. */
+    @Test
+    void testGetCredentialsProviderAuto() {
+        Properties autoConfig = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+        assertThat(AWSGeneralUtil.getCredentialsProvider(autoConfig))
+                .isInstanceOf(DefaultCredentialsProvider.class);
+    }
+
+    /** The map based overload resolves {@code AUTO} to a {@link DefaultCredentialsProvider}. */
+    @Test
+    void testGetCredentialsProviderFromMap() {
+        Map<String, Object> autoConfig = ImmutableMap.of(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+        assertThat(AWSGeneralUtil.getCredentialsProvider(autoConfig))
+                .isInstanceOf(DefaultCredentialsProvider.class);
+    }
+
+    /**
+     * {@code ASSUME_ROLE} yields an {@link StsAssumeRoleCredentialsProvider}; the spy confirms that
+     * role ARN, role session name, external id and region are each read from the properties.
+     */
+    @Test
+    void testGetCredentialsProviderAssumeRole() {
+        Properties spiedConfig = spy(TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"));
+        spiedConfig.setProperty(AWS_REGION, "eu-west-2");
+
+        assertThat(AWSGeneralUtil.getCredentialsProvider(spiedConfig))
+                .isInstanceOf(StsAssumeRoleCredentialsProvider.class);
+
+        verify(spiedConfig).getProperty(roleArn(AWS_CREDENTIALS_PROVIDER));
+        verify(spiedConfig).getProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER));
+        verify(spiedConfig).getProperty(AWSConfigConstants.externalId(AWS_CREDENTIALS_PROVIDER));
+        verify(spiedConfig).getProperty(AWS_REGION);
+    }
+
+    /** {@code BASIC} resolves credentials carrying the configured access key and secret key. */
+    @Test
+    void testGetCredentialsProviderBasic() {
+        Properties basicConfig = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "BASIC");
+        basicConfig.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+        basicConfig.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(basicConfig);
+        AwsCredentials resolved = provider.resolveCredentials();
+
+        assertThat(resolved.accessKeyId()).isEqualTo("ak");
+        assertThat(resolved.secretAccessKey()).isEqualTo("sk");
+    }
+
+    /**
+     * {@code PROFILE} with profile name {@code default} resolves the credentials expected for that
+     * profile from the test resource file.
+     */
+    @Test
+    void testGetCredentialsProviderProfile() {
+        Properties profileConfig = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+        profileConfig.setProperty(
+                AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "default");
+        profileConfig.setProperty(
+                AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER),
+                "src/test/resources/profile");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(profileConfig);
+        assertThat(provider).isInstanceOf(ProfileCredentialsProvider.class);
+
+        AwsCredentials resolved = provider.resolveCredentials();
+        assertThat(resolved.accessKeyId()).isEqualTo("11111111111111111111");
+        assertThat(resolved.secretAccessKey())
+                .isEqualTo("wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111");
+    }
+
+    /**
+     * {@code PROFILE} with profile name {@code foo} resolves the credentials expected for that
+     * profile from the test resource file.
+     */
+    @Test
+    void testGetCredentialsProviderNamedProfile() {
+        Properties profileConfig = TestUtil.properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+        profileConfig.setProperty(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "foo");
+        profileConfig.setProperty(
+                AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER),
+                "src/test/resources/profile");
+
+        AwsCredentialsProvider provider = AWSGeneralUtil.getCredentialsProvider(profileConfig);
+        assertThat(provider).isInstanceOf(ProfileCredentialsProvider.class);
+
+        AwsCredentials resolved = provider.resolveCredentials();
+        assertThat(resolved.accessKeyId()).isEqualTo("22222222222222222222");
+        assertThat(resolved.secretAccessKey())
+                .isEqualTo("wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222");
+    }
+
+    /** A client created from empty properties has TCP keep-alive enabled. */
+    @Test
+    void testCreateNettyAsyncHttpClientWithPropertyTcpKeepAlive() throws Exception {
+        // Close the created client when done so the test does not leak it.
+        try (SdkAsyncHttpClient httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(new Properties())) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertThat(nettyConfiguration.tcpKeepAlive()).isTrue();
+        }
+    }
+
+    /** The configured max concurrency is applied as the client's max connections. */
+    @Test
+    void testCreateNettyAsyncHttpClientWithPropertyMaxConcurrency() throws Exception {
+        int maxConnections = 45678;
+        Properties properties = new Properties();
+        properties.setProperty(
+                AWSConfigConstants.HTTP_CLIENT_MAX_CONCURRENCY, String.valueOf(maxConnections));
+
+        // Close the created client when done so the test does not leak it.
+        try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertThat(nettyConfiguration.maxConnections()).isEqualTo(maxConnections);
+        }
+    }
+
+    /** The configured read timeout (milliseconds) is applied to the client. */
+    @Test
+    void testCreateNettyAsyncHttpClientWithPropertyReadTimeout() throws Exception {
+        int readTimeoutMillis = 45678;
+        Properties properties = new Properties();
+        properties.setProperty(
+                AWSConfigConstants.HTTP_CLIENT_READ_TIMEOUT_MILLIS,
+                String.valueOf(readTimeoutMillis));
+
+        // Close the created client when done so the test does not leak it.
+        try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertThat(nettyConfiguration.readTimeoutMillis()).isEqualTo(readTimeoutMillis);
+        }
+    }
+
+    /** The configured trust-all-certificates flag is applied to the client. */
+    @Test
+    void testCreateNettyAsyncHttpClientWithPropertyTrustAllCertificates() throws Exception {
+        boolean trustAllCerts = true;
+        Properties properties = new Properties();
+        properties.setProperty(
+                AWSConfigConstants.TRUST_ALL_CERTIFICATES, String.valueOf(trustAllCerts));
+
+        // Close the created client when done so the test does not leak it.
+        try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties)) {
+            NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+            assertThat(nettyConfiguration.trustAllCertificates()).isEqualTo(trustAllCerts);
+        }
+    }
+
+ /** Verifies that {@code HTTP_PROTOCOL_VERSION} set to HTTP/1.1 is applied to the client. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithPropertyProtocol() throws Exception {
+     Protocol httpVersion = HTTP1_1;
+     Properties properties = new Properties();
+     properties.setProperty(
+             AWSConfigConstants.HTTP_PROTOCOL_VERSION, String.valueOf(httpVersion));
+
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(properties)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.attribute(SdkHttpConfigurationOption.PROTOCOL))
+                 .isEqualTo(httpVersion);
+     }
+ }
+
+ /** Verifies that an unconfigured builder yields a 60,000 ms connection acquire timeout. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsConnectionAcquireTimeout() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.connectionAcquireTimeoutMillis()).isEqualTo(60_000);
+     }
+ }
+
+ /** Verifies that the connection TTL matches that of a stock {@link NettyNioAsyncHttpClient}. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsConnectionTtl() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Both clients are closed after inspection so neither outlives the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+             SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create()) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+         NettyConfiguration nettyDefaultConfiguration =
+                 TestUtil.getNettyConfiguration(httpDefaultClient);
+
+         assertThat(nettyConfiguration.connectionTtlMillis())
+                 .isEqualTo(nettyDefaultConfiguration.connectionTtlMillis());
+     }
+ }
+
+ /** Verifies that the connect timeout matches that of a stock {@link NettyNioAsyncHttpClient}. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsConnectionTimeout() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Both clients are closed after inspection so neither outlives the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+             SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create()) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+         NettyConfiguration nettyDefaultConfiguration =
+                 TestUtil.getNettyConfiguration(httpDefaultClient);
+
+         // Client under test is the "actual" value, as in the sibling default-value tests.
+         assertThat(nettyConfiguration.connectTimeoutMillis())
+                 .isEqualTo(nettyDefaultConfiguration.connectTimeoutMillis());
+     }
+ }
+
+ /** Verifies that the idle timeout matches that of a stock {@link NettyNioAsyncHttpClient}. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsIdleTimeout() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Both clients are closed after inspection so neither outlives the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+             SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create()) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+         NettyConfiguration nettyDefaultConfiguration =
+                 TestUtil.getNettyConfiguration(httpDefaultClient);
+
+         assertThat(nettyConfiguration.idleTimeoutMillis())
+                 .isEqualTo(nettyDefaultConfiguration.idleTimeoutMillis());
+     }
+ }
+
+ /** Verifies that an unconfigured builder yields a max connections value of 10,000. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsMaxConnections() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.maxConnections()).isEqualTo(10_000);
+     }
+ }
+
+ /** Verifies that max pending connection acquires matches a stock {@link NettyNioAsyncHttpClient}. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsMaxPendingConnectionAcquires() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Both clients are closed after inspection so neither outlives the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+             SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create()) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+         NettyConfiguration nettyDefaultConfiguration =
+                 TestUtil.getNettyConfiguration(httpDefaultClient);
+
+         assertThat(nettyConfiguration.maxPendingConnectionAcquires())
+                 .isEqualTo(nettyDefaultConfiguration.maxPendingConnectionAcquires());
+     }
+ }
+
+ /** Verifies that an unconfigured builder yields a 360,000 ms read timeout. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsReadTimeout() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.readTimeoutMillis()).isEqualTo(360_000);
+     }
+ }
+
+ /** Verifies that idle-connection reaping matches a stock {@link NettyNioAsyncHttpClient}. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsReapIdleConnections() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Both clients are closed after inspection so neither outlives the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+             SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create()) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+         NettyConfiguration nettyDefaultConfiguration =
+                 TestUtil.getNettyConfiguration(httpDefaultClient);
+
+         assertThat(nettyConfiguration.reapIdleConnections())
+                 .isEqualTo(nettyDefaultConfiguration.reapIdleConnections());
+     }
+ }
+
+ /** Verifies that TCP keep-alive matches that of a stock {@link NettyNioAsyncHttpClient}. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsTcpKeepAlive() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Both clients are closed after inspection so neither outlives the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+             SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create()) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+         NettyConfiguration nettyDefaultConfiguration =
+                 TestUtil.getNettyConfiguration(httpDefaultClient);
+
+         assertThat(nettyConfiguration.tcpKeepAlive())
+                 .isEqualTo(nettyDefaultConfiguration.tcpKeepAlive());
+     }
+ }
+
+ /** Verifies that the TLS key managers provider matches a stock {@link NettyNioAsyncHttpClient}. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsTlsKeyManagersProvider() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Both clients are closed after inspection so neither outlives the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+             SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create()) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+         NettyConfiguration nettyDefaultConfiguration =
+                 TestUtil.getNettyConfiguration(httpDefaultClient);
+
+         assertThat(nettyConfiguration.tlsKeyManagersProvider())
+                 .isEqualTo(nettyDefaultConfiguration.tlsKeyManagersProvider());
+     }
+ }
+
+ /** Verifies that the TLS trust managers provider matches a stock {@link NettyNioAsyncHttpClient}. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsTlsTrustManagersProvider() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Both clients are closed after inspection so neither outlives the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+             SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create()) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+         NettyConfiguration nettyDefaultConfiguration =
+                 TestUtil.getNettyConfiguration(httpDefaultClient);
+
+         assertThat(nettyConfiguration.tlsTrustManagersProvider())
+                 .isEqualTo(nettyDefaultConfiguration.tlsTrustManagersProvider());
+     }
+ }
+
+ /** Verifies that an unconfigured builder yields a client that does not trust all certificates. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsTrustAllCertificates() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.trustAllCertificates()).isFalse();
+     }
+ }
+
+ /** Verifies that the write timeout matches that of a stock {@link NettyNioAsyncHttpClient}. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsWriteTimeout() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Both clients are closed after inspection so neither outlives the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder);
+             SdkAsyncHttpClient httpDefaultClient = NettyNioAsyncHttpClient.create()) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+         NettyConfiguration nettyDefaultConfiguration =
+                 TestUtil.getNettyConfiguration(httpDefaultClient);
+
+         assertThat(nettyConfiguration.writeTimeoutMillis())
+                 .isEqualTo(nettyDefaultConfiguration.writeTimeoutMillis());
+     }
+ }
+
+ /** Verifies that an unconfigured builder yields a client using the HTTP/2 protocol. */
+ @Test
+ void testCreateNettyAsyncHttpClientWithDefaultsProtocol() throws Exception {
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient = AWSGeneralUtil.createAsyncHttpClient(builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.attribute(SdkHttpConfigurationOption.PROTOCOL))
+                 .isEqualTo(HTTP2);
+     }
+ }
+
+ /** Verifies that a {@code READ_TIMEOUT} passed via {@link AttributeMap} reaches the client. */
+ @Test
+ void testCreateNettyAsyncHttpClientReadTimeout() throws Exception {
+     Duration readTimeout = Duration.ofMillis(1234);
+
+     AttributeMap clientConfiguration =
+             AttributeMap.builder()
+                     .put(SdkHttpConfigurationOption.READ_TIMEOUT, readTimeout)
+                     .build();
+
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient =
+             AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.readTimeoutMillis()).isEqualTo(readTimeout.toMillis());
+     }
+ }
+
+ /** Verifies that {@code TCP_KEEPALIVE} passed via {@link AttributeMap} reaches the client. */
+ @Test
+ void testCreateNettyAsyncHttpClientTcpKeepAlive() throws Exception {
+     boolean tcpKeepAlive = true;
+
+     AttributeMap clientConfiguration =
+             AttributeMap.builder()
+                     .put(SdkHttpConfigurationOption.TCP_KEEPALIVE, tcpKeepAlive)
+                     .build();
+
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient =
+             AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.tcpKeepAlive()).isEqualTo(tcpKeepAlive);
+     }
+ }
+
+ /** Verifies that {@code CONNECTION_TIMEOUT} passed via {@link AttributeMap} reaches the client. */
+ @Test
+ void testCreateNettyAsyncHttpClientConnectionTimeout() throws Exception {
+     Duration connectionTimeout = Duration.ofMillis(1000);
+
+     AttributeMap clientConfiguration =
+             AttributeMap.builder()
+                     .put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, connectionTimeout)
+                     .build();
+
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient =
+             AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.connectTimeoutMillis())
+                 .isEqualTo(connectionTimeout.toMillis());
+     }
+ }
+
+ /** Verifies that {@code MAX_CONNECTIONS} passed via {@link AttributeMap} reaches the client. */
+ @Test
+ void testCreateNettyAsyncHttpClientMaxConcurrency() throws Exception {
+     int maxConnections = 123;
+
+     AttributeMap clientConfiguration =
+             AttributeMap.builder()
+                     .put(SdkHttpConfigurationOption.MAX_CONNECTIONS, maxConnections)
+                     .build();
+
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient =
+             AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.maxConnections()).isEqualTo(maxConnections);
+     }
+ }
+
+ /** Verifies that {@code WRITE_TIMEOUT} passed via {@link AttributeMap} reaches the client. */
+ @Test
+ void testCreateNettyAsyncHttpClientWriteTimeout() throws Exception {
+     Duration writeTimeout = Duration.ofMillis(3000);
+
+     AttributeMap clientConfiguration =
+             AttributeMap.builder()
+                     .put(SdkHttpConfigurationOption.WRITE_TIMEOUT, writeTimeout)
+                     .build();
+
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient =
+             AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.writeTimeoutMillis()).isEqualTo(writeTimeout.toMillis());
+     }
+ }
+
+ /** Verifies that {@code CONNECTION_MAX_IDLE_TIMEOUT} from the {@link AttributeMap} is applied. */
+ @Test
+ void testCreateNettyAsyncHttpClientConnectionMaxIdleTime() throws Exception {
+     Duration maxIdleTime = Duration.ofMillis(2000);
+
+     AttributeMap clientConfiguration =
+             AttributeMap.builder()
+                     .put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, maxIdleTime)
+                     .build();
+
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient =
+             AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.idleTimeoutMillis()).isEqualTo(maxIdleTime.toMillis());
+     }
+ }
+
+ /** Verifies that {@code REAP_IDLE_CONNECTIONS} set to false via {@link AttributeMap} is applied. */
+ @Test
+ void testCreateNettyAsyncHttpClientIdleConnectionReaper() throws Exception {
+     boolean reapIdleConnections = false;
+
+     AttributeMap clientConfiguration =
+             AttributeMap.builder()
+                     .put(SdkHttpConfigurationOption.REAP_IDLE_CONNECTIONS, reapIdleConnections)
+                     .build();
+
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient =
+             AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.reapIdleConnections()).isEqualTo(reapIdleConnections);
+     }
+ }
+
+ /** Verifies that {@code CONNECTION_TIME_TO_LIVE} from the {@link AttributeMap} is applied. */
+ @Test
+ void testCreateNettyAsyncHttpClientIdleConnectionTtl() throws Exception {
+     Duration connectionTtl = Duration.ofMillis(5000);
+
+     AttributeMap clientConfiguration =
+             AttributeMap.builder()
+                     .put(SdkHttpConfigurationOption.CONNECTION_TIME_TO_LIVE, connectionTtl)
+                     .build();
+
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient =
+             AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.connectionTtlMillis())
+                 .isEqualTo(connectionTtl.toMillis());
+     }
+ }
+
+ /** Verifies that {@code TRUST_ALL_CERTIFICATES} set to true via {@link AttributeMap} is applied. */
+ @Test
+ void testCreateNettyAsyncHttpClientTrustAllCertificates() throws Exception {
+     boolean trustAllCertificates = true;
+
+     AttributeMap clientConfiguration =
+             AttributeMap.builder()
+                     .put(
+                             SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
+                             trustAllCertificates)
+                     .build();
+
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient =
+             AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.trustAllCertificates()).isEqualTo(trustAllCertificates);
+     }
+ }
+
+ /** Verifies that {@code PROTOCOL} set to HTTP/1.1 via {@link AttributeMap} is applied. */
+ @Test
+ void testCreateNettyAsyncHttpClientHttpVersion() throws Exception {
+     Protocol httpVersion = HTTP1_1;
+
+     AttributeMap clientConfiguration =
+             AttributeMap.builder()
+                     .put(SdkHttpConfigurationOption.PROTOCOL, httpVersion)
+                     .build();
+
+     NettyNioAsyncHttpClient.Builder builder = NettyNioAsyncHttpClient.builder();
+     // Close the client after inspection so it does not outlive the test.
+     try (SdkAsyncHttpClient httpClient =
+             AWSGeneralUtil.createAsyncHttpClient(clientConfiguration, builder)) {
+         NettyConfiguration nettyConfiguration = TestUtil.getNettyConfiguration(httpClient);
+
+         assertThat(nettyConfiguration.attribute(SdkHttpConfigurationOption.PROTOCOL))
+                 .isEqualTo(httpVersion);
+     }
+ }
+
+ /** Checks that the region id stored under {@code AWS_REGION} is resolved to the matching Region. */
+ @Test
+ void testGetRegion() {
+     Properties regionConfig = TestUtil.properties(AWS_REGION, "eu-west-2");
+
+     assertThat(AWSGeneralUtil.getRegion(regionConfig)).isEqualTo(Region.EU_WEST_2);
+ }
+
+ /** Checks that a set of standard, gov, iso and global region ids are all accepted as valid. */
+ @Test
+ void testValidRegion() {
+     String[] acceptedRegionIds = {
+         "us-east-1",
+         "us-gov-west-1",
+         "us-isob-east-1",
+         "aws-global",
+         "aws-iso-global",
+         "aws-iso-b-global"
+     };
+
+     // Same ids, checked in the same order; the first rejected id fails the test.
+     for (String regionId : acceptedRegionIds) {
+         assertThat(AWSGeneralUtil.isValidRegion(Region.of(regionId))).isTrue();
+     }
+ }
+
+ /** Checks that an arbitrary string is rejected as a region. */
+ @Test
+ void testInvalidRegion() {
+     Region malformedRegion = Region.of("unstructured-string");
+
+     assertThat(AWSGeneralUtil.isValidRegion(malformedRegion)).isFalse();
+ }
+
+ /** Checks that configuration validation rejects an unknown region id. */
+ @Test
+ void testUnrecognizableAwsRegionInConfig() {
+     Properties badRegionConfig = new Properties();
+     badRegionConfig.setProperty(AWSConfigConstants.AWS_REGION, "wrongRegionId");
+     badRegionConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+     badRegionConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+     assertThatThrownBy(() -> AWSGeneralUtil.validateAwsConfiguration(badRegionConfig))
+             .isInstanceOf(IllegalArgumentException.class)
+             .hasMessageContaining("Invalid AWS region");
+ }
+
+ /** Checks that the BASIC provider without access key and secret key fails validation. */
+ @Test
+ void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
+     Properties basicWithoutKeys = new Properties();
+     basicWithoutKeys.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+     basicWithoutKeys.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
+
+     String expectedMessage =
+             "Please set values for AWS Access Key ID ('"
+                     + AWSConfigConstants.AWS_ACCESS_KEY_ID
+                     + "') "
+                     + "and Secret Key ('"
+                     + AWSConfigConstants.AWS_SECRET_ACCESS_KEY
+                     + "') when using the BASIC AWS credential provider type.";
+
+     assertThatThrownBy(() -> AWSGeneralUtil.validateAwsConfiguration(basicWithoutKeys))
+             .isInstanceOf(IllegalArgumentException.class)
+             .hasMessage(expectedMessage);
+ }
+
+ /** Checks that configuration validation rejects an unknown credential provider type. */
+ @Test
+ void testUnrecognizableCredentialProviderTypeInConfig() {
+     Properties unknownProviderConfig = TestUtil.getStandardProperties();
+     unknownProviderConfig.setProperty(
+             AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "wrongProviderType");
+
+     assertThatThrownBy(() -> AWSGeneralUtil.validateAwsConfiguration(unknownProviderConfig))
+             .isInstanceOf(IllegalArgumentException.class)
+             .hasMessageContaining("Invalid AWS Credential Provider Type");
+ }
+
+ /** Checks that WEB_IDENTITY_TOKEN credentials fail validation with the expected message. */
+ @Test
+ void testMissingWebIdentityTokenFileInCredentials() {
+     Properties webIdentityConfig = TestUtil.getStandardProperties();
+     webIdentityConfig.setProperty(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+     String expectedMessage =
+             "Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set.";
+
+     assertThatThrownBy(() -> AWSGeneralUtil.validateAwsCredentials(webIdentityConfig))
+             .isInstanceOf(IllegalStateException.class)
+             .hasMessage(expectedMessage);
+ }
+
+ @Test
+ void testMissingEnvironmentVariableCredentials() {
+ Properties properties = TestUtil.getStandardProperties();
+ properties.setProperty(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+ assertThatThrownBy(() -> AWSGeneralUtil.validateAwsCredentials(properties))
+ .isInstanceOf(SdkClientException.class)
+ .hasMessageContaining(
+ "Access key must be specified either via environment variable");
+ }
+
+ @Test
+ void testFailedSystemPropertiesCredentialsValidationsOnMissingAccessKey() {
+ Properties properties = TestUtil.getStandardProperties();
+ properties.setProperty(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+ assertThatThrownBy(() -> AWSGeneralUtil.validateAwsCredentials(properties))
+ .isInstanceOf(SdkClientException.class)
+ .hasMessageContaining(
+ "Access key must be specified either via environment variable (AWS_ACCESS_KEY_ID) or system property (aws.accessKeyId)");
+ }
+
+ @Test
+ void testFailedSystemPropertiesCredentialsValidationsOnMissingSecretKey() {
+ System.setProperty("aws.accessKeyId", "accesKeyId");
+ Properties properties = TestUtil.getStandardProperties();
+ properties.setProperty(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+ assertThatThrownBy(() -> AWSGeneralUtil.validateAwsCredentials(properties))
+ .isInstanceOf(SdkClientException.class)
+ .hasMessageContaining(
+ "Secret key must be specified either via environment variable (AWS_SECRET_ACCESS_KEY) or system property (aws.secretAccessKey)");
+ }
+
+ private WebIdentityTokenFileCredentialsProvider.Builder
+ mockWebIdentityTokenFileCredentialsProviderBuilder() {
+ WebIdentityTokenFileCredentialsProvider.Builder builder =
+ mock(WebIdentityTokenFileCredentialsProvider.Builder.class);
+ when(builder.roleArn(any())).thenReturn(builder);
+ when(builder.roleSessionName(any())).thenReturn(builder);
+ when(builder.webIdentityTokenFile(any())).thenReturn(builder);
+
+ return builder;
+ }
+}
diff --git a/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/TestUtil.java b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/TestUtil.java
new file mode 100644
index 0000000..cc52d17
--- /dev/null
+++ b/flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/util/TestUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.aws.util;
+
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+
+/** Utilities for tests in the package. */
+public class TestUtil {
+ public static Properties properties(final String key, final String value) {
+ Properties properties = new Properties();
+ properties.setProperty(key, value);
+ return properties;
+ }
+
+ public static Properties getStandardProperties() {
+ Properties config = properties(AWSConfigConstants.AWS_REGION, "us-east-1");
+ config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKeyId");
+ config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+ return config;
+ }
+
+ public static NettyConfiguration getNettyConfiguration(final SdkAsyncHttpClient httpClient)
+ throws Exception {
+ return getField("configuration", httpClient);
+ }
+
+ public static <T> T getField(String fieldName, Object obj) throws Exception {
+ Field field = obj.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return (T) field.get(obj);
+ }
+}
diff --git a/flink-connector-aws-base/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension b/flink-connector-aws-base/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
new file mode 100644
index 0000000..2899913
--- /dev/null
+++ b/flink-connector-aws-base/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension
\ No newline at end of file
diff --git a/flink-connector-aws-base/src/test/resources/archunit.properties b/flink-connector-aws-base/src/test/resources/archunit.properties
new file mode 100644
index 0000000..15be88c
--- /dev/null
+++ b/flink-connector-aws-base/src/test/resources/archunit.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# By default we allow removing existing violations, but fail when new violations are added.
+freeze.store.default.allowStoreUpdate=true
+
+# Enable this if a new (frozen) rule has been added in order to create the initial store and record the existing violations.
+#freeze.store.default.allowStoreCreation=true
+
+# Enable this to allow new violations to be recorded.
+# NOTE: Adding new violations should be avoided when possible. If the rule was correct to flag a new
+# violation, please try to avoid creating the violation. If the violation was created due to a
+# shortcoming of the rule, file a JIRA issue so the rule can be improved.
+#freeze.refreeze=true
+
+freeze.store.default.path=archunit-violations
diff --git a/flink-connector-aws-base/src/test/resources/log4j2-test.properties b/flink-connector-aws-base/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..c4fa187
--- /dev/null
+++ b/flink-connector-aws-base/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-connector-aws-base/src/test/resources/profile b/flink-connector-aws-base/src/test/resources/profile
new file mode 100644
index 0000000..2573fd6
--- /dev/null
+++ b/flink-connector-aws-base/src/test/resources/profile
@@ -0,0 +1,7 @@
+[default]
+aws_access_key_id=11111111111111111111
+aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111
+
+[foo]
+aws_access_key_id=22222222222222222222
+aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCY2222222222
diff --git a/pom.xml b/pom.xml
index fbf9545..0dc0342 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,17 +62,13 @@ under the License.
<assertj.version>3.21.0</assertj.version>
<archunit.version>0.22.0</archunit.version>
<testcontainers.version>1.17.2</testcontainers.version>
- <mockito.version>2.21.0</mockito.version>
-
- <!-- For dependency convergence -->
- <kryo.version>2.24.0</kryo.version>
- <slf4j.version>1.7.36</slf4j.version>
- <snappy.version>1.1.8.3</snappy.version>
+ <mockito.version>3.4.6</mockito.version>
<flink.parent.artifactId>flink-connector-aws-parent</flink.parent.artifactId>
</properties>
<modules>
+ <module>flink-connector-aws-base</module>
<module>flink-connector-dynamodb</module>
<module>flink-sql-connector-dynamodb</module>
</modules>
@@ -102,6 +98,20 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <type>jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
@@ -141,6 +151,13 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-architecture-tests-test</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
@@ -168,23 +185,28 @@ under the License.
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
+ <version>1.7.36</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
- <version>${kryo.version}</version>
+ <version>2.24.0</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
- <version>${snappy.version}</version>
+ <version>1.1.8.3</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>1.3.9</version>
</dependency>
+ <dependency>
+ <groupId>org.objenesis</groupId>
+ <artifactId>objenesis</artifactId>
+ <version>2.1</version>
+ </dependency>
</dependencies>
</dependencyManagement>