You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/23 17:57:27 UTC

[flink-connector-aws] branch main updated: [FLINK-29900][Connectors/DynamoDB] Make configurations for DynamoDB Table API sink follow the standard AWS and HTTP client convention

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new a1d9391  [FLINK-29900][Connectors/DynamoDB] Make configurations for DynamoDB Table API sink follow the standard AWS and HTTP client convention
a1d9391 is described below

commit a1d939113fd997c9e96745d310273518d6d8cb7f
Author: Hong Liang Teoh <li...@amazon.com>
AuthorDate: Tue Nov 22 22:41:37 2022 +0000

    [FLINK-29900][Connectors/DynamoDB] Make configurations for DynamoDB Table API sink follow the standard AWS and HTTP client convention
---
 .../dynamodb/table/DynamoDbConfiguration.java      | 29 +++++--
 .../dynamodb/table/DynamoDbDynamicSinkFactory.java | 15 +---
 .../table/DynamoDbDynamicSinkFactoryTest.java      | 88 ++++++++++++++++++----
 3 files changed, 103 insertions(+), 29 deletions(-)

diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
index ad4ca3c..b3fe27d 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java
@@ -20,6 +20,11 @@ package org.apache.flink.connector.dynamodb.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.aws.table.util.AsyncClientOptionsUtils;
+import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator;
+
+import java.util.Map;
+import java.util.Properties;
 
 import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
 import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;
@@ -28,17 +33,31 @@ import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions
 @Internal
 public class DynamoDbConfiguration {
 
-    private final ReadableConfig config;
+    private final Map<String, String> rawTableOptions;
+    private final ReadableConfig tableOptions;
+    private final AsyncSinkConfigurationValidator asyncSinkConfigurationValidator;
+    private final AsyncClientOptionsUtils asyncClientOptionsUtils;
 
-    public DynamoDbConfiguration(ReadableConfig config) {
-        this.config = config;
+    public DynamoDbConfiguration(Map<String, String> rawTableOptions, ReadableConfig tableOptions) {
+        this.rawTableOptions = rawTableOptions;
+        this.tableOptions = tableOptions;
+        this.asyncSinkConfigurationValidator = new AsyncSinkConfigurationValidator(tableOptions);
+        this.asyncClientOptionsUtils = new AsyncClientOptionsUtils(rawTableOptions);
     }
 
     public String getTableName() {
-        return config.get(TABLE_NAME);
+        return tableOptions.get(TABLE_NAME);
     }
 
     public boolean getFailOnError() {
-        return config.get(FAIL_ON_ERROR);
+        return tableOptions.get(FAIL_ON_ERROR);
+    }
+
+    public Properties getAsyncSinkProperties() {
+        return asyncSinkConfigurationValidator.getValidatedConfigurations();
+    }
+
+    public Properties getSinkClientProperties() {
+        return asyncClientOptionsUtils.getValidatedConfigurations();
     }
 }
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
index 8072a39..055caf2 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.dynamodb.table;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
-import org.apache.flink.connector.base.table.sink.options.AsyncSinkConfigurationValidator;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.factories.FactoryUtil;
@@ -29,7 +28,6 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
 
 import java.util.HashSet;
-import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.AWS_REGION;
@@ -50,10 +48,7 @@ public class DynamoDbDynamicSinkFactory extends AsyncDynamicTableSinkFactory {
         FactoryUtil.validateFactoryOptions(this, factoryHelper.getOptions());
 
         DynamoDbConfiguration dynamoDbConfiguration =
-                new DynamoDbConfiguration(factoryHelper.getOptions());
-
-        Properties tableProperties = new Properties();
-        tableProperties.putAll(catalogTable.getOptions());
+                new DynamoDbConfiguration(catalogTable.getOptions(), factoryHelper.getOptions());
 
         DynamoDbDynamicSink.DynamoDbDynamicTableSinkBuilder builder =
                 DynamoDbDynamicSink.builder()
@@ -62,12 +57,10 @@ public class DynamoDbDynamicSinkFactory extends AsyncDynamicTableSinkFactory {
                         .setPhysicalDataType(
                                 catalogTable.getResolvedSchema().toPhysicalRowDataType())
                         .setOverwriteByPartitionKeys(new HashSet<>(catalogTable.getPartitionKeys()))
-                        .setDynamoDbClientProperties(tableProperties);
+                        .setDynamoDbClientProperties(
+                                dynamoDbConfiguration.getSinkClientProperties());
 
-        AsyncSinkConfigurationValidator asyncSinkConfigurationValidator =
-                new AsyncSinkConfigurationValidator(factoryHelper.getOptions());
-        addAsyncOptionsToBuilder(
-                asyncSinkConfigurationValidator.getValidatedConfigurations(), builder);
+        addAsyncOptionsToBuilder(dynamoDbConfiguration.getAsyncSinkProperties(), builder);
 
         return builder.build();
     }
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
index df146ac..97248b8 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
@@ -71,7 +71,7 @@ public class DynamoDbDynamicSinkFactoryTest {
 
         // Construct expected sink
         Properties dynamoDbClientProperties = new Properties();
-        dynamoDbClientProperties.putAll(sinkOptions);
+        dynamoDbClientProperties.put("aws.region", "us-east-1");
         DynamoDbDynamicSink expectedSink =
                 (DynamoDbDynamicSink)
                         DynamoDbDynamicSink.builder()
@@ -104,14 +104,12 @@ public class DynamoDbDynamicSinkFactoryTest {
                 (DynamoDbDynamicSink) createTableSink(sinkSchema, sinkOptions);
 
         // Construct expected sink
-        Properties dynamoDbClientProperties = new Properties();
-        dynamoDbClientProperties.putAll(sinkOptions);
         DynamoDbDynamicSink expectedSink =
                 (DynamoDbDynamicSink)
                         DynamoDbDynamicSink.builder()
                                 .setTableName(DYNAMO_DB_TABLE_NAME)
                                 .setOverwriteByPartitionKeys(new HashSet<>())
-                                .setDynamoDbClientProperties(dynamoDbClientProperties)
+                                .setDynamoDbClientProperties(defaultSinkProperties())
                                 .setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
                                 .build();
 
@@ -124,14 +122,12 @@ public class DynamoDbDynamicSinkFactoryTest {
         Map<String, String> sinkOptions = defaultSinkOptions().build();
 
         // Construct expected sink
-        Properties dynamoDbClientProperties = new Properties();
-        dynamoDbClientProperties.putAll(sinkOptions);
         DynamoDbDynamicSink originalSink =
                 (DynamoDbDynamicSink)
                         DynamoDbDynamicSink.builder()
                                 .setTableName(DYNAMO_DB_TABLE_NAME)
                                 .setOverwriteByPartitionKeys(new HashSet<>())
-                                .setDynamoDbClientProperties(dynamoDbClientProperties)
+                                .setDynamoDbClientProperties(defaultSinkProperties())
                                 .setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
                                 .build();
 
@@ -150,14 +146,76 @@ public class DynamoDbDynamicSinkFactoryTest {
                 (DynamoDbDynamicSink) createTableSink(sinkSchema, partitionKeys, sinkOptions);
 
         // Construct expected sink
-        Properties dynamoDbClientProperties = new Properties();
-        dynamoDbClientProperties.putAll(sinkOptions);
         DynamoDbDynamicSink expectedSink =
                 (DynamoDbDynamicSink)
                         DynamoDbDynamicSink.builder()
                                 .setTableName(DYNAMO_DB_TABLE_NAME)
                                 .setOverwriteByPartitionKeys(new HashSet<>(partitionKeys))
-                                .setDynamoDbClientProperties(dynamoDbClientProperties)
+                                .setDynamoDbClientProperties(defaultSinkProperties())
+                                .setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
+                                .setFailOnError(true)
+                                .build();
+
+        assertThat(actualSink).usingRecursiveComparison().isEqualTo(expectedSink);
+    }
+
+    @Test
+    void testGoodTableSinkWithAwsCredentialOptions() {
+        ResolvedSchema sinkSchema = defaultSinkSchema();
+        Map<String, String> sinkOptions =
+                defaultSinkOptions().withTableOption(FAIL_ON_ERROR, "true").build();
+        sinkOptions.put("aws.credentials.provider", "BASIC");
+        sinkOptions.put("aws.credentials.basic.accesskeyid", "1234");
+        sinkOptions.put("aws.credentials.basic.secretkey", "5678");
+        List<String> partitionKeys = Collections.singletonList("partition_key");
+
+        // Construct actual sink
+        DynamoDbDynamicSink actualSink =
+                (DynamoDbDynamicSink) createTableSink(sinkSchema, partitionKeys, sinkOptions);
+
+        // Construct expected sink
+        Properties expectedSinkProperties = defaultSinkProperties();
+        expectedSinkProperties.put("aws.credentials.provider", "BASIC");
+        expectedSinkProperties.put("aws.credentials.provider.basic.accesskeyid", "1234");
+        expectedSinkProperties.put("aws.credentials.provider.basic.secretkey", "5678");
+        DynamoDbDynamicSink expectedSink =
+                (DynamoDbDynamicSink)
+                        DynamoDbDynamicSink.builder()
+                                .setTableName(DYNAMO_DB_TABLE_NAME)
+                                .setOverwriteByPartitionKeys(new HashSet<>(partitionKeys))
+                                .setDynamoDbClientProperties(expectedSinkProperties)
+                                .setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
+                                .setFailOnError(true)
+                                .build();
+
+        assertThat(actualSink).usingRecursiveComparison().isEqualTo(expectedSink);
+    }
+
+    @Test
+    void testGoodTableSinkWithHttpClientOptions() {
+        ResolvedSchema sinkSchema = defaultSinkSchema();
+        Map<String, String> sinkOptions =
+                defaultSinkOptions().withTableOption(FAIL_ON_ERROR, "true").build();
+        sinkOptions.put("sink.http-client.max-concurrency", "123");
+        sinkOptions.put("sink.http-client.read-timeout", "456");
+        sinkOptions.put("sink.http-client.protocol.version", "HTTP1_1");
+        List<String> partitionKeys = Collections.singletonList("partition_key");
+
+        // Construct actual sink
+        DynamoDbDynamicSink actualSink =
+                (DynamoDbDynamicSink) createTableSink(sinkSchema, partitionKeys, sinkOptions);
+
+        // Construct expected sink
+        Properties expectedSinkProperties = defaultSinkProperties();
+        expectedSinkProperties.put("aws.http-client.max-concurrency", "123");
+        expectedSinkProperties.put("aws.http-client.read-timeout", "456");
+        expectedSinkProperties.put("aws.http.protocol.version", "HTTP1_1");
+        DynamoDbDynamicSink expectedSink =
+                (DynamoDbDynamicSink)
+                        DynamoDbDynamicSink.builder()
+                                .setTableName(DYNAMO_DB_TABLE_NAME)
+                                .setOverwriteByPartitionKeys(new HashSet<>(partitionKeys))
+                                .setDynamoDbClientProperties(expectedSinkProperties)
                                 .setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
                                 .setFailOnError(true)
                                 .build();
@@ -183,8 +241,6 @@ public class DynamoDbDynamicSinkFactoryTest {
                 (DynamoDbDynamicSink) createTableSink(sinkSchema, partitionKeys, sinkOptions);
 
         // Construct expected sink
-        Properties dynamoDbClientProperties = new Properties();
-        dynamoDbClientProperties.putAll(sinkOptions);
         DynamoDbDynamicSink expectedSink =
                 (DynamoDbDynamicSink)
                         DynamoDbDynamicSink.builder()
@@ -195,7 +251,7 @@ public class DynamoDbDynamicSinkFactoryTest {
                                 .setMaxTimeInBufferMS(1000)
                                 .setTableName(DYNAMO_DB_TABLE_NAME)
                                 .setOverwriteByPartitionKeys(new HashSet<>(partitionKeys))
-                                .setDynamoDbClientProperties(dynamoDbClientProperties)
+                                .setDynamoDbClientProperties(defaultSinkProperties())
                                 .setPhysicalDataType(sinkSchema.toPhysicalRowDataType())
                                 .build();
 
@@ -292,4 +348,10 @@ public class DynamoDbDynamicSinkFactoryTest {
                 .withTableOption(TABLE_NAME, DYNAMO_DB_TABLE_NAME)
                 .withTableOption("aws.region", "us-east-1");
     }
+
+    private Properties defaultSinkProperties() {
+        Properties properties = new Properties();
+        properties.put("aws.region", "us-east-1");
+        return properties;
+    }
 }