You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/24 16:50:13 UTC

[flink-connector-aws] 02/02: [FLINK-29895][Connectors/DynamoDB] Increase unit test coverage for DynamoDb sink

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git

commit 89aba34abd6b80549949ca6a0e8e880f1fe20a03
Author: Hong Liang Teoh <li...@amazon.com>
AuthorDate: Sat Nov 19 23:41:24 2022 +0000

    [FLINK-29895][Connectors/DynamoDB] Increase unit test coverage for DynamoDb sink
---
 .../dynamodb/sink/DynamoDbSinkException.java       |   5 -
 .../connector/dynamodb/util/PrimaryKeyBuilder.java |   4 +-
 .../dynamodb/sink/DynamoDbSinkITCase.java          |  21 +-
 .../connector/dynamodb/sink/DynamoDbSinkTest.java  | 264 ++++++++++-----------
 .../dynamodb/sink/DynamoDbSinkWriterTest.java      |  10 -
 .../util/DynamoDbSerializationUtilTest.java        |  74 ++++--
 .../flink/connector/dynamodb/util/Order.java       |  18 ++
 .../dynamodb/util/PrimaryKeyBuilderTest.java       |  19 ++
 8 files changed, 229 insertions(+), 186 deletions(-)

diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkException.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkException.java
index 0782395..423ea34 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkException.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkException.java
@@ -38,11 +38,6 @@ public class DynamoDbSinkException extends RuntimeException {
     @PublicEvolving
     static class DynamoDbSinkFailFastException extends DynamoDbSinkException {
 
-        public DynamoDbSinkFailFastException() {
-            super(
-                    "Encountered an exception while persisting records, not retrying due to {failOnError} being set.");
-        }
-
         public DynamoDbSinkFailFastException(final Throwable cause) {
             super(
                     "Encountered an exception while persisting records, not retrying due to {failOnError} being set.",
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilder.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilder.java
index 00cf0b7..61bb7f0 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilder.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilder.java
@@ -41,7 +41,7 @@ public class PrimaryKeyBuilder {
     public PrimaryKeyBuilder(List<String> partitionKeys) {
         if (CollectionUtil.isNullOrEmpty(partitionKeys)) {
             throw new InvalidConfigurationException(
-                    "Can not construct partition key as overwriteByPartitionKeys configuration not provided");
+                    "Unable to construct partition key as overwriteByPartitionKeys configuration not provided.");
         }
 
         this.partitionKeys = partitionKeys;
@@ -115,7 +115,7 @@ public class PrimaryKeyBuilder {
                         "DeleteItemRequest " + request + " does not contain request key.");
             }
         } else {
-            throw new InvalidRequestException("Empty write request" + request);
+            throw new InvalidRequestException("Empty write request " + request);
         }
     }
 }
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java
index 24d1aab..d73157d 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.connector.dynamodb.sink;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.connector.aws.config.AWSConfigConstants;
 import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers;
 import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer;
 import org.apache.flink.connector.dynamodb.testutils.Item;
@@ -117,17 +116,17 @@ public class DynamoDbSinkITCase {
     @Test
     public void nonExistentTableNameShouldResultInFailureWhenFailOnErrorIsFalse() {
         List<Map<String, AttributeValue>> items =
-            Items.builder().item(Item.builder().attr("1", "1").build()).build();
+                Items.builder().item(Item.builder().attr("1", "1").build()).build();
         Assertions.assertThatExceptionOfType(JobExecutionException.class)
-            .isThrownBy(
-                () ->
-                    new Scenario(env.fromCollection(items))
-                        .withTableName("NonExistentTableName")
-                        .withFailOnError(false)
-                        .runScenario())
-            .havingCause()
-            .havingCause()
-            .withMessageContaining("Encountered non-recoverable exception");
+                .isThrownBy(
+                        () ->
+                                new Scenario(env.fromCollection(items))
+                                        .withTableName("NonExistentTableName")
+                                        .withFailOnError(false)
+                                        .runScenario())
+                .havingCause()
+                .havingCause()
+                .withMessageContaining("Encountered non-recoverable exception");
     }
 
     @Test
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
index ae41e8b..c2f88eb 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
@@ -1,8 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.connector.dynamodb.sink;
 
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
 
@@ -10,7 +28,6 @@ import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
-import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.BASIC;
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.ENV_VAR;
@@ -18,222 +35,198 @@ import static org.apache.flink.connector.aws.config.AWSConfigConstants.Credentia
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
-/**
- * Test for {@link DynamoDbSink}.
- */
+/** Test for {@link DynamoDbSink}. */
 public class DynamoDbSinkTest {
 
     @Test
     public void testSuccessfullyCreateWithMinimalConfiguration() {
         DynamoDbSink.<Map<String, AttributeValue>>builder()
-            .setElementConverter(new TestDynamoDbElementConverter())
-            .setTableName("test_table")
-            .build();
+                .setElementConverter(new TestDynamoDbElementConverter())
+                .setTableName("test_table")
+                .build();
     }
 
     @Test
     public void testElementConverterRequired() {
         assertThatExceptionOfType(NullPointerException.class)
-            .isThrownBy(
-                () ->
-                    DynamoDbSink.builder()
-                        .setTableName("test_table")
-                        .setFailOnError(true)
-                        .build())
-            .withMessageContaining(
-                "ElementConverter must be not null when initializing the AsyncSinkBase.");
+                .isThrownBy(
+                        () ->
+                                DynamoDbSink.builder()
+                                        .setTableName("test_table")
+                                        .setFailOnError(true)
+                                        .build())
+                .withMessageContaining(
+                        "ElementConverter must be not null when initializing the AsyncSinkBase.");
     }
 
     @Test
     public void testTableNameRequired() {
         assertThatExceptionOfType(NullPointerException.class)
-            .isThrownBy(
-                () ->
-                    DynamoDbSink.<Map<String, AttributeValue>>builder()
-                        .setElementConverter(new TestDynamoDbElementConverter())
-                        .setFailOnError(true)
-                        .build())
-            .withMessageContaining(
-                "Destination table name must be set when initializing the DynamoDB Sink.");
+                .isThrownBy(
+                        () ->
+                                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                                        .setElementConverter(new TestDynamoDbElementConverter())
+                                        .setFailOnError(true)
+                                        .build())
+                .withMessageContaining(
+                        "Destination table name must be set when initializing the DynamoDB Sink.");
     }
 
     @Test
     public void testTableNameNotEmpty() {
         assertThatExceptionOfType(IllegalArgumentException.class)
-            .isThrownBy(
-                () ->
-                    DynamoDbSink.<Map<String, AttributeValue>>builder()
-                        .setElementConverter(new TestDynamoDbElementConverter())
-                        .setTableName("")
-                        .setFailOnError(true)
-                        .build())
-            .withMessageContaining(
-                "Destination table name must be set when initializing the DynamoDB Sink.");
+                .isThrownBy(
+                        () ->
+                                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                                        .setElementConverter(new TestDynamoDbElementConverter())
+                                        .setTableName("")
+                                        .setFailOnError(true)
+                                        .build())
+                .withMessageContaining(
+                        "Destination table name must be set when initializing the DynamoDB Sink.");
     }
 
     @Test
     public void testInvalidMaxBatchSize() {
         assertThatExceptionOfType(IllegalArgumentException.class)
-            .isThrownBy(
-                () ->
-                    DynamoDbSink.<Map<String, AttributeValue>>builder()
-                        .setElementConverter(new TestDynamoDbElementConverter())
-                        .setTableName("test_table")
-                        .setMaxBatchSize(50)
-                        .setFailOnError(true)
-                        .build())
-            .withMessageContaining(
-                "DynamoDB client supports only up to 25 elements in the batch.");
+                .isThrownBy(
+                        () ->
+                                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                                        .setElementConverter(new TestDynamoDbElementConverter())
+                                        .setTableName("test_table")
+                                        .setMaxBatchSize(50)
+                                        .setFailOnError(true)
+                                        .build())
+                .withMessageContaining(
+                        "DynamoDB client supports only up to 25 elements in the batch.");
     }
 
     @Test
     public void testMaxBatchSizeInBytesThrowsNotImplemented() {
         assertThatExceptionOfType(InvalidConfigurationException.class)
-            .isThrownBy(
-                () ->
-                    DynamoDbSink.<Map<String, AttributeValue>>builder()
-                        .setElementConverter(new TestDynamoDbElementConverter())
-                        .setTableName("test_table")
-                        .setMaxBatchSizeInBytes(100)
-                        .setFailOnError(true)
-                        .build())
-            .withMessageContaining(
-                "Max batch size in bytes is not supported by the DynamoDB sink implementation.");
+                .isThrownBy(
+                        () ->
+                                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                                        .setElementConverter(new TestDynamoDbElementConverter())
+                                        .setTableName("test_table")
+                                        .setMaxBatchSizeInBytes(100)
+                                        .setFailOnError(true)
+                                        .build())
+                .withMessageContaining(
+                        "Max batch size in bytes is not supported by the DynamoDB sink implementation.");
     }
 
     @Test
     public void testMaxRecordSizeInBytesThrowsNotImplemented() {
         assertThatExceptionOfType(InvalidConfigurationException.class)
-            .isThrownBy(
-                () ->
-                    DynamoDbSink.<Map<String, AttributeValue>>builder()
-                        .setElementConverter(new TestDynamoDbElementConverter())
-                        .setTableName("test_table")
-                        .setMaxRecordSizeInBytes(100)
-                        .setFailOnError(true)
-                        .build())
-            .withMessageContaining(
-                "Max record size in bytes is not supported by the DynamoDB sink implementation.");
+                .isThrownBy(
+                        () ->
+                                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                                        .setElementConverter(new TestDynamoDbElementConverter())
+                                        .setTableName("test_table")
+                                        .setMaxRecordSizeInBytes(100)
+                                        .setFailOnError(true)
+                                        .build())
+                .withMessageContaining(
+                        "Max record size in bytes is not supported by the DynamoDB sink implementation.");
     }
 
     @Test
     public void testInvalidAwsRegionThrowsException() {
         Properties properties = getDefaultProperties();
         properties.setProperty(AWS_REGION, "some-invalid-region");
-        DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
-            .setElementConverter(new TestDynamoDbElementConverter())
-            .setDynamoDbProperties(properties)
-            .setTableName("test_table")
-            .build();
+        DynamoDbSink<Map<String, AttributeValue>> sink =
+                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                        .setElementConverter(new TestDynamoDbElementConverter())
+                        .setDynamoDbProperties(properties)
+                        .setTableName("test_table")
+                        .build();
 
         assertThatExceptionOfType(IllegalArgumentException.class)
-            .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
-            .withMessageContaining("Invalid AWS region set in config.");
-    }
-
-    @Test
-    public void testAwsRegionNotSpecifiedThrowsException() {
-        Properties properties = getDefaultProperties();
-        properties.remove(AWS_REGION);
-        DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
-            .setElementConverter(new TestDynamoDbElementConverter())
-            .setDynamoDbProperties(properties)
-            .setTableName("test_table")
-            .build();
-
-        assertThatExceptionOfType(NullPointerException.class)
-            .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
-            .withMessageContaining("region must not be null.");
-    }
-
-    @Test
-    public void testInvalidEndpointNoUriThrowsException() {
-        Properties properties = getDefaultProperties();
-        properties.put(AWS_ENDPOINT, "invalid-endpoint-no-uri");
-        DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
-            .setElementConverter(new TestDynamoDbElementConverter())
-            .setDynamoDbProperties(properties)
-            .setTableName("test_table")
-            .build();
-
-        assertThatExceptionOfType(NullPointerException.class)
-            .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
-            .withMessageContaining("The URI scheme of endpointOverride must not be null.");
+                .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+                .withMessageContaining("Invalid AWS region set in config.");
     }
 
     @Test
     public void testIncompleteEnvironmentCredentialsProviderThrowsException() {
         Properties properties = getDefaultProperties();
         properties.put(AWS_CREDENTIALS_PROVIDER, ENV_VAR.toString());
-        DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
-            .setElementConverter(new TestDynamoDbElementConverter())
-            .setDynamoDbProperties(properties)
-            .setTableName("test_table")
-            .build();
+        DynamoDbSink<Map<String, AttributeValue>> sink =
+                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                        .setElementConverter(new TestDynamoDbElementConverter())
+                        .setDynamoDbProperties(properties)
+                        .setTableName("test_table")
+                        .build();
 
         assertThatExceptionOfType(SdkClientException.class)
-            .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
-            .withMessageContaining("Unable to load credentials from system settings.");
+                .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+                .withMessageContaining("Unable to load credentials from system settings.");
     }
 
     @Test
     public void testIncompleteSystemPropertyCredentialsProviderThrowsException() {
         Properties properties = getDefaultProperties();
         properties.put(AWS_CREDENTIALS_PROVIDER, SYS_PROP.toString());
-        DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
-            .setElementConverter(new TestDynamoDbElementConverter())
-            .setDynamoDbProperties(properties)
-            .setTableName("test_table")
-            .build();
+        DynamoDbSink<Map<String, AttributeValue>> sink =
+                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                        .setElementConverter(new TestDynamoDbElementConverter())
+                        .setDynamoDbProperties(properties)
+                        .setTableName("test_table")
+                        .build();
 
         assertThatExceptionOfType(SdkClientException.class)
-            .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
-            .withMessageContaining("Unable to load credentials from system settings.");
+                .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+                .withMessageContaining("Unable to load credentials from system settings.");
     }
 
     @Test
     public void testIncompleteBasicCredentialsProviderThrowsException() {
         Properties properties = getDefaultProperties();
         properties.put(AWS_CREDENTIALS_PROVIDER, BASIC.toString());
-        DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
-            .setElementConverter(new TestDynamoDbElementConverter())
-            .setDynamoDbProperties(properties)
-            .setTableName("test_table")
-            .build();
+        DynamoDbSink<Map<String, AttributeValue>> sink =
+                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                        .setElementConverter(new TestDynamoDbElementConverter())
+                        .setDynamoDbProperties(properties)
+                        .setTableName("test_table")
+                        .build();
 
         assertThatExceptionOfType(IllegalArgumentException.class)
-            .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
-            .withMessageContaining("Please set values for AWS Access Key ID ('aws.credentials.provider.basic.accesskeyid') and Secret Key ('aws.credentials.provider.basic.secretkey') when using the BASIC AWS credential provider type.");
+                .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+                .withMessageContaining(
+                        "Please set values for AWS Access Key ID ('aws.credentials.provider.basic.accesskeyid') and Secret Key ('aws.credentials.provider.basic.secretkey') when using the BASIC AWS credential provider type.");
     }
 
     @Test
     public void testIncompleteWebIdentityTokenCredentialsProviderThrowsException() {
         Properties properties = getDefaultProperties();
         properties.put(AWS_CREDENTIALS_PROVIDER, WEB_IDENTITY_TOKEN.toString());
-        DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
-            .setElementConverter(new TestDynamoDbElementConverter())
-            .setDynamoDbProperties(properties)
-            .setTableName("test_table")
-            .build();
+        DynamoDbSink<Map<String, AttributeValue>> sink =
+                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                        .setElementConverter(new TestDynamoDbElementConverter())
+                        .setDynamoDbProperties(properties)
+                        .setTableName("test_table")
+                        .build();
 
         assertThatExceptionOfType(IllegalStateException.class)
-            .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
-            .withMessageContaining("Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set.");
+                .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+                .withMessageContaining(
+                        "Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set.");
     }
 
     @Test
     public void testInvalidCredentialsProviderThrowsException() {
         Properties properties = getDefaultProperties();
         properties.put(AWS_CREDENTIALS_PROVIDER, "INVALID_CREDENTIALS_PROVIDER");
-        DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
-            .setElementConverter(new TestDynamoDbElementConverter())
-            .setDynamoDbProperties(properties)
-            .setTableName("test_table")
-            .build();
+        DynamoDbSink<Map<String, AttributeValue>> sink =
+                DynamoDbSink.<Map<String, AttributeValue>>builder()
+                        .setElementConverter(new TestDynamoDbElementConverter())
+                        .setDynamoDbProperties(properties)
+                        .setTableName("test_table")
+                        .build();
 
         assertThatExceptionOfType(IllegalArgumentException.class)
-            .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
-            .withMessageContaining("Invalid AWS Credential Provider Type set in config.");
+                .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+                .withMessageContaining("Invalid AWS Credential Provider Type set in config.");
     }
 
     private Properties getDefaultProperties() {
@@ -241,5 +234,4 @@ public class DynamoDbSinkTest {
         properties.setProperty(AWS_REGION, "us-east-1");
         return properties;
     }
-
 }
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
deleted file mode 100644
index 5fd58f2..0000000
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.flink.connector.dynamodb.sink;
-
-import org.junit.Test;
-
-/**
- * Test for {@link DynamoDbSinkWriter}.
- */
-public class DynamoDbSinkWriterTest {
-
-}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
index 0c64026..367fe23 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
 /** Tests for {@link DynamoDbSerializationUtil}. */
 public class DynamoDbSerializationUtilTest {
@@ -87,17 +88,20 @@ public class DynamoDbSerializationUtilTest {
                         .setItem(item)
                         .setType(DynamoDbWriteRequestType.PUT)
                         .build();
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(outputStream);
-        DynamoDbSerializationUtil.serializeWriteRequest(dynamoDbWriteRequest, out);
-        outputStream.close();
-        out.close();
-        InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
-        DataInputStream dataInputStream = new DataInputStream(inputStream);
-        DynamoDbWriteRequest deserializedWriteRequest =
-                DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
-        inputStream.close();
-        assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+
+        byte[] serialized;
+        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(outputStream)) {
+            DynamoDbSerializationUtil.serializeWriteRequest(dynamoDbWriteRequest, out);
+            serialized = outputStream.toByteArray();
+        }
+
+        try (InputStream inputStream = new ByteArrayInputStream(serialized);
+                DataInputStream dataInputStream = new DataInputStream(inputStream)) {
+            DynamoDbWriteRequest deserializedWriteRequest =
+                    DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
+            assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+        }
     }
 
     @Test
@@ -110,16 +114,42 @@ public class DynamoDbSerializationUtilTest {
                         .setItem(key)
                         .setType(DynamoDbWriteRequestType.DELETE)
                         .build();
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(outputStream);
-        DynamoDbSerializationUtil.serializeWriteRequest(dynamoDbWriteRequest, out);
-        outputStream.close();
-        out.close();
-        InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
-        DataInputStream dataInputStream = new DataInputStream(inputStream);
-        DynamoDbWriteRequest deserializedWriteRequest =
-                DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
-        inputStream.close();
-        assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+
+        byte[] serialized;
+        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(outputStream)) {
+            DynamoDbSerializationUtil.serializeWriteRequest(dynamoDbWriteRequest, out);
+            serialized = outputStream.toByteArray();
+        }
+
+        try (InputStream inputStream = new ByteArrayInputStream(serialized);
+                DataInputStream dataInputStream = new DataInputStream(inputStream)) {
+            DynamoDbWriteRequest deserializedWriteRequest =
+                    DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
+            assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+        }
+    }
+
+    @Test
+    public void testSerializeEmptyAttributeValueThrowsException() {
+        final Map<String, AttributeValue> item = new HashMap<>();
+        item.put("empty", AttributeValue.builder().build());
+        DynamoDbWriteRequest dynamoDbWriteRequest =
+                DynamoDbWriteRequest.builder()
+                        .setItem(item)
+                        .setType(DynamoDbWriteRequestType.PUT)
+                        .build();
+
+        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(outputStream)) {
+            assertThatExceptionOfType(IllegalArgumentException.class)
+                    .isThrownBy(
+                            () ->
+                                    DynamoDbSerializationUtil.serializeWriteRequest(
+                                            dynamoDbWriteRequest, out))
+                    .withMessageContaining("Attribute value must not be empty: AttributeValue()");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
index 08a50ec..108d70b 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.connector.dynamodb.util;
 
 import org.apache.flink.connector.dynamodb.sink.DynamoDbBeanElementConverter;
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java
index 09e22b7..663b246 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.dynamodb.util;
 
+import org.apache.flink.connector.dynamodb.sink.InvalidConfigurationException;
 import org.apache.flink.connector.dynamodb.sink.InvalidRequestException;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
@@ -107,6 +108,24 @@ public class PrimaryKeyBuilderTest {
                 .isEqualTo(keyBuilder.build(createDeleteItemRequest(createItemValues())));
     }
 
+    @Test
+    public void testExceptOnEmptyPartitionKeys() {
+        assertThatExceptionOfType(InvalidConfigurationException.class)
+                .isThrownBy(() -> new PrimaryKeyBuilder(ImmutableList.of()))
+                .withMessageContaining(
+                        "Unable to construct partition key as overwriteByPartitionKeys configuration not provided.");
+    }
+
+    @Test
+    public void testExceptOnEmptyWriteRequest() {
+        assertThatExceptionOfType(InvalidRequestException.class)
+                .isThrownBy(
+                        () ->
+                                new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME))
+                                        .build(WriteRequest.builder().build()))
+                .withMessageContaining("Empty write request");
+    }
+
     @Test
     public void testExceptOnEmptyPutRequest() {
         assertThatExceptionOfType(InvalidRequestException.class)