You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/25 14:16:33 UTC

[flink-connector-aws] branch main updated: [FLINK-29895][Connectors/DynamoDB] Further increase unit test coverage for DynamoDbSink

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e09d57  [FLINK-29895][Connectors/DynamoDB] Further increase unit test coverage for DynamoDbSink
9e09d57 is described below

commit 9e09d57210c4b44a9f88cc25781e96c452b2ec49
Author: Hong Liang Teoh <li...@amazon.com>
AuthorDate: Thu Nov 24 16:36:47 2022 +0000

    [FLINK-29895][Connectors/DynamoDB] Further increase unit test coverage for DynamoDbSink
---
 .../connector/dynamodb/sink/DynamoDbSink.java      |   3 +-
 .../dynamodb/sink/DynamoDbSinkWriter.java          |  51 +-
 .../dynamodb/sink/DynamoDbWriteRequest.java        |  21 +-
 .../sink/client/DynamoDbAsyncClientProvider.java   |  75 +++
 .../dynamodb/sink/client/SdkClientProvider.java    |  36 ++
 .../connector/dynamodb/sink/DynamoDbSinkTest.java  |  17 +
 .../dynamodb/sink/DynamoDbSinkWriterTest.java      | 592 +++++++++++++++++++++
 .../dynamodb/sink/DynamoDbWriteRequestTest.java    |  17 +
 .../sink/DynamoDbWriterStateSerializerTest.java    |  10 +-
 .../table/RowDataElementConverterTest.java         |   8 +-
 .../util/DynamoDbSerializationUtilTest.java        |  26 +-
 11 files changed, 791 insertions(+), 65 deletions(-)

diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
index e17d5f8..c90aa19 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.connector.base.sink.AsyncSinkBase;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.client.DynamoDbAsyncClientProvider;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
@@ -151,7 +152,7 @@ public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteReq
                 failOnError,
                 tableName,
                 overwriteByPartitionKeys,
-                dynamoDbClientProperties,
+                new DynamoDbAsyncClientProvider(dynamoDbClientProperties),
                 recoveredState);
     }
 
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
index d371727..2e6b26f 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
@@ -20,13 +20,12 @@ package org.apache.flink.connector.dynamodb.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Sink.InitContext;
-import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
 import org.apache.flink.connector.dynamodb.util.PrimaryKeyBuilder;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -36,7 +35,6 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
 import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
 import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
@@ -53,7 +51,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
@@ -96,7 +93,6 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
                     (err) ->
                             err instanceof DynamoDbException
                                     && ((DynamoDbException) err)
-                                            .toBuilder()
                                             .awsErrorDetails()
                                             .errorCode()
                                             .equalsIgnoreCase("ValidationException"),
@@ -123,9 +119,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
     /* The sink writer metric group */
     private final SinkWriterMetricGroup metrics;
 
-    /* The asynchronous http client for the asynchronous DynamoDb client */
-    private final SdkAsyncHttpClient httpClient;
-    private final DynamoDbAsyncClient client;
+    private final SdkClientProvider<DynamoDbAsyncClient> clientProvider;
     private final boolean failOnError;
     private final String tableName;
 
@@ -143,7 +137,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
             boolean failOnError,
             String tableName,
             List<String> overwriteByPartitionKeys,
-            Properties dynamoDbClientProperties,
+            SdkClientProvider<DynamoDbAsyncClient> clientProvider,
             Collection<BufferedRequestState<DynamoDbWriteRequest>> states) {
         super(
                 elementConverter,
@@ -161,31 +155,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
         this.metrics = context.metricGroup();
         this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
         this.numRecordsSendPartialFailure = metrics.counter("numRecordsSendPartialFailure");
-        this.httpClient =
-                AWSGeneralUtil.createAsyncHttpClient(
-                        overrideClientProperties(dynamoDbClientProperties));
-        this.client = buildClient(dynamoDbClientProperties, httpClient);
-    }
-
-    private Properties overrideClientProperties(Properties dynamoDbClientProperties) {
-        Properties overridenProperties = new Properties();
-        overridenProperties.putAll(dynamoDbClientProperties);
-
-        // Specify HTTP1_1 protocol since DynamoDB endpoint doesn't support HTTP2
-        overridenProperties.put(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
-        return overridenProperties;
-    }
-
-    private DynamoDbAsyncClient buildClient(
-            Properties dynamoDbClientProperties, SdkAsyncHttpClient httpClient) {
-        AWSGeneralUtil.validateAwsCredentials(dynamoDbClientProperties);
-
-        return AWSAsyncSinkUtil.createAwsAsyncClient(
-                dynamoDbClientProperties,
-                httpClient,
-                DynamoDbAsyncClient.builder(),
-                DynamoDbConfigConstants.BASE_DYNAMODB_USER_AGENT_PREFIX_FORMAT,
-                DynamoDbConfigConstants.DYNAMODB_CLIENT_USER_AGENT_PREFIX);
+        this.clientProvider = clientProvider;
     }
 
     @Override
@@ -211,10 +181,12 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
         }
 
         CompletableFuture<BatchWriteItemResponse> future =
-                client.batchWriteItem(
-                        BatchWriteItemRequest.builder()
-                                .requestItems(ImmutableMap.of(tableName, items))
-                                .build());
+                clientProvider
+                        .getClient()
+                        .batchWriteItem(
+                                BatchWriteItemRequest.builder()
+                                        .requestItems(ImmutableMap.of(tableName, items))
+                                        .build());
 
         future.whenComplete(
                 (response, err) -> {
@@ -259,6 +231,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
     }
 
     private boolean isRetryable(Throwable err) {
+        // isFatal() is really isNotFatal()
         if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
             return false;
         }
@@ -281,7 +254,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
 
     @Override
     public void close() {
-        AWSGeneralUtil.closeResources(httpClient, client);
+        AWSGeneralUtil.closeResources(clientProvider);
     }
 
     private WriteRequest convertToWriteRequest(DynamoDbWriteRequest dynamoDbWriteRequest) {
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java
index 4fd340a..c8e3c88 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java
@@ -25,7 +25,6 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 
 import java.io.Serializable;
 import java.util.Map;
-import java.util.Objects;
 
 /**
  * Represents a single Write Request to DynamoDb. Contains the item to be written as well as the
@@ -52,25 +51,13 @@ public class DynamoDbWriteRequest implements Serializable {
         return type;
     }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        DynamoDbWriteRequest that = (DynamoDbWriteRequest) o;
-        return item.equals(that.item) && type == that.type;
+    public static Builder builder() {
+        return new Builder();
     }
 
     @Override
-    public int hashCode() {
-        return Objects.hash(item, type);
-    }
-
-    public static Builder builder() {
-        return new Builder();
+    public String toString() {
+        return "DynamoDbWriteRequest{" + "item=" + item + ", type=" + type + '}';
     }
 
     /** Builder for DynamoDbWriteRequest. */
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/DynamoDbAsyncClientProvider.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/DynamoDbAsyncClientProvider.java
new file mode 100644
index 0000000..f72e304
--- /dev/null
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/DynamoDbAsyncClientProvider.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink.client;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbConfigConstants;
+
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+
+import java.util.Properties;
+
+/** Provides a {@link DynamoDbAsyncClient}. */
+@Internal
+public class DynamoDbAsyncClientProvider implements SdkClientProvider<DynamoDbAsyncClient> {
+
+    private final SdkAsyncHttpClient httpClient;
+    private final DynamoDbAsyncClient dynamoDbAsyncClient;
+
+    public DynamoDbAsyncClientProvider(Properties clientProperties) {
+        this.httpClient =
+                AWSGeneralUtil.createAsyncHttpClient(overrideClientProperties(clientProperties));
+        this.dynamoDbAsyncClient = buildClient(clientProperties, httpClient);
+    }
+
+    @Override
+    public DynamoDbAsyncClient getClient() {
+        return dynamoDbAsyncClient;
+    }
+
+    @Override
+    public void close() {
+        AWSGeneralUtil.closeResources(httpClient, dynamoDbAsyncClient);
+    }
+
+    private Properties overrideClientProperties(Properties dynamoDbClientProperties) {
+        Properties overridenProperties = new Properties();
+        overridenProperties.putAll(dynamoDbClientProperties);
+
+        // Specify HTTP1_1 protocol since DynamoDB endpoint doesn't support HTTP2
+        overridenProperties.putIfAbsent(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
+        return overridenProperties;
+    }
+
+    private DynamoDbAsyncClient buildClient(
+            Properties dynamoDbClientProperties, SdkAsyncHttpClient httpClient) {
+        AWSGeneralUtil.validateAwsCredentials(dynamoDbClientProperties);
+
+        return AWSAsyncSinkUtil.createAwsAsyncClient(
+                dynamoDbClientProperties,
+                httpClient,
+                DynamoDbAsyncClient.builder(),
+                DynamoDbConfigConstants.BASE_DYNAMODB_USER_AGENT_PREFIX_FORMAT,
+                DynamoDbConfigConstants.DYNAMODB_CLIENT_USER_AGENT_PREFIX);
+    }
+}
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/SdkClientProvider.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/SdkClientProvider.java
new file mode 100644
index 0000000..c4ce097
--- /dev/null
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/SdkClientProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink.client;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.utils.SdkAutoCloseable;
+
+/** Provides a {@link SdkClient}. */
+@Internal
+public interface SdkClientProvider<T extends SdkClient> extends SdkAutoCloseable {
+
+    /**
+     * Returns {@link T}.
+     *
+     * @return the AWS SDK client
+     */
+    T getClient();
+}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
index c2f88eb..9cf198e 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
@@ -34,6 +34,7 @@ import static org.apache.flink.connector.aws.config.AWSConfigConstants.Credentia
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.SYS_PROP;
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /** Test for {@link DynamoDbSink}. */
 public class DynamoDbSinkTest {
@@ -229,6 +230,22 @@ public class DynamoDbSinkTest {
                 .withMessageContaining("Invalid AWS Credential Provider Type set in config.");
     }
 
+    @Test
+    public void testGetWriterStateSerializer() {
+        Properties properties = getDefaultProperties();
+        properties.put(AWS_CREDENTIALS_PROVIDER, "INVALID_CREDENTIALS_PROVIDER");
+        DynamoDbSink<Map<String, AttributeValue>> sink =
+                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                        .setElementConverter(new TestDynamoDbElementConverter())
+                        .setDynamoDbProperties(properties)
+                        .setTableName("test_table")
+                        .build();
+
+        assertThat(sink.getWriterStateSerializer())
+                .usingRecursiveComparison()
+                .isEqualTo(new DynamoDbWriterStateSerializer());
+    }
+
     private Properties getDefaultProperties() {
         Properties properties = new Properties();
         properties.setProperty(AWS_REGION, "us-east-1");
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
new file mode 100644
index 0000000..9a178e7
--- /dev/null
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+import software.amazon.awssdk.services.sts.model.StsException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DynamoDbSinkWriter}. */
+public class DynamoDbSinkWriterTest {
+
+    private static final String PARTITION_KEY = "partition_key";
+    private static final String SORT_KEY = "sort_key";
+    private static final String TABLE_NAME = "table_name";
+    private static final long FUTURE_TIMEOUT_MS = 1000;
+
+    @Test
+    public void testSuccessfulRequestWithNoDeduplication() throws Exception {
+        List<String> overwriteByPartitionKeys = Collections.emptyList();
+        List<DynamoDbWriteRequest> inputRequests =
+                ImmutableList.of(
+                        sinkPutRequest(item("pk", "1")),
+                        sinkPutRequest(item("pk", "2")),
+                        sinkDeleteRequest(item("pk", "3")),
+                        sinkDeleteRequest(item("pk", "4")));
+        List<WriteRequest> expectedClientRequests =
+                ImmutableList.of(
+                        dynamoDbPutRequest(item("pk", "1")),
+                        dynamoDbPutRequest(item("pk", "2")),
+                        dynamoDbDeleteRequest(item("pk", "3")),
+                        dynamoDbDeleteRequest(item("pk", "4")));
+
+        TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new TrackingDynamoDbAsyncClient();
+        DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+                getDefaultSinkWriter(
+                        true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient);
+        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+        assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
+                .isNotEmpty()
+                .containsExactly(expectedClientRequests);
+        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty();
+    }
+
+    @Test
+    public void testPutRequestPartitionKeyDeduplication() throws Exception {
+        List<String> overwriteByPartitionKeys = ImmutableList.of(PARTITION_KEY);
+        List<DynamoDbWriteRequest> inputRequests =
+                ImmutableList.of(sinkPutRequest(item("pk", "1")), sinkPutRequest(item("pk", "2")));
+        List<WriteRequest> expectedClientRequests =
+                ImmutableList.of(dynamoDbPutRequest(item("pk", "2")));
+
+        TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new TrackingDynamoDbAsyncClient();
+        DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+                getDefaultSinkWriter(
+                        true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient);
+        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+        assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
+                .isNotEmpty()
+                .containsExactly(expectedClientRequests);
+        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty();
+    }
+
+    @Test
+    public void testDeleteRequestPartitionKeyDeduplication() throws Exception {
+        List<String> overwriteByPartitionKeys = ImmutableList.of(PARTITION_KEY);
+        List<DynamoDbWriteRequest> inputRequests =
+                ImmutableList.of(
+                        sinkDeleteRequest(item("pk", "1")), sinkDeleteRequest(item("pk", "2")));
+        List<WriteRequest> expectedClientRequests =
+                ImmutableList.of(dynamoDbDeleteRequest(item("pk", "2")));
+
+        TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new TrackingDynamoDbAsyncClient();
+        DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+                getDefaultSinkWriter(
+                        true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient);
+        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+        assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
+                .isNotEmpty()
+                .containsExactly(expectedClientRequests);
+        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty();
+    }
+
+    @Test
+    public void testMultipleKeyDeduplication() throws Exception {
+        List<String> overwriteByPartitionKeys = ImmutableList.of(PARTITION_KEY, SORT_KEY);
+        List<DynamoDbWriteRequest> inputRequests =
+                ImmutableList.of(
+                        sinkPutRequest(item("pk", "1")),
+                        sinkPutRequest(item("pk", "2")),
+                        sinkPutRequest(itemWithPayload("pk", "2", "string_payload_1")),
+                        sinkPutRequest(itemWithPayload("pk", "2", "string_payload_2")));
+        List<WriteRequest> expectedClientRequests =
+                ImmutableList.of(
+                        dynamoDbPutRequest(item("pk", "1")),
+                        dynamoDbPutRequest(itemWithPayload("pk", "2", "string_payload_2")));
+
+        TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new TrackingDynamoDbAsyncClient();
+        DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+                getDefaultSinkWriter(
+                        true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient);
+        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+        // Order does not matter in a batch write request
+        assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
+                .isNotEmpty()
+                .allSatisfy(
+                        clientBatchRequest ->
+                                assertThat(clientBatchRequest)
+                                        .containsExactlyInAnyOrderElementsOf(
+                                                expectedClientRequests));
+        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty();
+    }
+
+    @Test
+    public void testRetryableExceptionWhenFailOnErrorOnWillNotRetry() {
+        Optional<Exception> exceptionToThrow = getGenericRetryableException();
+        boolean failOnError = true;
+
+        assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+    }
+
+    @Test
+    public void testRetryableExceptionWhenFailOnErrorOffWillRetry() throws Exception {
+        Optional<Exception> exceptionToThrow = getGenericRetryableException();
+        boolean failOnError = false;
+
+        List<DynamoDbWriteRequest> inputRequests = getDefaultInputRequests();
+        ThrowingDynamoDbAsyncClient<Exception> throwingDynamoDbAsyncClient =
+                new ThrowingDynamoDbAsyncClient<>(exceptionToThrow, str -> true);
+        DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+                getDefaultSinkWriter(
+                        failOnError, Collections.emptyList(), () -> throwingDynamoDbAsyncClient);
+        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+
+        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS))
+                .containsExactlyInAnyOrderElementsOf(inputRequests);
+    }
+
+    @Test
+    public void testPartiallyFailedRequestRetriesFailedRecords() throws Exception {
+        boolean failOnError = false;
+        List<DynamoDbWriteRequest> inputRequests =
+                ImmutableList.of(
+                        sinkPutRequest(item("put_will_fail_pk", "1")),
+                        sinkPutRequest(item("put_will_not_fail_pk", "2")),
+                        sinkDeleteRequest(item("delete_will_fail_pk", "3")),
+                        sinkDeleteRequest(item("delete_will_not_fail_pk", "4")));
+        List<DynamoDbWriteRequest> expectedRetriedRecords =
+                ImmutableList.of(
+                        sinkPutRequest(item("put_will_fail_pk", "1")),
+                        sinkDeleteRequest(item("delete_will_fail_pk", "3")));
+        Predicate<String> failWhenPartitionKeyMatcher = str -> str.contains("will_fail_pk");
+
+        FailingRecordsDynamoDbAsyncClient failingRecordsDynamoDbAsyncClient =
+                new FailingRecordsDynamoDbAsyncClient(failWhenPartitionKeyMatcher);
+        DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+                getDefaultSinkWriter(
+                        failOnError,
+                        Collections.emptyList(),
+                        () -> failingRecordsDynamoDbAsyncClient);
+        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+
+        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS))
+                .usingRecursiveComparison()
+                .isEqualTo(expectedRetriedRecords);
+    }
+
+    @Test
+    public void testNonRetryableExceptionWhenFailOnErrorOnWillNotRetry() {
+        Optional<Exception> exceptionToThrow = getGenericNonRetryableException();
+        boolean failOnError = true;
+
+        assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+    }
+
+    @Test
+    public void testNonRetryableExceptionWhenFailOnErrorOffWillNotRetry() {
+        Optional<Exception> exceptionToThrow = getGenericNonRetryableException();
+        boolean failOnError = false;
+
+        assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+    }
+
+    @Test
+    public void testInterruptedExceptionIsNonRetryable() {
+        Optional<Exception> exceptionToThrow = Optional.of(new InterruptedException());
+        boolean failOnError = false;
+
+        assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+    }
+
+    @Test
+    public void testInvalidCredentialsExceptionIsNonRetryable() {
+        Optional<Exception> exceptionToThrow = Optional.of(StsException.builder().build());
+        boolean failOnError = false;
+
+        assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+    }
+
+    @Test
+    public void testResourceNotFoundExceptionIsNonRetryable() {
+        Optional<Exception> exceptionToThrow =
+                Optional.of(ResourceNotFoundException.builder().build());
+        boolean failOnError = false;
+
+        assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+    }
+
+    @Test
+    public void testConditionalCheckFailedExceptionIsNonRetryable() {
+        Optional<Exception> exceptionToThrow =
+                Optional.of(ConditionalCheckFailedException.builder().build());
+        boolean failOnError = false;
+
+        assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+    }
+
+    @Test
+    public void testValidationExceptionIsNonRetryable() {
+        Optional<Exception> exceptionToThrow =
+                Optional.of(
+                        DynamoDbException.builder()
+                                .awsErrorDetails(
+                                        AwsErrorDetails.builder()
+                                                .errorCode("ValidationException")
+                                                .build())
+                                .build());
+        boolean failOnError = false;
+
+        assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+    }
+
+    @Test
+    public void testSdkClientExceptionIsNonRetryable() {
+        Optional<Exception> exceptionToThrow = Optional.of(SdkClientException.builder().build());
+        boolean failOnError = false;
+
+        assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+    }
+
+    @Test
+    public void testGetSizeInBytesNotImplemented() {
+        DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+                getDefaultSinkWriter(
+                        false, Collections.emptyList(), () -> new TrackingDynamoDbAsyncClient());
+        assertThat(dynamoDbSinkWriter.getSizeInBytes(sinkPutRequest(item("pk", "1")))).isEqualTo(0);
+    }
+
+    @Test
+    public void testClientClosesWhenWriterIsClosed() {
+        TestAsyncDynamoDbClientProvider testAsyncDynamoDbClientProvider =
+                new TestAsyncDynamoDbClientProvider(new TrackingDynamoDbAsyncClient());
+        DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+                getDefaultSinkWriter(
+                        false, Collections.emptyList(), testAsyncDynamoDbClientProvider);
+        dynamoDbSinkWriter.close();
+
+        assertThat(testAsyncDynamoDbClientProvider.getCloseCount()).isEqualTo(1);
+    }
+
+    private void assertThatRequestsAreNotRetried(
+            boolean failOnError, Optional<Exception> exceptionToThrow) {
+        ThrowingDynamoDbAsyncClient<Exception> throwingDynamoDbAsyncClient =
+                new ThrowingDynamoDbAsyncClient<>(exceptionToThrow, str -> true);
+        DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+                getDefaultSinkWriter(
+                        failOnError, Collections.emptyList(), () -> throwingDynamoDbAsyncClient);
+        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+        dynamoDbSinkWriter.submitRequestEntries(getDefaultInputRequests(), failedRequestConsumer);
+        assertThat(failedRequests).isNotCompleted();
+    }
+
+    private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter(
+            boolean failOnError,
+            List<String> overwriteByPartitionKeys,
+            Supplier<DynamoDbAsyncClient> clientSupplier) {
+        return getDefaultSinkWriter(
+                failOnError,
+                overwriteByPartitionKeys,
+                new TestAsyncDynamoDbClientProvider(clientSupplier.get()));
+    }
+
+    private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter(
+            boolean failOnError,
+            List<String> overwriteByPartitionKeys,
+            SdkClientProvider<DynamoDbAsyncClient> dynamoDbAsyncClientProvider) {
+        Sink.InitContext initContext = new TestSinkInitContext();
+        return new DynamoDbSinkWriter(
+                new TestDynamoDbElementConverter(),
+                initContext,
+                2,
+                1,
+                10,
+                1024,
+                1000,
+                1024,
+                failOnError,
+                TABLE_NAME,
+                overwriteByPartitionKeys,
+                dynamoDbAsyncClientProvider,
+                Collections.emptyList());
+    }
+
+    private List<DynamoDbWriteRequest> getDefaultInputRequests() {
+        return ImmutableList.of(sinkPutRequest(item("pk", "1")), sinkPutRequest(item("pk", "2")));
+    }
+
+    private Optional<Exception> getGenericRetryableException() {
+        return Optional.of(
+                ProvisionedThroughputExceededException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder()
+                                        .errorCode("SomeErrorCodeThatIsNotUsed")
+                                        .build())
+                        .build());
+    }
+
+    private Optional<Exception> getGenericNonRetryableException() {
+        return Optional.of(
+                ResourceNotFoundException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder()
+                                        .errorCode("SomeErrorCodeThatIsNotUsed")
+                                        .build())
+                        .build());
+    }
+
+    private DynamoDbWriteRequest sinkPutRequest(Map<String, AttributeValue> item) {
+        return DynamoDbWriteRequest.builder()
+                .setType(DynamoDbWriteRequestType.PUT)
+                .setItem(item)
+                .build();
+    }
+
+    private DynamoDbWriteRequest sinkDeleteRequest(Map<String, AttributeValue> item) {
+        return DynamoDbWriteRequest.builder()
+                .setType(DynamoDbWriteRequestType.DELETE)
+                .setItem(item)
+                .build();
+    }
+
+    private WriteRequest dynamoDbPutRequest(Map<String, AttributeValue> item) {
+        return WriteRequest.builder().putRequest(PutRequest.builder().item(item).build()).build();
+    }
+
+    private WriteRequest dynamoDbDeleteRequest(Map<String, AttributeValue> item) {
+        return WriteRequest.builder()
+                .deleteRequest(DeleteRequest.builder().key(item).build())
+                .build();
+    }
+
+    private Map<String, AttributeValue> item(String partitionKey, String sortKey) {
+        return ImmutableMap.<String, AttributeValue>builder()
+                .put(PARTITION_KEY, AttributeValue.builder().s(partitionKey).build())
+                .put(SORT_KEY, AttributeValue.builder().n(sortKey).build())
+                .put("string_payload", AttributeValue.builder().s("some_strings").build())
+                .put("number_payload", AttributeValue.builder().n("1234").build())
+                .build();
+    }
+
+    private Map<String, AttributeValue> itemWithPayload(
+            String partitionKey, String sortKey, String payload) {
+        return ImmutableMap.<String, AttributeValue>builder()
+                .put(PARTITION_KEY, AttributeValue.builder().s(partitionKey).build())
+                .put(SORT_KEY, AttributeValue.builder().n(sortKey).build())
+                .put("string_payload", AttributeValue.builder().s(payload).build())
+                .put("number_payload", AttributeValue.builder().n("1234").build())
+                .build();
+    }
+
+    private static class TestAsyncDynamoDbClientProvider
+            implements SdkClientProvider<DynamoDbAsyncClient> {
+
+        private final DynamoDbAsyncClient dynamoDbAsyncClient;
+        private int closeCount = 0;
+
+        private TestAsyncDynamoDbClientProvider(DynamoDbAsyncClient dynamoDbAsyncClient) {
+            this.dynamoDbAsyncClient = dynamoDbAsyncClient;
+        }
+
+        @Override
+        public DynamoDbAsyncClient getClient() {
+            return dynamoDbAsyncClient;
+        }
+
+        @Override
+        public void close() {
+            closeCount++;
+        }
+
+        public int getCloseCount() {
+            return closeCount;
+        }
+    }
+
+    private static class TrackingDynamoDbAsyncClient implements DynamoDbAsyncClient {
+
+        private List<List<WriteRequest>> requestHistory = new ArrayList<>();
+
+        @Override
+        public String serviceName() {
+            return "DynamoDB";
+        }
+
+        @Override
+        public void close() {}
+
+        @Override
+        public CompletableFuture<BatchWriteItemResponse> batchWriteItem(
+                BatchWriteItemRequest batchWriteItemRequest) {
+            requestHistory.add(batchWriteItemRequest.requestItems().get(TABLE_NAME));
+            return CompletableFuture.completedFuture(BatchWriteItemResponse.builder().build());
+        }
+
+        public List<List<WriteRequest>> getRequestHistory() {
+            return requestHistory;
+        }
+    }
+
+    private static class ThrowingDynamoDbAsyncClient<T extends Throwable>
+            implements DynamoDbAsyncClient {
+
+        private final Optional<T> errorToReturn;
+        private final Predicate<String> failWhenMatched;
+
+        private ThrowingDynamoDbAsyncClient(
+                Optional<T> errorToReturn, Predicate<String> failWhenMatched) {
+            this.errorToReturn = errorToReturn;
+            this.failWhenMatched = failWhenMatched;
+        }
+
+        @Override
+        public String serviceName() {
+            return "DynamoDB";
+        }
+
+        @Override
+        public void close() {}
+
+        @Override
+        public CompletableFuture<BatchWriteItemResponse> batchWriteItem(
+                BatchWriteItemRequest batchWriteItemRequest) {
+            if (errorToReturn.isPresent()) {
+                CompletableFuture<BatchWriteItemResponse> future = new CompletableFuture<>();
+                future.completeExceptionally(
+                        DynamoDbException.builder().cause((errorToReturn.get())).build());
+                return future;
+            }
+
+            List<WriteRequest> failedRequests =
+                    batchWriteItemRequest.requestItems().get(TABLE_NAME).stream()
+                            .filter(
+                                    writeRequest ->
+                                            failWhenMatched.test(
+                                                    writeRequest
+                                                            .putRequest()
+                                                            .item()
+                                                            .get(PARTITION_KEY)
+                                                            .s()))
+                            .collect(Collectors.toList());
+
+            BatchWriteItemResponse.Builder responseBuilder = BatchWriteItemResponse.builder();
+            if (!failedRequests.isEmpty()) {
+                responseBuilder =
+                        responseBuilder.unprocessedItems(
+                                ImmutableMap.of(TABLE_NAME, failedRequests));
+            }
+            return CompletableFuture.completedFuture(responseBuilder.build());
+        }
+    }
+
+    private static class FailingRecordsDynamoDbAsyncClient implements DynamoDbAsyncClient {
+        private final Predicate<String> failWhenMatched;
+
+        private FailingRecordsDynamoDbAsyncClient(Predicate<String> failWhenMatched) {
+            this.failWhenMatched = failWhenMatched;
+        }
+
+        @Override
+        public String serviceName() {
+            return "DynamoDB";
+        }
+
+        @Override
+        public void close() {}
+
+        @Override
+        public CompletableFuture<BatchWriteItemResponse> batchWriteItem(
+                BatchWriteItemRequest batchWriteItemRequest) {
+            List<WriteRequest> failedRequests =
+                    batchWriteItemRequest.requestItems().get(TABLE_NAME).stream()
+                            .filter(
+                                    writeRequest -> {
+                                        if (writeRequest.putRequest() != null) {
+                                            return failWhenMatched.test(
+                                                    writeRequest
+                                                            .putRequest()
+                                                            .item()
+                                                            .get(PARTITION_KEY)
+                                                            .s());
+                                        } else if (writeRequest.deleteRequest() != null) {
+                                            return failWhenMatched.test(
+                                                    writeRequest
+                                                            .deleteRequest()
+                                                            .key()
+                                                            .get(PARTITION_KEY)
+                                                            .s());
+                                        } else {
+                                            throw new RuntimeException(
+                                                    "Write request cannot be empty");
+                                        }
+                                    })
+                            .collect(Collectors.toList());
+
+            BatchWriteItemResponse.Builder responseBuilder = BatchWriteItemResponse.builder();
+            if (!failedRequests.isEmpty()) {
+                responseBuilder =
+                        responseBuilder.unprocessedItems(
+                                ImmutableMap.of(TABLE_NAME, failedRequests));
+            }
+            return CompletableFuture.completedFuture(responseBuilder.build());
+        }
+    }
+}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java
index 49a8894..f41cac0 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.dynamodb.sink;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 
@@ -44,4 +46,19 @@ class DynamoDbWriteRequestTest {
                                 + "We need to check this, and update the DynamoDbWriterStateSerializer if required.")
                 .hasSize(11);
     }
+
+    @Test
+    public void testToString() {
+        DynamoDbWriteRequest dynamoDbWriteRequest =
+                DynamoDbWriteRequest.builder()
+                        .setItem(
+                                ImmutableMap.of(
+                                        "testKey", AttributeValue.builder().s("testValue").build()))
+                        .setType(DynamoDbWriteRequestType.PUT)
+                        .build();
+        assertThat(dynamoDbWriteRequest.toString())
+                .contains("testKey")
+                .contains("testValue")
+                .contains(DynamoDbWriteRequestType.PUT.toString());
+    }
 }
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java
index b88000e..3925119 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java
@@ -29,8 +29,8 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
-import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
 import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 /**
  * Tests for serializing and deserialzing a collection of {@link DynamoDbWriteRequest} with {@link
@@ -56,7 +56,13 @@ public class DynamoDbWriterStateSerializerTest {
         BufferedRequestState<DynamoDbWriteRequest> actualState =
                 serializer.deserialize(1, serializer.serialize(expectedState));
 
-        assertThatBufferStatesAreEqual(actualState, expectedState);
+        assertThat(actualState).usingRecursiveComparison().isEqualTo(expectedState);
+    }
+
+    @Test
+    public void testVersion() {
+        DynamoDbWriterStateSerializer serializer = new DynamoDbWriterStateSerializer();
+        assertThat(serializer.getVersion()).isEqualTo(1);
     }
 
     private int getRequestSize(DynamoDbWriteRequest requestEntry) {
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java
index 8ec38f2..1f92bf1 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java
@@ -60,7 +60,7 @@ public class RowDataElementConverterTest {
                         .setItem(rowDataToAttributeValueConverter.convertRowData(rowData))
                         .build();
 
-        assertThat(actualWriteRequest).isEqualTo(expectedWriterequest);
+        assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest);
     }
 
     @Test
@@ -73,7 +73,7 @@ public class RowDataElementConverterTest {
                         .setItem(rowDataToAttributeValueConverter.convertRowData(rowData))
                         .build();
 
-        assertThat(actualWriteRequest).isEqualTo(expectedWriterequest);
+        assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest);
     }
 
     @Test
@@ -98,7 +98,7 @@ public class RowDataElementConverterTest {
                         .setItem(rowDataToAttributeValueConverter.convertRowData(rowData))
                         .build();
 
-        assertThat(actualWriteRequest).isEqualTo(expectedWriterequest);
+        assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest);
     }
 
     @Test
@@ -123,7 +123,7 @@ public class RowDataElementConverterTest {
 
         assertThat(transformedConverter).extracting("rowDataToAttributeValueConverter").isNotNull();
 
-        assertThat(actualWriteRequest).isEqualTo(expectedWriterequest);
+        assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest);
     }
 
     private RowData createElement(RowKind kind) {
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
index 367fe23..25b3fad 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
@@ -100,7 +100,9 @@ public class DynamoDbSerializationUtilTest {
                 DataInputStream dataInputStream = new DataInputStream(inputStream)) {
             DynamoDbWriteRequest deserializedWriteRequest =
                     DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
-            assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+            assertThat(deserializedWriteRequest)
+                    .usingRecursiveComparison()
+                    .isEqualTo(dynamoDbWriteRequest);
         }
     }
 
@@ -126,7 +128,9 @@ public class DynamoDbSerializationUtilTest {
                 DataInputStream dataInputStream = new DataInputStream(inputStream)) {
             DynamoDbWriteRequest deserializedWriteRequest =
                     DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
-            assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+            assertThat(deserializedWriteRequest)
+                    .usingRecursiveComparison()
+                    .isEqualTo(dynamoDbWriteRequest);
         }
     }
 
@@ -152,4 +156,22 @@ public class DynamoDbSerializationUtilTest {
             throw new RuntimeException(e);
         }
     }
+
+    @Test
+    public void testUnsupportedDynamoDbType() {
+        assertThatExceptionOfType(UnsupportedOperationException.class)
+                .isThrownBy(
+                        () -> DynamoDbType.fromByteValue((byte) (DynamoDbType.values().length + 1)))
+                .withMessageContaining("Unsupported byte value");
+    }
+
+    @Test
+    public void testUnsupportedDynamoDbWriteRequestType() {
+        assertThatExceptionOfType(UnsupportedOperationException.class)
+                .isThrownBy(
+                        () ->
+                                DynamoDbWriteRequestType.fromByteValue(
+                                        (byte) (DynamoDbWriteRequestType.values().length + 1)))
+                .withMessageContaining("Unsupported byte value");
+    }
 }