You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/25 14:16:33 UTC
[flink-connector-aws] branch main updated: [FLINK-29895][Connectors/DynamoDB] Further increase unit test coverage for DynamoDbSink
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 9e09d57 [FLINK-29895][Connectors/DynamoDB] Further increase unit test coverage for DynamoDbSink
9e09d57 is described below
commit 9e09d57210c4b44a9f88cc25781e96c452b2ec49
Author: Hong Liang Teoh <li...@amazon.com>
AuthorDate: Thu Nov 24 16:36:47 2022 +0000
[FLINK-29895][Connectors/DynamoDB] Further increase unit test coverage for DynamoDbSink
---
.../connector/dynamodb/sink/DynamoDbSink.java | 3 +-
.../dynamodb/sink/DynamoDbSinkWriter.java | 51 +-
.../dynamodb/sink/DynamoDbWriteRequest.java | 21 +-
.../sink/client/DynamoDbAsyncClientProvider.java | 75 +++
.../dynamodb/sink/client/SdkClientProvider.java | 36 ++
.../connector/dynamodb/sink/DynamoDbSinkTest.java | 17 +
.../dynamodb/sink/DynamoDbSinkWriterTest.java | 592 +++++++++++++++++++++
.../dynamodb/sink/DynamoDbWriteRequestTest.java | 17 +
.../sink/DynamoDbWriterStateSerializerTest.java | 10 +-
.../table/RowDataElementConverterTest.java | 8 +-
.../util/DynamoDbSerializationUtilTest.java | 26 +-
11 files changed, 791 insertions(+), 65 deletions(-)
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
index e17d5f8..c90aa19 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.client.DynamoDbAsyncClientProvider;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
@@ -151,7 +152,7 @@ public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteReq
failOnError,
tableName,
overwriteByPartitionKeys,
- dynamoDbClientProperties,
+ new DynamoDbAsyncClientProvider(dynamoDbClientProperties),
recoveredState);
}
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
index d371727..2e6b26f 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
@@ -20,13 +20,12 @@ package org.apache.flink.connector.dynamodb.sink;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
-import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
import org.apache.flink.connector.dynamodb.util.PrimaryKeyBuilder;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -36,7 +35,6 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
@@ -53,7 +51,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -96,7 +93,6 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
(err) ->
err instanceof DynamoDbException
&& ((DynamoDbException) err)
- .toBuilder()
.awsErrorDetails()
.errorCode()
.equalsIgnoreCase("ValidationException"),
@@ -123,9 +119,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
/* The sink writer metric group */
private final SinkWriterMetricGroup metrics;
- /* The asynchronous http client for the asynchronous DynamoDb client */
- private final SdkAsyncHttpClient httpClient;
- private final DynamoDbAsyncClient client;
+ private final SdkClientProvider<DynamoDbAsyncClient> clientProvider;
private final boolean failOnError;
private final String tableName;
@@ -143,7 +137,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
boolean failOnError,
String tableName,
List<String> overwriteByPartitionKeys,
- Properties dynamoDbClientProperties,
+ SdkClientProvider<DynamoDbAsyncClient> clientProvider,
Collection<BufferedRequestState<DynamoDbWriteRequest>> states) {
super(
elementConverter,
@@ -161,31 +155,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
this.metrics = context.metricGroup();
this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
this.numRecordsSendPartialFailure = metrics.counter("numRecordsSendPartialFailure");
- this.httpClient =
- AWSGeneralUtil.createAsyncHttpClient(
- overrideClientProperties(dynamoDbClientProperties));
- this.client = buildClient(dynamoDbClientProperties, httpClient);
- }
-
- private Properties overrideClientProperties(Properties dynamoDbClientProperties) {
- Properties overridenProperties = new Properties();
- overridenProperties.putAll(dynamoDbClientProperties);
-
- // Specify HTTP1_1 protocol since DynamoDB endpoint doesn't support HTTP2
- overridenProperties.put(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
- return overridenProperties;
- }
-
- private DynamoDbAsyncClient buildClient(
- Properties dynamoDbClientProperties, SdkAsyncHttpClient httpClient) {
- AWSGeneralUtil.validateAwsCredentials(dynamoDbClientProperties);
-
- return AWSAsyncSinkUtil.createAwsAsyncClient(
- dynamoDbClientProperties,
- httpClient,
- DynamoDbAsyncClient.builder(),
- DynamoDbConfigConstants.BASE_DYNAMODB_USER_AGENT_PREFIX_FORMAT,
- DynamoDbConfigConstants.DYNAMODB_CLIENT_USER_AGENT_PREFIX);
+ this.clientProvider = clientProvider;
}
@Override
@@ -211,10 +181,12 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
}
CompletableFuture<BatchWriteItemResponse> future =
- client.batchWriteItem(
- BatchWriteItemRequest.builder()
- .requestItems(ImmutableMap.of(tableName, items))
- .build());
+ clientProvider
+ .getClient()
+ .batchWriteItem(
+ BatchWriteItemRequest.builder()
+ .requestItems(ImmutableMap.of(tableName, items))
+ .build());
future.whenComplete(
(response, err) -> {
@@ -259,6 +231,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
}
private boolean isRetryable(Throwable err) {
+ // isFatal() returns true when err is NOT fatal, i.e. it is really isNotFatal()
if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
return false;
}
@@ -281,7 +254,7 @@ class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRe
@Override
public void close() {
- AWSGeneralUtil.closeResources(httpClient, client);
+ AWSGeneralUtil.closeResources(clientProvider);
}
private WriteRequest convertToWriteRequest(DynamoDbWriteRequest dynamoDbWriteRequest) {
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java
index 4fd340a..c8e3c88 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java
@@ -25,7 +25,6 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.io.Serializable;
import java.util.Map;
-import java.util.Objects;
/**
* Represents a single Write Request to DynamoDb. Contains the item to be written as well as the
@@ -52,25 +51,13 @@ public class DynamoDbWriteRequest implements Serializable {
return type;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- DynamoDbWriteRequest that = (DynamoDbWriteRequest) o;
- return item.equals(that.item) && type == that.type;
+ public static Builder builder() {
+ return new Builder();
}
@Override
- public int hashCode() {
- return Objects.hash(item, type);
- }
-
- public static Builder builder() {
- return new Builder();
+ public String toString() {
+ return "DynamoDbWriteRequest{" + "item=" + item + ", type=" + type + '}';
}
/** Builder for DynamoDbWriteRequest. */
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/DynamoDbAsyncClientProvider.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/DynamoDbAsyncClientProvider.java
new file mode 100644
index 0000000..f72e304
--- /dev/null
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/DynamoDbAsyncClientProvider.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink.client;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbConfigConstants;
+
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+
+import java.util.Properties;
+
+/** Provides a {@link DynamoDbAsyncClient}. */
+@Internal
+public class DynamoDbAsyncClientProvider implements SdkClientProvider<DynamoDbAsyncClient> {
+
+ private final SdkAsyncHttpClient httpClient;
+ private final DynamoDbAsyncClient dynamoDbAsyncClient;
+
+ public DynamoDbAsyncClientProvider(Properties clientProperties) {
+ this.httpClient =
+ AWSGeneralUtil.createAsyncHttpClient(overrideClientProperties(clientProperties));
+ this.dynamoDbAsyncClient = buildClient(clientProperties, httpClient);
+ }
+
+ @Override
+ public DynamoDbAsyncClient getClient() {
+ return dynamoDbAsyncClient;
+ }
+
+ @Override
+ public void close() {
+ AWSGeneralUtil.closeResources(httpClient, dynamoDbAsyncClient);
+ }
+
+ private Properties overrideClientProperties(Properties dynamoDbClientProperties) {
+ Properties overridenProperties = new Properties();
+ overridenProperties.putAll(dynamoDbClientProperties);
+
+ // Default to HTTP1_1 unless already set, since DynamoDB endpoint doesn't support HTTP2
+ overridenProperties.putIfAbsent(AWSConfigConstants.HTTP_PROTOCOL_VERSION, "HTTP1_1");
+ return overridenProperties;
+ }
+
+ private DynamoDbAsyncClient buildClient(
+ Properties dynamoDbClientProperties, SdkAsyncHttpClient httpClient) {
+ AWSGeneralUtil.validateAwsCredentials(dynamoDbClientProperties);
+
+ return AWSAsyncSinkUtil.createAwsAsyncClient(
+ dynamoDbClientProperties,
+ httpClient,
+ DynamoDbAsyncClient.builder(),
+ DynamoDbConfigConstants.BASE_DYNAMODB_USER_AGENT_PREFIX_FORMAT,
+ DynamoDbConfigConstants.DYNAMODB_CLIENT_USER_AGENT_PREFIX);
+ }
+}
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/SdkClientProvider.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/SdkClientProvider.java
new file mode 100644
index 0000000..c4ce097
--- /dev/null
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/client/SdkClientProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink.client;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.core.SdkClient;
+import software.amazon.awssdk.utils.SdkAutoCloseable;
+
+/** Provides a {@link SdkClient}. */
+@Internal
+public interface SdkClientProvider<T extends SdkClient> extends SdkAutoCloseable {
+
+ /**
+ * Returns the provided SDK client of type {@code T}.
+ *
+ * @return the AWS SDK client
+ */
+ T getClient();
+}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
index c2f88eb..9cf198e 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkTest.java
@@ -34,6 +34,7 @@ import static org.apache.flink.connector.aws.config.AWSConfigConstants.Credentia
import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.SYS_PROP;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider.WEB_IDENTITY_TOKEN;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
/** Test for {@link DynamoDbSink}. */
public class DynamoDbSinkTest {
@@ -229,6 +230,22 @@ public class DynamoDbSinkTest {
.withMessageContaining("Invalid AWS Credential Provider Type set in config.");
}
+ @Test
+ public void testGetWriterStateSerializer() {
+ Properties properties = getDefaultProperties();
+ properties.put(AWS_CREDENTIALS_PROVIDER, "INVALID_CREDENTIALS_PROVIDER");
+ DynamoDbSink<Map<String, AttributeValue>> sink =
+ DynamoDbSink.<Map<String, AttributeValue>>builder()
+ .setElementConverter(new TestDynamoDbElementConverter())
+ .setDynamoDbProperties(properties)
+ .setTableName("test_table")
+ .build();
+
+ assertThat(sink.getWriterStateSerializer())
+ .usingRecursiveComparison()
+ .isEqualTo(new DynamoDbWriterStateSerializer());
+ }
+
private Properties getDefaultProperties() {
Properties properties = new Properties();
properties.setProperty(AWS_REGION, "us-east-1");
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
new file mode 100644
index 0000000..9a178e7
--- /dev/null
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.dynamodb.sink;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughputExceededException;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+import software.amazon.awssdk.services.sts.model.StsException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DynamoDbSinkWriter}. */
+public class DynamoDbSinkWriterTest {
+
+ private static final String PARTITION_KEY = "partition_key";
+ private static final String SORT_KEY = "sort_key";
+ private static final String TABLE_NAME = "table_name";
+ private static final long FUTURE_TIMEOUT_MS = 1000;
+
+ @Test
+ public void testSuccessfulRequestWithNoDeduplication() throws Exception {
+ List<String> overwriteByPartitionKeys = Collections.emptyList();
+ List<DynamoDbWriteRequest> inputRequests =
+ ImmutableList.of(
+ sinkPutRequest(item("pk", "1")),
+ sinkPutRequest(item("pk", "2")),
+ sinkDeleteRequest(item("pk", "3")),
+ sinkDeleteRequest(item("pk", "4")));
+ List<WriteRequest> expectedClientRequests =
+ ImmutableList.of(
+ dynamoDbPutRequest(item("pk", "1")),
+ dynamoDbPutRequest(item("pk", "2")),
+ dynamoDbDeleteRequest(item("pk", "3")),
+ dynamoDbDeleteRequest(item("pk", "4")));
+
+ TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new TrackingDynamoDbAsyncClient();
+ DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+ getDefaultSinkWriter(
+ true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient);
+ CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+ Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+ assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
+ .isNotEmpty()
+ .containsExactly(expectedClientRequests);
+ assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty();
+ }
+
+ @Test
+ public void testPutRequestPartitionKeyDeduplication() throws Exception {
+ List<String> overwriteByPartitionKeys = ImmutableList.of(PARTITION_KEY);
+ List<DynamoDbWriteRequest> inputRequests =
+ ImmutableList.of(sinkPutRequest(item("pk", "1")), sinkPutRequest(item("pk", "2")));
+ List<WriteRequest> expectedClientRequests =
+ ImmutableList.of(dynamoDbPutRequest(item("pk", "2")));
+
+ TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new TrackingDynamoDbAsyncClient();
+ DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+ getDefaultSinkWriter(
+ true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient);
+ CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+ Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+ assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
+ .isNotEmpty()
+ .containsExactly(expectedClientRequests);
+ assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty();
+ }
+
+ @Test
+ public void testDeleteRequestPartitionKeyDeduplication() throws Exception {
+ List<String> overwriteByPartitionKeys = ImmutableList.of(PARTITION_KEY);
+ List<DynamoDbWriteRequest> inputRequests =
+ ImmutableList.of(
+ sinkDeleteRequest(item("pk", "1")), sinkDeleteRequest(item("pk", "2")));
+ List<WriteRequest> expectedClientRequests =
+ ImmutableList.of(dynamoDbDeleteRequest(item("pk", "2")));
+
+ TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new TrackingDynamoDbAsyncClient();
+ DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+ getDefaultSinkWriter(
+ true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient);
+ CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+ Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+ assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
+ .isNotEmpty()
+ .containsExactly(expectedClientRequests);
+ assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty();
+ }
+
+ @Test
+ public void testMultipleKeyDeduplication() throws Exception {
+ List<String> overwriteByPartitionKeys = ImmutableList.of(PARTITION_KEY, SORT_KEY);
+ List<DynamoDbWriteRequest> inputRequests =
+ ImmutableList.of(
+ sinkPutRequest(item("pk", "1")),
+ sinkPutRequest(item("pk", "2")),
+ sinkPutRequest(itemWithPayload("pk", "2", "string_payload_1")),
+ sinkPutRequest(itemWithPayload("pk", "2", "string_payload_2")));
+ List<WriteRequest> expectedClientRequests =
+ ImmutableList.of(
+ dynamoDbPutRequest(item("pk", "1")),
+ dynamoDbPutRequest(itemWithPayload("pk", "2", "string_payload_2")));
+
+ TrackingDynamoDbAsyncClient trackingDynamoDbAsyncClient = new TrackingDynamoDbAsyncClient();
+ DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+ getDefaultSinkWriter(
+ true, overwriteByPartitionKeys, () -> trackingDynamoDbAsyncClient);
+ CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+ Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+ // Order does not matter in a batch write request
+ assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
+ .isNotEmpty()
+ .allSatisfy(
+ clientBatchRequest ->
+ assertThat(clientBatchRequest)
+ .containsExactlyInAnyOrderElementsOf(
+ expectedClientRequests));
+ assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS)).isEmpty();
+ }
+
+ @Test
+ public void testRetryableExceptionWhenFailOnErrorOnWillNotRetry() {
+ Optional<Exception> exceptionToThrow = getGenericRetryableException();
+ boolean failOnError = true;
+
+ assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+ }
+
+ @Test
+ public void testRetryableExceptionWhenFailOnErrorOffWillRetry() throws Exception {
+ Optional<Exception> exceptionToThrow = getGenericRetryableException();
+ boolean failOnError = false;
+
+ List<DynamoDbWriteRequest> inputRequests = getDefaultInputRequests();
+ ThrowingDynamoDbAsyncClient<Exception> throwingDynamoDbAsyncClient =
+ new ThrowingDynamoDbAsyncClient<>(exceptionToThrow, str -> true);
+ DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+ getDefaultSinkWriter(
+ failOnError, Collections.emptyList(), () -> throwingDynamoDbAsyncClient);
+ CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+ Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+
+ assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS))
+ .containsExactlyInAnyOrderElementsOf(inputRequests);
+ }
+
+ @Test
+ public void testPartiallyFailedRequestRetriesFailedRecords() throws Exception {
+ boolean failOnError = false;
+ List<DynamoDbWriteRequest> inputRequests =
+ ImmutableList.of(
+ sinkPutRequest(item("put_will_fail_pk", "1")),
+ sinkPutRequest(item("put_will_not_fail_pk", "2")),
+ sinkDeleteRequest(item("delete_will_fail_pk", "3")),
+ sinkDeleteRequest(item("delete_will_not_fail_pk", "4")));
+ List<DynamoDbWriteRequest> expectedRetriedRecords =
+ ImmutableList.of(
+ sinkPutRequest(item("put_will_fail_pk", "1")),
+ sinkDeleteRequest(item("delete_will_fail_pk", "3")));
+ Predicate<String> failWhenPartitionKeyMatcher = str -> str.contains("will_fail_pk");
+
+ FailingRecordsDynamoDbAsyncClient failingRecordsDynamoDbAsyncClient =
+ new FailingRecordsDynamoDbAsyncClient(failWhenPartitionKeyMatcher);
+ DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+ getDefaultSinkWriter(
+ failOnError,
+ Collections.emptyList(),
+ () -> failingRecordsDynamoDbAsyncClient);
+ CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+ Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, failedRequestConsumer);
+
+ assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, TimeUnit.MILLISECONDS))
+ .usingRecursiveComparison()
+ .isEqualTo(expectedRetriedRecords);
+ }
+
+ @Test
+ public void testNonRetryableExceptionWhenFailOnErrorOnWillNotRetry() {
+ Optional<Exception> exceptionToThrow = getGenericNonRetryableException();
+ boolean failOnError = true;
+
+ assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+ }
+
+ @Test
+ public void testNonRetryableExceptionWhenFailOnErrorOffWillNotRetry() {
+ Optional<Exception> exceptionToThrow = getGenericNonRetryableException();
+ boolean failOnError = false;
+
+ assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+ }
+
+ @Test
+ public void testInterruptedExceptionIsNonRetryable() {
+ Optional<Exception> exceptionToThrow = Optional.of(new InterruptedException());
+ boolean failOnError = false;
+
+ assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+ }
+
+ @Test
+ public void testInvalidCredentialsExceptionIsNonRetryable() {
+ Optional<Exception> exceptionToThrow = Optional.of(StsException.builder().build());
+ boolean failOnError = false;
+
+ assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+ }
+
+ @Test
+ public void testResourceNotFoundExceptionIsNonRetryable() {
+ Optional<Exception> exceptionToThrow =
+ Optional.of(ResourceNotFoundException.builder().build());
+ boolean failOnError = false;
+
+ assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+ }
+
+ @Test
+ public void testConditionalCheckFailedExceptionIsNonRetryable() {
+ Optional<Exception> exceptionToThrow =
+ Optional.of(ConditionalCheckFailedException.builder().build());
+ boolean failOnError = false;
+
+ assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+ }
+
+ @Test
+ public void testValidationExceptionIsNonRetryable() {
+ Optional<Exception> exceptionToThrow =
+ Optional.of(
+ DynamoDbException.builder()
+ .awsErrorDetails(
+ AwsErrorDetails.builder()
+ .errorCode("ValidationException")
+ .build())
+ .build());
+ boolean failOnError = false;
+
+ assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+ }
+
+ @Test
+ public void testSdkClientExceptionIsNonRetryable() {
+ Optional<Exception> exceptionToThrow = Optional.of(SdkClientException.builder().build());
+ boolean failOnError = false;
+
+ assertThatRequestsAreNotRetried(failOnError, exceptionToThrow);
+ }
+
+ @Test
+ public void testGetSizeInBytesNotImplemented() {
+ DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+ getDefaultSinkWriter(
+ false, Collections.emptyList(), () -> new TrackingDynamoDbAsyncClient());
+ assertThat(dynamoDbSinkWriter.getSizeInBytes(sinkPutRequest(item("pk", "1")))).isEqualTo(0);
+ }
+
+ @Test
+ public void testClientClosesWhenWriterIsClosed() {
+ TestAsyncDynamoDbClientProvider testAsyncDynamoDbClientProvider =
+ new TestAsyncDynamoDbClientProvider(new TrackingDynamoDbAsyncClient());
+ DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+ getDefaultSinkWriter(
+ false, Collections.emptyList(), testAsyncDynamoDbClientProvider);
+ dynamoDbSinkWriter.close();
+
+ assertThat(testAsyncDynamoDbClientProvider.getCloseCount()).isEqualTo(1);
+ }
+
+ private void assertThatRequestsAreNotRetried(
+ boolean failOnError, Optional<Exception> exceptionToThrow) {
+ ThrowingDynamoDbAsyncClient<Exception> throwingDynamoDbAsyncClient =
+ new ThrowingDynamoDbAsyncClient<>(exceptionToThrow, str -> true);
+ DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
+ getDefaultSinkWriter(
+ failOnError, Collections.emptyList(), () -> throwingDynamoDbAsyncClient);
+ CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new CompletableFuture<>();
+ Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = failedRequests::complete;
+
+ dynamoDbSinkWriter.submitRequestEntries(getDefaultInputRequests(), failedRequestConsumer);
+ assertThat(failedRequests).isNotCompleted();
+ }
+
+ private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter(
+ boolean failOnError,
+ List<String> overwriteByPartitionKeys,
+ Supplier<DynamoDbAsyncClient> clientSupplier) {
+ return getDefaultSinkWriter(
+ failOnError,
+ overwriteByPartitionKeys,
+ new TestAsyncDynamoDbClientProvider(clientSupplier.get()));
+ }
+
+ /**
+  * Builds a {@link DynamoDbSinkWriter} with the fixed settings shared by the tests in this
+  * class, writing to {@code TABLE_NAME} with an empty initial state.
+  *
+  * <p>NOTE(review): the six numeric arguments are presumably the writer's batching and buffering
+  * limits; confirm their order against the {@code DynamoDbSinkWriter} constructor.
+  *
+  * @param failOnError value passed to the writer's fail-on-error setting
+  * @param overwriteByPartitionKeys key attribute names handed to the writer
+  * @param dynamoDbAsyncClientProvider provider of the client the writer sends requests with
+  * @return the configured writer
+  */
+ private DynamoDbSinkWriter<Map<String, AttributeValue>> getDefaultSinkWriter(
+         boolean failOnError,
+         List<String> overwriteByPartitionKeys,
+         SdkClientProvider<DynamoDbAsyncClient> dynamoDbAsyncClientProvider) {
+     Sink.InitContext initContext = new TestSinkInitContext();
+     // Diamond instead of the raw type, so the result needs no unchecked conversion.
+     return new DynamoDbSinkWriter<>(
+             new TestDynamoDbElementConverter(),
+             initContext,
+             2,
+             1,
+             10,
+             1024,
+             1000,
+             1024,
+             failOnError,
+             TABLE_NAME,
+             overwriteByPartitionKeys,
+             dynamoDbAsyncClientProvider,
+             Collections.emptyList());
+ }
+
+ /** Returns two PUT requests that share partition key "pk" and use sort keys "1" and "2". */
+ private List<DynamoDbWriteRequest> getDefaultInputRequests() {
+     DynamoDbWriteRequest first = sinkPutRequest(item("pk", "1"));
+     DynamoDbWriteRequest second = sinkPutRequest(item("pk", "2"));
+     return ImmutableList.of(first, second);
+ }
+
+ /**
+  * Returns a {@link ProvisionedThroughputExceededException} with a placeholder error code, used
+  * by the tests as their generic retryable error.
+  */
+ private Optional<Exception> getGenericRetryableException() {
+     AwsErrorDetails errorDetails =
+             AwsErrorDetails.builder().errorCode("SomeErrorCodeThatIsNotUsed").build();
+     return Optional.of(
+             ProvisionedThroughputExceededException.builder()
+                     .awsErrorDetails(errorDetails)
+                     .build());
+ }
+
+ /**
+  * Returns a {@link ResourceNotFoundException} with a placeholder error code, used by the tests
+  * as their generic non-retryable error.
+  */
+ private Optional<Exception> getGenericNonRetryableException() {
+     AwsErrorDetails errorDetails =
+             AwsErrorDetails.builder().errorCode("SomeErrorCodeThatIsNotUsed").build();
+     return Optional.of(
+             ResourceNotFoundException.builder().awsErrorDetails(errorDetails).build());
+ }
+
+ /** Builds a sink-level write request of type PUT carrying the given item. */
+ private DynamoDbWriteRequest sinkPutRequest(Map<String, AttributeValue> item) {
+     DynamoDbWriteRequestType requestType = DynamoDbWriteRequestType.PUT;
+     return DynamoDbWriteRequest.builder().setItem(item).setType(requestType).build();
+ }
+
+ /** Builds a sink-level write request of type DELETE carrying the given item. */
+ private DynamoDbWriteRequest sinkDeleteRequest(Map<String, AttributeValue> item) {
+     DynamoDbWriteRequestType requestType = DynamoDbWriteRequestType.DELETE;
+     return DynamoDbWriteRequest.builder().setItem(item).setType(requestType).build();
+ }
+
+ /** Builds the SDK {@link WriteRequest} that wraps a {@link PutRequest} for the given item. */
+ private WriteRequest dynamoDbPutRequest(Map<String, AttributeValue> item) {
+     PutRequest putRequest = PutRequest.builder().item(item).build();
+     return WriteRequest.builder().putRequest(putRequest).build();
+ }
+
+ /**
+  * Builds the SDK {@link WriteRequest} that wraps a {@link DeleteRequest} using the given item
+  * as the key.
+  */
+ private WriteRequest dynamoDbDeleteRequest(Map<String, AttributeValue> item) {
+     DeleteRequest deleteRequest = DeleteRequest.builder().key(item).build();
+     return WriteRequest.builder().deleteRequest(deleteRequest).build();
+ }
+
+ /**
+  * Builds a test item with the given keys and the default string payload "some_strings".
+  *
+  * @param partitionKey value stored as a string attribute under {@code PARTITION_KEY}
+  * @param sortKey value stored as a number attribute under {@code SORT_KEY}
+  */
+ private Map<String, AttributeValue> item(String partitionKey, String sortKey) {
+     // Same attributes as itemWithPayload, with the payload fixed to the default value.
+     return itemWithPayload(partitionKey, sortKey, "some_strings");
+ }
+
+ /**
+  * Builds a test item with the given keys, the given string payload and a fixed number payload
+  * of "1234".
+  *
+  * @param partitionKey value stored as a string attribute under {@code PARTITION_KEY}
+  * @param sortKey value stored as a number attribute under {@code SORT_KEY}
+  * @param payload value stored as a string attribute under "string_payload"
+  */
+ private Map<String, AttributeValue> itemWithPayload(
+         String partitionKey, String sortKey, String payload) {
+     AttributeValue partitionKeyValue = AttributeValue.builder().s(partitionKey).build();
+     AttributeValue sortKeyValue = AttributeValue.builder().n(sortKey).build();
+     AttributeValue stringPayload = AttributeValue.builder().s(payload).build();
+     AttributeValue numberPayload = AttributeValue.builder().n("1234").build();
+
+     return ImmutableMap.<String, AttributeValue>builder()
+             .put(PARTITION_KEY, partitionKeyValue)
+             .put(SORT_KEY, sortKeyValue)
+             .put("string_payload", stringPayload)
+             .put("number_payload", numberPayload)
+             .build();
+ }
+
+ /**
+  * Client provider for tests that always hands out the client it was constructed with and
+  * counts how many times {@link #close()} has been called.
+  */
+ private static class TestAsyncDynamoDbClientProvider
+         implements SdkClientProvider<DynamoDbAsyncClient> {
+
+     private final DynamoDbAsyncClient client;
+     private int timesClosed;
+
+     private TestAsyncDynamoDbClientProvider(DynamoDbAsyncClient dynamoDbAsyncClient) {
+         client = dynamoDbAsyncClient;
+     }
+
+     @Override
+     public DynamoDbAsyncClient getClient() {
+         return client;
+     }
+
+     /** Only records the call; the wrapped client is not closed here. */
+     @Override
+     public void close() {
+         timesClosed += 1;
+     }
+
+     /** Returns the number of {@link #close()} calls seen so far. */
+     public int getCloseCount() {
+         return timesClosed;
+     }
+ }
+
+ /**
+  * Client stub that records, for every {@code batchWriteItem} call, the write requests
+  * addressed to {@code TABLE_NAME}, and always answers with an empty response.
+  */
+ private static class TrackingDynamoDbAsyncClient implements DynamoDbAsyncClient {
+
+     // One entry per batchWriteItem call, in call order. Final: the list is never replaced.
+     private final List<List<WriteRequest>> requestHistory = new ArrayList<>();
+
+     @Override
+     public String serviceName() {
+         return "DynamoDB";
+     }
+
+     @Override
+     public void close() {}
+
+     @Override
+     public CompletableFuture<BatchWriteItemResponse> batchWriteItem(
+             BatchWriteItemRequest batchWriteItemRequest) {
+         requestHistory.add(batchWriteItemRequest.requestItems().get(TABLE_NAME));
+         return CompletableFuture.completedFuture(BatchWriteItemResponse.builder().build());
+     }
+
+     /** Returns the recorded batches; this is the live list, not a copy. */
+     public List<List<WriteRequest>> getRequestHistory() {
+         return requestHistory;
+     }
+ }
+
+ /**
+  * Client stub with two modes. If an error was supplied, every {@code batchWriteItem} call
+  * returns a future that completed exceptionally with a {@link DynamoDbException} whose cause is
+  * that error. Otherwise the call succeeds and reports as unprocessed every write request whose
+  * partition key matches {@code failWhenMatched}.
+  *
+  * @param <T> type of the error used as the cause of the failure
+  */
+ private static class ThrowingDynamoDbAsyncClient<T extends Throwable>
+         implements DynamoDbAsyncClient {
+
+     private final Optional<T> errorToReturn;
+     private final Predicate<String> failWhenMatched;
+
+     private ThrowingDynamoDbAsyncClient(
+             Optional<T> errorToReturn, Predicate<String> failWhenMatched) {
+         this.errorToReturn = errorToReturn;
+         this.failWhenMatched = failWhenMatched;
+     }
+
+     @Override
+     public String serviceName() {
+         return "DynamoDB";
+     }
+
+     @Override
+     public void close() {}
+
+     @Override
+     public CompletableFuture<BatchWriteItemResponse> batchWriteItem(
+             BatchWriteItemRequest batchWriteItemRequest) {
+         if (errorToReturn.isPresent()) {
+             CompletableFuture<BatchWriteItemResponse> future = new CompletableFuture<>();
+             future.completeExceptionally(
+                     DynamoDbException.builder().cause(errorToReturn.get()).build());
+             return future;
+         }
+
+         List<WriteRequest> failedRequests =
+                 batchWriteItemRequest.requestItems().get(TABLE_NAME).stream()
+                         .filter(
+                                 writeRequest ->
+                                         failWhenMatched.test(partitionKeyOf(writeRequest)))
+                         .collect(Collectors.toList());
+
+         BatchWriteItemResponse.Builder responseBuilder = BatchWriteItemResponse.builder();
+         if (!failedRequests.isEmpty()) {
+             responseBuilder =
+                     responseBuilder.unprocessedItems(
+                             ImmutableMap.of(TABLE_NAME, failedRequests));
+         }
+         return CompletableFuture.completedFuture(responseBuilder.build());
+     }
+
+     /**
+      * Reads the partition key from a PUT or a DELETE request, so DELETE requests no longer
+      * cause a NullPointerException on the absent put request.
+      */
+     private static String partitionKeyOf(WriteRequest writeRequest) {
+         if (writeRequest.putRequest() != null) {
+             return writeRequest.putRequest().item().get(PARTITION_KEY).s();
+         }
+         if (writeRequest.deleteRequest() != null) {
+             return writeRequest.deleteRequest().key().get(PARTITION_KEY).s();
+         }
+         throw new RuntimeException("Write request cannot be empty");
+     }
+ }
+
+ /**
+  * Client stub whose calls always complete normally, but which reports every PUT or DELETE
+  * request whose partition key matches {@code failWhenMatched} as unprocessed.
+  */
+ private static class FailingRecordsDynamoDbAsyncClient implements DynamoDbAsyncClient {
+     private final Predicate<String> failWhenMatched;
+
+     private FailingRecordsDynamoDbAsyncClient(Predicate<String> failWhenMatched) {
+         this.failWhenMatched = failWhenMatched;
+     }
+
+     @Override
+     public String serviceName() {
+         return "DynamoDB";
+     }
+
+     @Override
+     public void close() {}
+
+     @Override
+     public CompletableFuture<BatchWriteItemResponse> batchWriteItem(
+             BatchWriteItemRequest batchWriteItemRequest) {
+         List<WriteRequest> unprocessed =
+                 batchWriteItemRequest.requestItems().get(TABLE_NAME).stream()
+                         .filter(this::shouldFail)
+                         .collect(Collectors.toList());
+
+         // Unprocessed items are only set on the response when at least one request matched.
+         BatchWriteItemResponse response =
+                 unprocessed.isEmpty()
+                         ? BatchWriteItemResponse.builder().build()
+                         : BatchWriteItemResponse.builder()
+                                 .unprocessedItems(ImmutableMap.of(TABLE_NAME, unprocessed))
+                                 .build();
+         return CompletableFuture.completedFuture(response);
+     }
+
+     /**
+      * Tests the request's partition key against the predicate, reading it from the put item or
+      * the delete key; a request with neither is rejected.
+      */
+     private boolean shouldFail(WriteRequest writeRequest) {
+         Map<String, AttributeValue> keyAttributes;
+         if (writeRequest.putRequest() != null) {
+             keyAttributes = writeRequest.putRequest().item();
+         } else if (writeRequest.deleteRequest() != null) {
+             keyAttributes = writeRequest.deleteRequest().key();
+         } else {
+             throw new RuntimeException("Write request cannot be empty");
+         }
+         return failWhenMatched.test(keyAttributes.get(PARTITION_KEY).s());
+     }
+ }
+}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java
index 49a8894..f41cac0 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequestTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.connector.dynamodb.sink;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -44,4 +46,19 @@ class DynamoDbWriteRequestTest {
+ "We need to check this, and update the DynamoDbWriterStateSerializer if required.")
.hasSize(11);
}
+
+ /** Verifies that toString() mentions the item's key, its value and the request type. */
+ @Test
+ public void testToString() {
+     AttributeValue testValue = AttributeValue.builder().s("testValue").build();
+     String description =
+             DynamoDbWriteRequest.builder()
+                     .setItem(ImmutableMap.of("testKey", testValue))
+                     .setType(DynamoDbWriteRequestType.PUT)
+                     .build()
+                     .toString();
+
+     assertThat(description)
+             .contains("testKey")
+             .contains("testValue")
+             .contains(DynamoDbWriteRequestType.PUT.toString());
+ }
}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java
index b88000e..3925119 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriterStateSerializerTest.java
@@ -29,8 +29,8 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
/**
* Tests for serializing and deserializing a collection of {@link DynamoDbWriteRequest} with {@link
@@ -56,7 +56,13 @@ public class DynamoDbWriterStateSerializerTest {
BufferedRequestState<DynamoDbWriteRequest> actualState =
serializer.deserialize(1, serializer.serialize(expectedState));
- assertThatBufferStatesAreEqual(actualState, expectedState);
+ assertThat(actualState).usingRecursiveComparison().isEqualTo(expectedState);
+ }
+
+ /** Pins the version reported by {@link DynamoDbWriterStateSerializer} to 1. */
+ @Test
+ public void testVersion() {
+ DynamoDbWriterStateSerializer serializer = new DynamoDbWriterStateSerializer();
+ assertThat(serializer.getVersion()).isEqualTo(1);
}
private int getRequestSize(DynamoDbWriteRequest requestEntry) {
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java
index 8ec38f2..1f92bf1 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java
@@ -60,7 +60,7 @@ public class RowDataElementConverterTest {
.setItem(rowDataToAttributeValueConverter.convertRowData(rowData))
.build();
- assertThat(actualWriteRequest).isEqualTo(expectedWriterequest);
+ assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest);
}
@Test
@@ -73,7 +73,7 @@ public class RowDataElementConverterTest {
.setItem(rowDataToAttributeValueConverter.convertRowData(rowData))
.build();
- assertThat(actualWriteRequest).isEqualTo(expectedWriterequest);
+ assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest);
}
@Test
@@ -98,7 +98,7 @@ public class RowDataElementConverterTest {
.setItem(rowDataToAttributeValueConverter.convertRowData(rowData))
.build();
- assertThat(actualWriteRequest).isEqualTo(expectedWriterequest);
+ assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest);
}
@Test
@@ -123,7 +123,7 @@ public class RowDataElementConverterTest {
assertThat(transformedConverter).extracting("rowDataToAttributeValueConverter").isNotNull();
- assertThat(actualWriteRequest).isEqualTo(expectedWriterequest);
+ assertThat(actualWriteRequest).usingRecursiveComparison().isEqualTo(expectedWriterequest);
}
private RowData createElement(RowKind kind) {
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
index 367fe23..25b3fad 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java
@@ -100,7 +100,9 @@ public class DynamoDbSerializationUtilTest {
DataInputStream dataInputStream = new DataInputStream(inputStream)) {
DynamoDbWriteRequest deserializedWriteRequest =
DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
- assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+ assertThat(deserializedWriteRequest)
+ .usingRecursiveComparison()
+ .isEqualTo(dynamoDbWriteRequest);
}
}
@@ -126,7 +128,9 @@ public class DynamoDbSerializationUtilTest {
DataInputStream dataInputStream = new DataInputStream(inputStream)) {
DynamoDbWriteRequest deserializedWriteRequest =
DynamoDbSerializationUtil.deserializeWriteRequest(dataInputStream);
- assertThat(deserializedWriteRequest).isEqualTo(dynamoDbWriteRequest);
+ assertThat(deserializedWriteRequest)
+ .usingRecursiveComparison()
+ .isEqualTo(dynamoDbWriteRequest);
}
}
@@ -152,4 +156,22 @@ public class DynamoDbSerializationUtilTest {
throw new RuntimeException(e);
}
}
+
+ @Test
+ public void testUnsupportedDynamoDbType() {
+ assertThatExceptionOfType(UnsupportedOperationException.class)
+ .isThrownBy(
+ () -> DynamoDbType.fromByteValue((byte) (DynamoDbType.values().length + 1)))
+ .withMessageContaining("Unsupported byte value");
+ }
+
+ @Test
+ public void testUnsupportedDynamoDbWriteRequestType() {
+ assertThatExceptionOfType(UnsupportedOperationException.class)
+ .isThrownBy(
+ () ->
+ DynamoDbWriteRequestType.fromByteValue(
+ (byte) (DynamoDbWriteRequestType.values().length + 1)))
+ .withMessageContaining("Unsupported byte value");
+ }
}