Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/14 15:21:52 UTC

[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #15: [FLINK-29900][Connectors/DynamoDB] Implement Table API for DynamoDB sink

dannycranmer commented on code in PR #15:
URL: https://github.com/apache/flink-connector-aws/pull/15#discussion_r1021656848


##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java:
##########
@@ -0,0 +1,24 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
+import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;
+
+/** DynamoDb specific configuration. */
+public class DynamoDbConfiguration {

Review Comment:
   Missing support annotation. Please make sure all non-test classes carry an `@Internal` or `@PublicEvolving` annotation.
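   
   For example, a minimal sketch of what this could look like (assuming this class is meant to stay `@Internal`):
   
   ```
   import org.apache.flink.annotation.Internal;

   /** DynamoDb specific configuration. */
   @Internal
   public class DynamoDbConfiguration {
       // existing fields and constructor unchanged
   }
   ```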



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java:
##########
@@ -0,0 +1,28 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Base options for the DynamoDb connector. Needs to be public so that the {@link
+ * org.apache.flink.table.api.TableDescriptor} can access it.

Review Comment:
   nit: add an import for `TableDescriptor` and simplify the comment
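   
   A sketch of the simplified Javadoc once the import is in place:
   
   ```
   import org.apache.flink.table.api.TableDescriptor;

   /** Base options for the DynamoDb connector; public so that a {@link TableDescriptor} can reference them. */
   public class DynamoDbConnectorOptions {
       // ...
   }
   ```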



##########
flink-connector-dynamodb/pom.xml:
##########
@@ -77,6 +77,22 @@ under the License.
 			<artifactId>dynamodb-enhanced</artifactId>
 		</dependency>
 
+		<!-- Table ecosystem -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge</artifactId>
+			<version>${flink.version}</version>
+			<scope>provided</scope>
+			<optional>true</optional>
+		</dependency>
+		<dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>

Review Comment:
   Formatting looks squiffy 



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConnectorOptions.java:
##########
@@ -0,0 +1,28 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Base options for the DynamoDb connector. Needs to be public so that the {@link
+ * org.apache.flink.table.api.TableDescriptor} can access it.
+ */
+public class DynamoDbConnectorOptions {
+
+    private DynamoDbConnectorOptions() {
+        // private constructor to prevent initialization of static class
+    }
+
+    public static final ConfigOption<String> TABLE_NAME =
+            ConfigOptions.key("table")

Review Comment:
   Should we keep this consistent with the DataStream field? `destination-table-name` or rename both to `table-name`?
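   
   A sketch of the `table-name` variant, in case we rename both (the DataStream sink field would need the same treatment):
   
   ```
   public static final ConfigOption<String> TABLE_NAME =
           ConfigOptions.key("table-name")
                   .stringType()
                   .noDefaultValue()
                   .withDescription("Name of the DynamoDB table to write to.");
   ```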



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+/** Implementation of an {@link ElementConverter} specific for DynamoDb sink. */

Review Comment:
   Please elaborate here
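   
   For example, something along these lines (wording is just a suggestion):
   
   ```
   /**
    * An {@link ElementConverter} that converts a {@link RowData} into a {@link DynamoDbWriteRequest},
    * mapping INSERT/UPDATE_AFTER rows to PUT requests and UPDATE_BEFORE/DELETE rows to DELETE requests.
    */
   ```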



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java:
##########
@@ -0,0 +1,24 @@
+package org.apache.flink.connector.dynamodb.table;

Review Comment:
   Missing copyright header, please check all other files too
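   
   i.e. the standard ASF license header at the top of each new file:
   
   ```
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *     http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   ```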



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java:
##########
@@ -0,0 +1,53 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;
+
+/** */

Review Comment:
   todo: the class-level Javadoc is empty, please add a description
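   
   For example:
   
   ```
   /** Factory for creating a {@link DynamoDbDynamicSink}. */
   ```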



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+/** Implementation of an {@link ElementConverter} specific for DynamoDb sink. */
+public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> {
+
+    private final DataType physicalDataType;
+    private final RowDataToAttributeValueConverter rowDataToAttributeValueConverter;
+
+    public RowDataElementConverter(DataType physicalDataType) {
+        this.physicalDataType = physicalDataType;
+        this.rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(physicalDataType);
+    }
+
+    @Override
+    public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
+        switch (element.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return getPutRequest(element);
+            case UPDATE_BEFORE:

Review Comment:
   Why does `UPDATE_BEFORE` map to delete?



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java:
##########
@@ -0,0 +1,198 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbSink;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbSinkBuilder;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link DynamoDbSink} from a logical
+ * description.
+ */
+@Internal
+public class DynamoDbDynamicSink extends AsyncDynamicTableSink<DynamoDbWriteRequest>
+        implements SupportsPartitioning {
+
+    private final String destinationTableName;
+    private final boolean failOnError;
+    private final Properties dynamoDbClientProperties;
+    private final DataType physicalDataType;
+    private final Set<String> overwriteByPartitionKeys;
+
+    protected DynamoDbDynamicSink(
+            @Nullable Integer maxBatchSize,
+            @Nullable Integer maxInFlightRequests,
+            @Nullable Integer maxBufferedRequests,
+            @Nullable Long maxBufferSizeInBytes,
+            @Nullable Long maxTimeInBufferMS,
+            String destinationTableName,
+            boolean failOnError,
+            Properties dynamoDbClientProperties,
+            DataType physicalDataType,
+            Set<String> overwriteByPartitionKeys) {
+        super(
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBufferSizeInBytes,
+                maxTimeInBufferMS);
+        this.destinationTableName = destinationTableName;
+        this.failOnError = failOnError;
+        this.dynamoDbClientProperties = dynamoDbClientProperties;
+        this.physicalDataType = physicalDataType;
+        this.overwriteByPartitionKeys = overwriteByPartitionKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+        // TODO: We can support consuming from CDC streams here
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        DynamoDbSinkBuilder<RowData> builder =
+                DynamoDbSink.<RowData>builder()
+                        .setDestinationTableName(destinationTableName)
+                        .setFailOnError(failOnError)
+                        .setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys))
+                        .setDynamoDbProperties(dynamoDbClientProperties)
+                        .setElementConverter(new RowDataElementConverter(physicalDataType));
+
+        addAsyncOptionsToSinkBuilder(builder);
+
+        // TODO: check if parallelism needed here
+        return SinkV2Provider.of(builder.build());
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new DynamoDbDynamicSink(
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBufferSizeInBytes,
+                maxTimeInBufferMS,
+                destinationTableName,
+                failOnError,
+                dynamoDbClientProperties,
+                physicalDataType,
+                overwriteByPartitionKeys);
+    }
+
+    @Override
+    public String asSummaryString() {
+        // TODO: check when this is called/returned
+        return "DynamoDB";
+    }
+
+    @Override
+    public void applyStaticPartition(Map<String, String> partitions) {
+        this.overwriteByPartitionKeys.addAll(partitions.keySet());
+    }
+
+    public static DynamoDbDynamicTableSinkBuilder builder() {
+        return new DynamoDbDynamicTableSinkBuilder();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        DynamoDbDynamicSink that = (DynamoDbDynamicSink) o;
+        return failOnError == that.failOnError
+                && Objects.equals(destinationTableName, that.destinationTableName)
+                && Objects.equals(dynamoDbClientProperties, that.dynamoDbClientProperties)
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(overwriteByPartitionKeys, that.overwriteByPartitionKeys);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                super.hashCode(),
+                destinationTableName,
+                failOnError,
+                dynamoDbClientProperties,
+                physicalDataType,
+                overwriteByPartitionKeys);
+    }

Review Comment:
   Why are these needed? Please review https://flink.apache.org/contributing/code-style-and-quality-java.html#equals--hashcode and remove if we can



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.DataType;
+
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/** Converts from {@link RowData} to {@link AttributeValue}. */
+public class RowDataToAttributeValueConverter implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final DataType physicalDataType;
+    private transient TableSchema<RowData> tableSchema;
+
+    public RowDataToAttributeValueConverter(DataType physicalDataType) {
+        this.physicalDataType = physicalDataType;
+        this.tableSchema = createTableSchema();
+    }
+
+    private StaticTableSchema<RowData> createTableSchema() {
+        List<DataTypes.Field> fields = DataType.getFields(physicalDataType);
+        StaticTableSchema.Builder<RowData> builder = TableSchema.builder(RowData.class);
+        for (int i = 0; i < fields.size(); i++) {
+            DataTypes.Field field = fields.get(i);
+            RowData.FieldGetter fieldGetter =
+                    createFieldGetter(field.getDataType().getLogicalType(), i);
+
+            builder =
+                    addAttribute(
+                            builder, field.getDataType().getConversionClass(), field, fieldGetter);
+        }
+        return builder.build();
+    }
+
+    private <T> StaticTableSchema.Builder<RowData> addAttribute(
+            StaticTableSchema.Builder<RowData> builder,
+            Class<T> convertedClass,
+            DataTypes.Field field,
+            RowData.FieldGetter fieldGetter) {
+
+        return builder.addAttribute(
+                convertedClass,
+                a ->
+                        a.name(field.getName())
+                                .getter(
+                                        rowData ->
+                                                (T)
+                                                        DataStructureConverters.getConverter(
+                                                                        field.getDataType())
+                                                                .toExternal(
+                                                                        fieldGetter.getFieldOrNull(
+                                                                                rowData)))
+                                .setter(((rowData, t) -> {})));
+    }
+
+    public Map<String, AttributeValue> convertRowData(RowData row) {

Review Comment:
   nit: Public methods should typically go above private, for readability



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.DataType;
+
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/** Converts from {@link RowData} to {@link AttributeValue}. */
+public class RowDataToAttributeValueConverter implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final DataType physicalDataType;
+    private transient TableSchema<RowData> tableSchema;
+
+    public RowDataToAttributeValueConverter(DataType physicalDataType) {
+        this.physicalDataType = physicalDataType;
+        this.tableSchema = createTableSchema();
+    }
+
+    private StaticTableSchema<RowData> createTableSchema() {
+        List<DataTypes.Field> fields = DataType.getFields(physicalDataType);
+        StaticTableSchema.Builder<RowData> builder = TableSchema.builder(RowData.class);
+        for (int i = 0; i < fields.size(); i++) {
+            DataTypes.Field field = fields.get(i);
+            RowData.FieldGetter fieldGetter =
+                    createFieldGetter(field.getDataType().getLogicalType(), i);
+
+            builder =
+                    addAttribute(
+                            builder, field.getDataType().getConversionClass(), field, fieldGetter);
+        }
+        return builder.build();
+    }
+
+    private <T> StaticTableSchema.Builder<RowData> addAttribute(
+            StaticTableSchema.Builder<RowData> builder,
+            Class<T> convertedClass,
+            DataTypes.Field field,
+            RowData.FieldGetter fieldGetter) {
+
+        return builder.addAttribute(
+                convertedClass,
+                a ->
+                        a.name(field.getName())
+                                .getter(
+                                        rowData ->
+                                                (T)
+                                                        DataStructureConverters.getConverter(
+                                                                        field.getDataType())
+                                                                .toExternal(
+                                                                        fieldGetter.getFieldOrNull(
+                                                                                rowData)))
+                                .setter(((rowData, t) -> {})));
+    }

Review Comment:
   Yikes, spotless added some spots here :/ 
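   
   An untested sketch of one way to tame this: pulling the getter lambda into a local variable keeps the builder chain on a single level, which spotless formats much more compactly (requires `import java.util.function.Function;`):
   
   ```
   private <T> StaticTableSchema.Builder<RowData> addAttribute(
           StaticTableSchema.Builder<RowData> builder,
           Class<T> convertedClass,
           DataTypes.Field field,
           RowData.FieldGetter fieldGetter) {
       // Convert the Flink internal field value to its external representation.
       @SuppressWarnings("unchecked")
       Function<RowData, T> getter =
               rowData ->
                       (T)
                               DataStructureConverters.getConverter(field.getDataType())
                                       .toExternal(fieldGetter.getFieldOrNull(rowData));

       return builder.addAttribute(
               convertedClass,
               a -> a.name(field.getName()).getter(getter).setter((rowData, t) -> {}));
   }
   ```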



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+/** Implementation of an {@link ElementConverter} specific for DynamoDb sink. */
+public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> {
+
+    private final DataType physicalDataType;
+    private final RowDataToAttributeValueConverter rowDataToAttributeValueConverter;
+
+    public RowDataElementConverter(DataType physicalDataType) {
+        this.physicalDataType = physicalDataType;
+        this.rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(physicalDataType);
+    }
+
+    @Override
+    public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
+        switch (element.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return getPutRequest(element);
+            case UPDATE_BEFORE:
+            case DELETE:
+                return getDeleteRequest(element);
+            default:
+                throw new TableException("Unsupported message kind: " + element.getRowKind());
+        }
+    }
+
+    private DynamoDbWriteRequest getPutRequest(RowData row) {
+        return DynamoDbWriteRequest.builder()
+                .setType(DynamoDbWriteRequestType.PUT)
+                .setItem(rowDataToAttributeValueConverter.convertRowData(row))
+                .build();
+    }
+
+    private DynamoDbWriteRequest getDeleteRequest(RowData row) {
+        return DynamoDbWriteRequest.builder()
+                .setType(DynamoDbWriteRequestType.DELETE)
+                .setItem(rowDataToAttributeValueConverter.convertRowData(row))
+                .build();
+    }

Review Comment:
   nit: I like this; however, you could create the builder in the previous method, e.g.:
   
   ```
   @Override
   public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
       var builder =
               DynamoDbWriteRequest.builder()
                       .setItem(rowDataToAttributeValueConverter.convertRowData(element));

       switch (element.getRowKind()) {
           case INSERT:
           case UPDATE_AFTER:
               builder.setType(DynamoDbWriteRequestType.PUT);
               break;
           case UPDATE_BEFORE:
           case DELETE:
               builder.setType(DynamoDbWriteRequestType.DELETE);
               break;
           default:
               throw new TableException("Unsupported message kind: " + element.getRowKind());
       }

       return builder.build();
   }
   ```



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java:
##########
@@ -0,0 +1,198 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbSink;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbSinkBuilder;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link DynamoDbSink} from a logical
+ * description.
+ */
+@Internal
+public class DynamoDbDynamicSink extends AsyncDynamicTableSink<DynamoDbWriteRequest>
+        implements SupportsPartitioning {
+
+    private final String destinationTableName;
+    private final boolean failOnError;
+    private final Properties dynamoDbClientProperties;
+    private final DataType physicalDataType;
+    private final Set<String> overwriteByPartitionKeys;
+
+    protected DynamoDbDynamicSink(
+            @Nullable Integer maxBatchSize,
+            @Nullable Integer maxInFlightRequests,
+            @Nullable Integer maxBufferedRequests,
+            @Nullable Long maxBufferSizeInBytes,
+            @Nullable Long maxTimeInBufferMS,
+            String destinationTableName,
+            boolean failOnError,
+            Properties dynamoDbClientProperties,
+            DataType physicalDataType,
+            Set<String> overwriteByPartitionKeys) {
+        super(
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBufferSizeInBytes,
+                maxTimeInBufferMS);
+        this.destinationTableName = destinationTableName;
+        this.failOnError = failOnError;
+        this.dynamoDbClientProperties = dynamoDbClientProperties;
+        this.physicalDataType = physicalDataType;
+        this.overwriteByPartitionKeys = overwriteByPartitionKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+        // TODO: We can support consuming from CDC streams here
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        DynamoDbSinkBuilder<RowData> builder =
+                DynamoDbSink.<RowData>builder()
+                        .setDestinationTableName(destinationTableName)
+                        .setFailOnError(failOnError)
+                        .setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys))
+                        .setDynamoDbProperties(dynamoDbClientProperties)
+                        .setElementConverter(new RowDataElementConverter(physicalDataType));
+
+        addAsyncOptionsToSinkBuilder(builder);
+
+        // TODO: check if parallelism needed here
+        return SinkV2Provider.of(builder.build());
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new DynamoDbDynamicSink(
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBufferSizeInBytes,
+                maxTimeInBufferMS,
+                destinationTableName,
+                failOnError,
+                dynamoDbClientProperties,
+                physicalDataType,
+                overwriteByPartitionKeys);
+    }
+
+    @Override
+    public String asSummaryString() {
+        // TODO: check when this is called/returned
+        return "DynamoDB";
+    }
+
+    @Override
+    public void applyStaticPartition(Map<String, String> partitions) {
+        this.overwriteByPartitionKeys.addAll(partitions.keySet());

Review Comment:
   nit: remove `this`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org