You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/24 16:50:13 UTC
[flink-connector-aws] 02/02: [FLINK-29895][Connectors/DynamoDB] Increase unit test coverage for DynamoDb sink
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
commit 89aba34abd6b80549949ca6a0e8e880f1fe20a03
Author: Hong Liang Teoh <li...@amazon.com>
AuthorDate: Sat Nov 19 23:41:24 2022 +0000
[FLINK-29895][Connectors/DynamoDB] Increase unit test coverage for DynamoDb sink
---
.../dynamodb/sink/DynamoDbSinkException.java | 5 -
.../connector/dynamodb/util/PrimaryKeyBuilder.java | 4 +-
.../dynamodb/sink/DynamoDbSinkITCase.java | 21 +-
.../connector/dynamodb/sink/DynamoDbSinkTest.java | 264 ++++++++++-----------
.../dynamodb/sink/DynamoDbSinkWriterTest.java | 10 -
.../util/DynamoDbSerializationUtilTest.java | 74 ++++--
.../flink/connector/dynamodb/util/Order.java | 18 ++
.../dynamodb/util/PrimaryKeyBuilderTest.java | 19 ++
8 files changed, 229 insertions(+), 186 deletions(-)
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkException.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkException.java
index 0782395..423ea34 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkException.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkException.java
@@ -38,11 +38,6 @@ public class DynamoDbSinkException extends RuntimeException {
@PublicEvolving
static class DynamoDbSinkFailFastException extends DynamoDbSinkException {
- public DynamoDbSinkFailFastException() {
- super(
- "Encountered an exception while persisting records, not retrying due to {failOnError} being set.");
- }
-
public DynamoDbSinkFailFastException(final Throwable cause) {
super(
"Encountered an exception while persisting records, not retrying due to {failOnError} being set.",
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilder.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilder.java
index 00cf0b7..61bb7f0 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilder.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilder.java
@@ -41,7 +41,7 @@ public class PrimaryKeyBuilder {
public PrimaryKeyBuilder(List<String> partitionKeys) {
if (CollectionUtil.isNullOrEmpty(partitionKeys)) {
throw new InvalidConfigurationException(
- "Can not construct partition key as overwriteByPartitionKeys configuration not provided");
+ "Unable to construct partition key as overwriteByPartitionKeys configuration not provided.");
}
this.partitionKeys = partitionKeys;
@@ -115,7 +115,7 @@ public class PrimaryKeyBuilder {
"DeleteItemRequest " + request + " does not contain request key.");
}
} else {
- throw new InvalidRequestException("Empty write request" + request);
+ throw new InvalidRequestException("Empty write request " + request);
}
}
}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java
index 24d1aab..d73157d 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.connector.dynamodb.sink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.connector.aws.config.AWSConfigConstants;
import org.apache.flink.connector.dynamodb.testutils.DynamoDBHelpers;
import org.apache.flink.connector.dynamodb.testutils.DynamoDbContainer;
import org.apache.flink.connector.dynamodb.testutils.Item;
@@ -117,17 +116,17 @@ public class DynamoDbSinkITCase {
@Test
public void nonExistentTableNameShouldResultInFailureWhenFailOnErrorIsFalse() {
List<Map<String, AttributeValue>> items =
- Items.builder().item(Item.builder().attr("1", "1").build()).build();
+ Items.builder().item(Item.builder().attr("1", "1").build()).build();
Assertions.assertThatExceptionOfType(JobExecutionException.class)
- .isThrownBy(
- () ->
- new Scenario(env.fromCollection(items))
- .withTableName("NonExistentTableName")
- .withFailOnError(false)
- .runScenario())
- .havingCause()
- .havingCause()
- .withMessageContaining("Encountered non-recoverable exception");
+ .isThrownBy(
+ () ->
+ new Scenario(env.fromCollection(items))
+ .withTableName("NonExistentTableName")
+ .withFailOnError(false)
+ .runScenario())
+ .havingCause()
+ .havingCause()
+ .withMessageContaining("Encountered non-recoverable exception");
}
@Test
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
index ae41e8b..c2f88eb 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
@@ -1,8 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.connector.dynamodb.sink;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -10,7 +28,6 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
-import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.BASIC;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.ENV_VAR;
@@ -18,222 +35,198 @@ import static org.apache.flink.connector.aws.config.AWSConfigConstants.Credentia
import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-/**
- * Test for {@link DynamoDbSink}.
- */
+/** Test for {@link DynamoDbSink}. */
public class DynamoDbSinkTest {
@Test
public void testSuccessfullyCreateWithMinimalConfiguration() {
DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setTableName("test_table")
- .build();
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setTableName("test_table")
+ .build();
}
@Test
public void testElementConverterRequired() {
assertThatExceptionOfType(NullPointerException.class)
- .isThrownBy(
- () ->
- DynamoDbSink.builder()
- .setTableName("test_table")
- .setFailOnError(true)
- .build())
- .withMessageContaining(
- "ElementConverter must be not null when initializing the AsyncSinkBase.");
+ .isThrownBy(
+ () ->
+ DynamoDbSink.builder()
+ .setTableName("test_table")
+ .setFailOnError(true)
+ .build())
+ .withMessageContaining(
+ "ElementConverter must be not null when initializing the AsyncSinkBase.");
}
@Test
public void testTableNameRequired() {
assertThatExceptionOfType(NullPointerException.class)
- .isThrownBy(
- () ->
- DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setFailOnError(true)
- .build())
- .withMessageContaining(
- "Destination table name must be set when initializing the DynamoDB Sink.");
+ .isThrownBy(
+ () ->
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setFailOnError(true)
+ .build())
+ .withMessageContaining(
+ "Destination table name must be set when initializing the DynamoDB Sink.");
}
@Test
public void testTableNameNotEmpty() {
assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(
- () ->
- DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setTableName("")
- .setFailOnError(true)
- .build())
- .withMessageContaining(
- "Destination table name must be set when initializing the DynamoDB Sink.");
+ .isThrownBy(
+ () ->
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setTableName("")
+ .setFailOnError(true)
+ .build())
+ .withMessageContaining(
+ "Destination table name must be set when initializing the DynamoDB Sink.");
}
@Test
public void testInvalidMaxBatchSize() {
assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(
- () ->
- DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setTableName("test_table")
- .setMaxBatchSize(50)
- .setFailOnError(true)
- .build())
- .withMessageContaining(
- "DynamoDB client supports only up to 25 elements in the batch.");
+ .isThrownBy(
+ () ->
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setTableName("test_table")
+ .setMaxBatchSize(50)
+ .setFailOnError(true)
+ .build())
+ .withMessageContaining(
+ "DynamoDB client supports only up to 25 elements in the batch.");
}
@Test
public void testMaxBatchSizeInBytesThrowsNotImplemented() {
assertThatExceptionOfType(InvalidConfigurationException.class)
- .isThrownBy(
- () ->
- DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setTableName("test_table")
- .setMaxBatchSizeInBytes(100)
- .setFailOnError(true)
- .build())
- .withMessageContaining(
- "Max batch size in bytes is not supported by the DynamoDB sink implementation.");
+ .isThrownBy(
+ () ->
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setTableName("test_table")
+ .setMaxBatchSizeInBytes(100)
+ .setFailOnError(true)
+ .build())
+ .withMessageContaining(
+ "Max batch size in bytes is not supported by the DynamoDB sink implementation.");
}
@Test
public void testMaxRecordSizeInBytesThrowsNotImplemented() {
assertThatExceptionOfType(InvalidConfigurationException.class)
- .isThrownBy(
- () ->
- DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setTableName("test_table")
- .setMaxRecordSizeInBytes(100)
- .setFailOnError(true)
- .build())
- .withMessageContaining(
- "Max record size in bytes is not supported by the DynamoDB sink implementation.");
+ .isThrownBy(
+ () ->
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setTableName("test_table")
+ .setMaxRecordSizeInBytes(100)
+ .setFailOnError(true)
+ .build())
+ .withMessageContaining(
+ "Max record size in bytes is not supported by the DynamoDB sink implementation.");
}
@Test
public void testInvalidAwsRegionThrowsException() {
Properties properties = getDefaultProperties();
properties.setProperty(AWS_REGION, "some-invalid-region");
- DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setDynamoDbProperties(properties)
- .setTableName("test_table")
- .build();
+ DynamoDbSink<Map<String, AttributeValue>> sink =
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setDynamoDbProperties(properties)
+ .setTableName("test_table")
+ .build();
assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
- .withMessageContaining("Invalid AWS region set in config.");
- }
-
- @Test
- public void testAwsRegionNotSpecifiedThrowsException() {
- Properties properties = getDefaultProperties();
- properties.remove(AWS_REGION);
- DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setDynamoDbProperties(properties)
- .setTableName("test_table")
- .build();
-
- assertThatExceptionOfType(NullPointerException.class)
- .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
- .withMessageContaining("region must not be null.");
- }
-
- @Test
- public void testInvalidEndpointNoUriThrowsException() {
- Properties properties = getDefaultProperties();
- properties.put(AWS_ENDPOINT, "invalid-endpoint-no-uri");
- DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setDynamoDbProperties(properties)
- .setTableName("test_table")
- .build();
-
- assertThatExceptionOfType(NullPointerException.class)
- .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
- .withMessageContaining("The URI scheme of endpointOverride must not be null.");
+ .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+ .withMessageContaining("Invalid AWS region set in config.");
}
@Test
public void testIncompleteEnvironmentCredentialsProviderThrowsException() {
Properties properties = getDefaultProperties();
properties.put(AWS_CREDENTIALS_PROVIDER, ENV_VAR.toString());
- DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setDynamoDbProperties(properties)
- .setTableName("test_table")
- .build();
+ DynamoDbSink<Map<String, AttributeValue>> sink =
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setDynamoDbProperties(properties)
+ .setTableName("test_table")
+ .build();
assertThatExceptionOfType(SdkClientException.class)
- .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
- .withMessageContaining("Unable to load credentials from system settings.");
+ .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+ .withMessageContaining("Unable to load credentials from system settings.");
}
@Test
public void testIncompleteSystemPropertyCredentialsProviderThrowsException() {
Properties properties = getDefaultProperties();
properties.put(AWS_CREDENTIALS_PROVIDER, SYS_PROP.toString());
- DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setDynamoDbProperties(properties)
- .setTableName("test_table")
- .build();
+ DynamoDbSink<Map<String, AttributeValue>> sink =
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setDynamoDbProperties(properties)
+ .setTableName("test_table")
+ .build();
assertThatExceptionOfType(SdkClientException.class)
- .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
- .withMessageContaining("Unable to load credentials from system settings.");
+ .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+ .withMessageContaining("Unable to load credentials from system settings.");
}
@Test
public void testIncompleteBasicCredentialsProviderThrowsException() {
Properties properties = getDefaultProperties();
properties.put(AWS_CREDENTIALS_PROVIDER, BASIC.toString());
- DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setDynamoDbProperties(properties)
- .setTableName("test_table")
- .build();
+ DynamoDbSink<Map<String, AttributeValue>> sink =
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setDynamoDbProperties(properties)
+ .setTableName("test_table")
+ .build();
assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
- .withMessageContaining("Please set values for AWS Access Key ID ('aws.credentials.provider.basic.accesskeyid') and Secret Key ('aws.credentials.provider.basic.secretkey') when using the BASIC AWS credential provider type.");
+ .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+ .withMessageContaining(
+ "Please set values for AWS Access Key ID ('aws.credentials.provider.basic.accesskeyid') and Secret Key ('aws.credentials.provider.basic.secretkey') when using the BASIC AWS credential provider type.");
}
@Test
public void testIncompleteWebIdentityTokenCredentialsProviderThrowsException() {
Properties properties = getDefaultProperties();
properties.put(AWS_CREDENTIALS_PROVIDER, WEB_IDENTITY_TOKEN.toString());
- DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setDynamoDbProperties(properties)
- .setTableName("test_table")
- .build();
+ DynamoDbSink<Map<String, AttributeValue>> sink =
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setDynamoDbProperties(properties)
+ .setTableName("test_table")
+ .build();
assertThatExceptionOfType(IllegalStateException.class)
- .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
- .withMessageContaining("Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set.");
+ .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+ .withMessageContaining(
+ "Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set.");
}
@Test
public void testInvalidCredentialsProviderThrowsException() {
Properties properties = getDefaultProperties();
properties.put(AWS_CREDENTIALS_PROVIDER, "INVALID_CREDENTIALS_PROVIDER");
- DynamoDbSink<Map<String, AttributeValue>> sink = DynamoDbSink.<Map<String, AttributeValue>>builder()
- .setElementConverter(new TestDynamoDbElementConverter())
- .setDynamoDbProperties(properties)
- .setTableName("test_table")
- .build();
+ DynamoDbSink<Map<String, AttributeValue>> sink =
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setDynamoDbProperties(properties)
+ .setTableName("test_table")
+ .build();
assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
- .withMessageContaining("Invalid AWS Credential Provider Type set in config.");
+ .isThrownBy(() -> sink.createWriter(new TestSinkInitContext()))
+ .withMessageContaining("Invalid AWS Credential Provider Type set in config.");
}
private Properties getDefaultProperties() {
@@ -241,5 +234,4 @@ public class DynamoDbSinkTest {
properties.setProperty(AWS_REGION, "us-east-1");
return properties;
}
-
}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
deleted file mode 100644
index 5fd58f2..0000000
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.apache.flink.connector.dynamodb.sink;
-
-import org.junit.Test;
-
-/**
- * Test for {@link DynamoDbSinkWriter}.
- */
-public class DynamoDbSinkWriterTest {
-
-}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
index 0c64026..367fe23 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
@@ -37,6 +37,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
/** Tests for {@link DynamoDbSerializationUtil}. */
public class DynamoDbSerializationUtilTest {
@@ -87,17 +88,20 @@ public class DynamoDbSerializationUtilTest {
.setItem(item)
.setType(DynamoDbWriteRequestType.PUT)
.build();
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(outputStream);
- DynamoDbSerializationUtil.serializeWriteRequest(dynamoDbWriteRequest, out);
- outputStream.close();
- out.close();
- InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
- DataInputStream dataInputStream = new DataInputStream(inputStream);
- DynamoDbWriteRequest deserializedWriteRequest =
- DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
- inputStream.close();
- assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+
+ byte[] serialized;
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(outputStream)) {
+ DynamoDbSerializationUtil.serializeWriteRequest(dynamoDbWriteRequest, out);
+ serialized = outputStream.toByteArray();
+ }
+
+ try (InputStream inputStream = new ByteArrayInputStream(serialized);
+ DataInputStream dataInputStream = new DataInputStream(inputStream)) {
+ DynamoDbWriteRequest deserializedWriteRequest =
+ DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
+ assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+ }
}
@Test
@@ -110,16 +114,42 @@ public class DynamoDbSerializationUtilTest {
.setItem(key)
.setType(DynamoDbWriteRequestType.DELETE)
.build();
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(outputStream);
- DynamoDbSerializationUtil.serializeWriteRequest(dynamoDbWriteRequest, out);
- outputStream.close();
- out.close();
- InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
- DataInputStream dataInputStream = new DataInputStream(inputStream);
- DynamoDbWriteRequest deserializedWriteRequest =
- DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
- inputStream.close();
- assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+
+ byte[] serialized;
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(outputStream)) {
+ DynamoDbSerializationUtil.serializeWriteRequest(dynamoDbWriteRequest, out);
+ serialized = outputStream.toByteArray();
+ }
+
+ try (InputStream inputStream = new ByteArrayInputStream(serialized);
+ DataInputStream dataInputStream = new DataInputStream(inputStream)) {
+ DynamoDbWriteRequest deserializedWriteRequest =
+ DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
+ assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+ }
+ }
+
+ @Test
+ public void testSerializeEmptyAttributeValueThrowsException() {
+ final Map<String, AttributeValue> item = new HashMap<>();
+ item.put("empty", AttributeValue.builder().build());
+ DynamoDbWriteRequest dynamoDbWriteRequest =
+ DynamoDbWriteRequest.builder()
+ .setItem(item)
+ .setType(DynamoDbWriteRequestType.PUT)
+ .build();
+
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(outputStream)) {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(
+ () ->
+ DynamoDbSerializationUtil.serializeWriteRequest(
+ dynamoDbWriteRequest, out))
+ .withMessageContaining("Attribute value must not be empty: AttributeValue()");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
index 08a50ec..108d70b 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.flink.connector.dynamodb.util;
import org.apache.flink.connector.dynamodb.sink.DynamoDbBeanElementConverter;
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java
index 09e22b7..663b246 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/PrimaryKeyBuilderTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.dynamodb.util;
+import org.apache.flink.connector.dynamodb.sink.InvalidConfigurationException;
import org.apache.flink.connector.dynamodb.sink.InvalidRequestException;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
@@ -107,6 +108,24 @@ public class PrimaryKeyBuilderTest {
.isEqualTo(keyBuilder.build(createDeleteItemRequest(createItemValues())));
}
+ @Test
+ public void testExceptOnEmptyPartitionKeys() {
+ assertThatExceptionOfType(InvalidConfigurationException.class)
+ .isThrownBy(() -> new PrimaryKeyBuilder(ImmutableList.of()))
+ .withMessageContaining(
+ "Unable to construct partition key as overwriteByPartitionKeys configuration not provided.");
+ }
+
+ @Test
+ public void testExceptOnEmptyWriteRequest() {
+ assertThatExceptionOfType(InvalidRequestException.class)
+ .isThrownBy(
+ () ->
+ new PrimaryKeyBuilder(ImmutableList.of(PARTITION_KEY_NAME))
+ .build(WriteRequest.builder().build()))
+ .withMessageContaining("Empty write request");
+ }
+
@Test
public void testExceptOnEmptyPutRequest() {
assertThatExceptionOfType(InvalidRequestException.class)