You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/09 08:57:09 UTC
[flink-connector-aws] branch main updated: [FLINK-29937][Connectors/DynamoDB] Rename DynamoDBEnhancedElementConverter to DynamoDbBeanElementConverter. Add missing @PublicEvolving and tweak comments
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new 611653b [FLINK-29937][Connectors/DynamoDB] Rename DynamoDBEnhancedElementConverter to DynamoDbBeanElementConverter. Add missing @PublicEvolving and tweak comments
611653b is described below
commit 611653b906e3083be3d421d442fd25636b7f7bdb
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Wed Nov 9 07:51:07 2022 +0000
[FLINK-29937][Connectors/DynamoDB] Rename DynamoDBEnhancedElementConverter to DynamoDbBeanElementConverter. Add missing @PublicEvolving and tweak comments
---
...entConverter.java => DynamoDbBeanElementConverter.java} | 14 ++++++++------
...rterTest.java => DynamoDbBeanElementConverterTest.java} | 10 +++++-----
...entConverter.java => SinkDynamoDbBeanIntoDynamoDb.java} | 8 ++++----
.../org/apache/flink/connector/dynamodb/util/Order.java | 4 ++--
4 files changed, 19 insertions(+), 17 deletions(-)
diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverter.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
similarity index 84%
rename from flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverter.java
rename to flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
index 2b3053f..e6bffa5 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverter.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
@@ -17,6 +17,7 @@
package org.apache.flink.connector.dynamodb.sink;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -27,9 +28,10 @@ import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
* A generic {@link ElementConverter} that uses the dynamodb-enhanced client to build a {@link
* DynamoDbWriteRequest} from a POJO annotated with {@link DynamoDbBean}.
*
- * @param <InputT> The
+ * @param <InputT> The type of the {@link DynamoDbBean} to convert into {@link DynamoDbWriteRequest}
*/
-public class DynamoDBEnhancedElementConverter<InputT>
+@PublicEvolving
+public class DynamoDbBeanElementConverter<InputT>
implements ElementConverter<InputT, DynamoDbWriteRequest> {
private static final long serialVersionUID = 1L;
@@ -38,11 +40,11 @@ public class DynamoDBEnhancedElementConverter<InputT>
private final boolean ignoreNulls;
private transient BeanTableSchema<InputT> tableSchema;
- public DynamoDBEnhancedElementConverter(final Class<InputT> recordType) {
+ public DynamoDbBeanElementConverter(final Class<InputT> recordType) {
this(recordType, false);
}
- public DynamoDBEnhancedElementConverter(
+ public DynamoDbBeanElementConverter(
final Class<InputT> recordType, final boolean ignoreNulls) {
this.recordType = recordType;
this.ignoreNulls = ignoreNulls;
@@ -54,8 +56,8 @@ public class DynamoDBEnhancedElementConverter<InputT>
@Override
public DynamoDbWriteRequest apply(InputT element, SinkWriter.Context context) {
if (tableSchema == null) {
- // We have to lazily initialise this because BeanTableSchema is not serializable and
- // there is no open() method
+ // We have to lazily initialize this because BeanTableSchema is not serializable and
+ // there is no open() method on ElementConverter (FLINK-29938)
tableSchema = createTableSchema(recordType);
}
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverterTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
similarity index 89%
rename from flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverterTest.java
rename to flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
index 57fb55b..258ab8b 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverterTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
@@ -27,12 +27,12 @@ import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-class DynamoDBEnhancedElementConverterTest {
+class DynamoDbBeanElementConverterTest {
@Test
void testBadType() {
assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(() -> new DynamoDBEnhancedElementConverter<>(Integer.class))
+ .isThrownBy(() -> new DynamoDbBeanElementConverter<>(Integer.class))
.withMessageContaining(
"A DynamoDb bean class must be annotated with @DynamoDbBean");
}
@@ -40,7 +40,7 @@ class DynamoDBEnhancedElementConverterTest {
@Test
void testConvertOrderToDynamoDbWriteRequest() {
ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
- new DynamoDBEnhancedElementConverter<>(Order.class);
+ new DynamoDbBeanElementConverter<>(Order.class);
Order order = new Order("orderId", 1, 2.0);
DynamoDbWriteRequest actual = elementConverter.apply(order, null);
@@ -55,7 +55,7 @@ class DynamoDBEnhancedElementConverterTest {
@Test
void testConvertOrderToDynamoDbWriteRequestWithIgnoresNull() {
ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
- new DynamoDBEnhancedElementConverter<>(Order.class, true);
+ new DynamoDbBeanElementConverter<>(Order.class, true);
Order order = new Order(null, 1, 2.0);
DynamoDbWriteRequest actual = elementConverter.apply(order, null);
@@ -66,7 +66,7 @@ class DynamoDBEnhancedElementConverterTest {
@Test
void testConvertOrderToDynamoDbWriteRequestWritesNull() {
ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
- new DynamoDBEnhancedElementConverter<>(Order.class, false);
+ new DynamoDbBeanElementConverter<>(Order.class, false);
Order order = new Order(null, 1, 2.0);
DynamoDbWriteRequest actual = elementConverter.apply(order, null);
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDbUsingEnhancedElementConverter.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java
similarity index 90%
rename from flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDbUsingEnhancedElementConverter.java
rename to flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java
index f255e8e..f352a41 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDbUsingEnhancedElementConverter.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.dynamodb.sink.examples;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.dynamodb.sink.DynamoDBEnhancedElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbBeanElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbSink;
import org.apache.flink.connector.dynamodb.util.Order;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -32,9 +32,9 @@ import java.util.UUID;
/**
* An example application demonstrating how to use the {@link DynamoDbSink} to sink into DynamoDb
- * using the {@link DynamoDBEnhancedElementConverter}.
+ * using the {@link DynamoDbBeanElementConverter}.
*/
-public class SinkIntoDynamoDbUsingEnhancedElementConverter {
+public class SinkDynamoDbBeanIntoDynamoDb {
private static final String DYNAMODB_TABLE = "orders";
private static final String REGION = "us-east-1";
@@ -48,7 +48,7 @@ public class SinkIntoDynamoDbUsingEnhancedElementConverter {
DynamoDbSink<Order> dynamoDbSink =
DynamoDbSink.<Order>builder()
.setDestinationTableName(DYNAMODB_TABLE)
- .setElementConverter(new DynamoDBEnhancedElementConverter<>(Order.class))
+ .setElementConverter(new DynamoDbBeanElementConverter<>(Order.class))
.setDynamoDbProperties(sinkProperties)
.build();
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
index 81e651c..08a50ec 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
@@ -1,10 +1,10 @@
package org.apache.flink.connector.dynamodb.util;
-import org.apache.flink.connector.dynamodb.sink.DynamoDBEnhancedElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbBeanElementConverter;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
-/** A test {@link DynamoDbBean} POJO for use with {@link DynamoDBEnhancedElementConverter}. */
+/** A test {@link DynamoDbBean} POJO for use with {@link DynamoDbBeanElementConverter}. */
@DynamoDbBean
public class Order {