You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/23 17:57:27 UTC
[flink-connector-aws] branch main updated: [FLINK-29900][Connectors/DynamoDB] Make configurations for DynamoDB Table API sink follow the standard AWS and HTTP client convention
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new a1d9391 [FLINK-29900][Connectors/DynamoDB] Make configurations for DynamoDB Table API sink follow the standard AWS and HTTP client convention
a1d9391 is described below
commit a1d939113fd997c9e96745d310273518d6d8cb7f
Author: Hong Liang Teoh <li...@amazon.com>
AuthorDate: Tue Nov 22 22:41:37 2022 +0000
[FLINK-29900][Connectors/DynamoDB] Make configurations for DynamoDB Table API sink follow the standard AWS and HTTP client convention
---
.../dynamodb/table/DynamoDbConfiguration.java | 29 +++++--
.../dynamodb/table/DynamoDbDynamicSinkFactory.java | 15 +---
.../table/DynamoDbDynamicSinkFactoryTest.java | 88 ++++++++++++++++++----
3 files changed, 103 insertions(+), 29 deletions(-)
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
index ad4ca3c..b3fe27d 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
@@ -20,6 +20,11 @@ package org.apache.flink.connector.dynamodb.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.aws.table.util.AsyncClientOptionsUtils;
+import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator;
+
+import java.util.Map;
+import java.util.Properties;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;
@@ -28,17 +33,31 @@ import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions
@Internal
public class DynamoDbConfiguration {
- private final ReadableConfig config;
+ private final Map<String, String> rawTableOptions;
+ private final ReadableConfig tableOptions;
+ private final AsyncSinkConfigurationValidator asyncSinkConfigurationValidator;
+ private final AsyncClientOptionsUtils asyncClientOptionsUtils;
- public DynamoDbConfiguration(ReadableConfig config) {
- this.config = config;
+ public DynamoDbConfiguration(Map<String, String> rawTableOptions, ReadableConfig tableOptions) {
+ this.rawTableOptions = rawTableOptions;
+ this.tableOptions = tableOptions;
+ this.asyncSinkConfigurationValidator = new AsyncSinkConfigurationValidator(tableOptions);
+ this.asyncClientOptionsUtils = new AsyncClientOptionsUtils(rawTableOptions);
}
public String getTableName() {
- return config.get(TABLE_NAME);
+ return tableOptions.get(TABLE_NAME);
}
public boolean getFailOnError() {
- return config.get(FAIL_ON_ERROR);
+ return tableOptions.get(FAIL_ON_ERROR);
+ }
+
+ public Properties getAsyncSinkProperties() {
+ return asyncSinkConfigurationValidator.getValidatedConfigurations();
+ }
+
+ public Properties getSinkClientProperties() {
+ return asyncClientOptionsUtils.getValidatedConfigurations();
}
}
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
index 8072a39..055caf2 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.dynamodb.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
-import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.FactoryUtil;
@@ -29,7 +28,6 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
import java.util.HashSet;
-import java.util.Properties;
import java.util.Set;
import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.AWS_REGION;
@@ -50,10 +48,7 @@ public class DynamoDbDynamicSinkFactory extends AsyncDynamicTableSinkFactory {
FactoryUtil.validateFactoryOptions(this, factoryHelper.getOptions());
DynamoDbConfiguration dynamoDbConfiguration =
- new DynamoDbConfiguration(factoryHelper.getOptions());
-
- Properties tableProperties = new Properties();
- tableProperties.putAll(catalogTable.getOptions());
+ new DynamoDbConfiguration(catalogTable.getOptions(), factoryHelper.getOptions());
DynamoDbDynamicSink.DynamoDbDynamicTableSinkBuilder builder =
DynamoDbDynamicSink.builder()
@@ -62,12 +57,10 @@ public class DynamoDbDynamicSinkFactory extends AsyncDynamicTableSinkFactory {
.setPhysicalDataType(
catalogTable.getResolvedSchema().toPhysicalRowDataType())
.setOverwriteByPartitionKeys(new HashSet<>(catalogTable.getPartitionKeys()))
- .setDynamoDbClientProperties(tableProperties);
+ .setDynamoDbClientProperties(
+ dynamoDbConfiguration.getSinkClientProperties());
- AsyncSinkConfigurationValidator asyncSinkConfigurationValidator =
- new AsyncSinkConfigurationValidator(factoryHelper.getOptions());
- addAsyncOptionsToBuilder(
- asyncSinkConfigurationValidator.getValidatedConfigurations(), builder);
+ addAsyncOptionsToBuilder(dynamoDbConfiguration.getAsyncSinkProperties(), builder);
return builder.build();
}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
index df146ac..97248b8 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
@@ -71,7 +71,7 @@ public class DynamoDbDynamicSinkFactoryTest {
// Construct expected sink
Properties dynamoDbClientProperties = new Properties();
- dynamoDbClientProperties.putAll(sinkOptions);
+ dynamoDbClientProperties.put("aws.region", "us-east-1");
DynamoDbDynamicSink expectedSink =
(DynamoDbDynamicSink)
DynamoDbDynamicSink.builder()
@@ -104,14 +104,12 @@ public class DynamoDbDynamicSinkFactoryTest {
(DynamoDbDynamicSink) createTableSink(sinkSchema, sinkOptions);
// Construct expected sink
- Properties dynamoDbClientProperties = new Properties();
- dynamoDbClientProperties.putAll(sinkOptions);
DynamoDbDynamicSink expectedSink =
(DynamoDbDynamicSink)
DynamoDbDynamicSink.builder()
.setTableName(DYNAMO_DB_TABLE_NAME)
.setOverwriteByPartitionKeys(new HashSet<>())
- .setDynamoDbClientProperties(dynamoDbClientProperties)
+ .setDynamoDbClientProperties(defaultSinkProperties())
.setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
.build();
@@ -124,14 +122,12 @@ public class DynamoDbDynamicSinkFactoryTest {
Map<String, String> sinkOptions = defaultSinkOptions().build();
// Construct expected sink
- Properties dynamoDbClientProperties = new Properties();
- dynamoDbClientProperties.putAll(sinkOptions);
DynamoDbDynamicSink originalSink =
(DynamoDbDynamicSink)
DynamoDbDynamicSink.builder()
.setTableName(DYNAMO_DB_TABLE_NAME)
.setOverwriteByPartitionKeys(new HashSet<>())
- .setDynamoDbClientProperties(dynamoDbClientProperties)
+ .setDynamoDbClientProperties(defaultSinkProperties())
.setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
.build();
@@ -150,14 +146,76 @@ public class DynamoDbDynamicSinkFactoryTest {
(DynamoDbDynamicSink) createTableSink(sinkSchema, partitionKeys, sinkOptions);
// Construct expected sink
- Properties dynamoDbClientProperties = new Properties();
- dynamoDbClientProperties.putAll(sinkOptions);
DynamoDbDynamicSink expectedSink =
(DynamoDbDynamicSink)
DynamoDbDynamicSink.builder()
.setTableName(DYNAMO_DB_TABLE_NAME)
.setOverwriteByPartitionKeys(new HashSet<>(partitionKeys))
- .setDynamoDbClientProperties(dynamoDbClientProperties)
+ .setDynamoDbClientProperties(defaultSinkProperties())
+ .setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
+ .setFailOnError(true)
+ .build();
+
+ assertThat(actualSink).usingRecursiveComparison().isEqualTo(expectedSink);
+ }
+
+ @Test
+ void testGoodTableSinkWithAwsCredentialOptions() {
+ ResolvedSchema sinkSchema = defaultSinkSchema();
+ Map<String, String> sinkOptions =
+ defaultSinkOptions().withTableOption(FAIL_ON_ERROR, "true").build();
+ sinkOptions.put("aws.credentials.provider", "BASIC");
+ sinkOptions.put("aws.credentials.basic.accesskeyid", "1234");
+ sinkOptions.put("aws.credentials.basic.secretkey", "5678");
+ List<String> partitionKeys = Collections.singletonList("partition_key");
+
+ // Construct actual sink
+ DynamoDbDynamicSink actualSink =
+ (DynamoDbDynamicSink) createTableSink(sinkSchema, partitionKeys, sinkOptions);
+
+ // Construct expected sink
+ Properties expectedSinkProperties = defaultSinkProperties();
+ expectedSinkProperties.put("aws.credentials.provider", "BASIC");
+ expectedSinkProperties.put("aws.credentials.provider.basic.accesskeyid", "1234");
+ expectedSinkProperties.put("aws.credentials.provider.basic.secretkey", "5678");
+ DynamoDbDynamicSink expectedSink =
+ (DynamoDbDynamicSink)
+ DynamoDbDynamicSink.builder()
+ .setTableName(DYNAMO_DB_TABLE_NAME)
+ .setOverwriteByPartitionKeys(new HashSet<>(partitionKeys))
+ .setDynamoDbClientProperties(expectedSinkProperties)
+ .setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
+ .setFailOnError(true)
+ .build();
+
+ assertThat(actualSink).usingRecursiveComparison().isEqualTo(expectedSink);
+ }
+
+ @Test
+ void testGoodTableSinkWithHttpClientOptions() {
+ ResolvedSchema sinkSchema = defaultSinkSchema();
+ Map<String, String> sinkOptions =
+ defaultSinkOptions().withTableOption(FAIL_ON_ERROR, "true").build();
+ sinkOptions.put("sink.http-client.max-concurrency", "123");
+ sinkOptions.put("sink.http-client.read-timeout", "456");
+ sinkOptions.put("sink.http-client.protocol.version", "HTTP1_1");
+ List<String> partitionKeys = Collections.singletonList("partition_key");
+
+ // Construct actual sink
+ DynamoDbDynamicSink actualSink =
+ (DynamoDbDynamicSink) createTableSink(sinkSchema, partitionKeys, sinkOptions);
+
+ // Construct expected sink
+ Properties expectedSinkProperties = defaultSinkProperties();
+ expectedSinkProperties.put("aws.http-client.max-concurrency", "123");
+ expectedSinkProperties.put("aws.http-client.read-timeout", "456");
+ expectedSinkProperties.put("aws.http.protocol.version", "HTTP1_1");
+ DynamoDbDynamicSink expectedSink =
+ (DynamoDbDynamicSink)
+ DynamoDbDynamicSink.builder()
+ .setTableName(DYNAMO_DB_TABLE_NAME)
+ .setOverwriteByPartitionKeys(new HashSet<>(partitionKeys))
+ .setDynamoDbClientProperties(expectedSinkProperties)
.setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
.setFailOnError(true)
.build();
@@ -183,8 +241,6 @@ public class DynamoDbDynamicSinkFactoryTest {
(DynamoDbDynamicSink) createTableSink(sinkSchema, partitionKeys, sinkOptions);
// Construct expected sink
- Properties dynamoDbClientProperties = new Properties();
- dynamoDbClientProperties.putAll(sinkOptions);
DynamoDbDynamicSink expectedSink =
(DynamoDbDynamicSink)
DynamoDbDynamicSink.builder()
@@ -195,7 +251,7 @@ public class DynamoDbDynamicSinkFactoryTest {
.setMaxTimeInBufferMS(1000)
.setTableName(DYNAMO_DB_TABLE_NAME)
.setOverwriteByPartitionKeys(new HashSet<>(partitionKeys))
- .setDynamoDbClientProperties(dynamoDbClientProperties)
+ .setDynamoDbClientProperties(defaultSinkProperties())
.setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
.build();
@@ -292,4 +348,10 @@ public class DynamoDbDynamicSinkFactoryTest {
.withTableOption(TABLE_NAME, DYNAMO_DB_TABLE_NAME)
.withTableOption("aws.region", "us-east-1");
}
+
+ private Properties defaultSinkProperties() {
+ Properties properties = new Properties();
+ properties.put("aws.region", "us-east-1");
+ return properties;
+ }
}