You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/09 08:57:09 UTC

[flink-connector-aws] branch main updated: [FLINK-29937][Connectors/DynamoDB] Rename DynamoDBEnhancedElementConverter to DynamoDbBeanElementConverter. Add missing @PublicEvolving and tweak comments

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new 611653b  [FLINK-29937][Connectors/DynamoDB] Rename DynamoDBEnhancedElementConverter to DynamoDbBeanElementConverter. Add missing @PublicEvolving and tweak comments
611653b is described below

commit 611653b906e3083be3d421d442fd25636b7f7bdb
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Wed Nov 9 07:51:07 2022 +0000

    [FLINK-29937][Connectors/DynamoDB] Rename DynamoDBEnhancedElementConverter to DynamoDbBeanElementConverter. Add missing @PublicEvolving and tweak comments
---
 ...entConverter.java => DynamoDbBeanElementConverter.java} | 14 ++++++++------
 ...rterTest.java => DynamoDbBeanElementConverterTest.java} | 10 +++++-----
 ...entConverter.java => SinkDynamoDbBeanIntoDynamoDb.java} |  8 ++++----
 .../org/apache/flink/connector/dynamodb/util/Order.java    |  4 ++--
 4 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverter.java b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
similarity index 84%
rename from flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverter.java
rename to flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
index 2b3053f..e6bffa5 100644
--- a/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverter.java
+++ b/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.dynamodb.sink;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 
@@ -27,9 +28,10 @@ import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
  * A generic {@link ElementConverter} that uses the dynamodb-enhanced client to build a {@link
  * DynamoDbWriteRequest} from a POJO annotated with {@link DynamoDbBean}.
  *
- * @param <InputT> The
+ * @param <InputT> The type of the {@link DynamoDbBean} to convert into {@link DynamoDbWriteRequest}
  */
-public class DynamoDBEnhancedElementConverter<InputT>
+@PublicEvolving
+public class DynamoDbBeanElementConverter<InputT>
         implements ElementConverter<InputT, DynamoDbWriteRequest> {
 
     private static final long serialVersionUID = 1L;
@@ -38,11 +40,11 @@ public class DynamoDBEnhancedElementConverter<InputT>
     private final boolean ignoreNulls;
     private transient BeanTableSchema<InputT> tableSchema;
 
-    public DynamoDBEnhancedElementConverter(final Class<InputT> recordType) {
+    public DynamoDbBeanElementConverter(final Class<InputT> recordType) {
         this(recordType, false);
     }
 
-    public DynamoDBEnhancedElementConverter(
+    public DynamoDbBeanElementConverter(
             final Class<InputT> recordType, final boolean ignoreNulls) {
         this.recordType = recordType;
         this.ignoreNulls = ignoreNulls;
@@ -54,8 +56,8 @@ public class DynamoDBEnhancedElementConverter<InputT>
     @Override
     public DynamoDbWriteRequest apply(InputT element, SinkWriter.Context context) {
         if (tableSchema == null) {
-            // We have to lazily initialise this because BeanTableSchema is not serializable and
-            // there is no open() method
+            // We have to lazily initialize this because BeanTableSchema is not serializable and
+            // there is no open() method on ElementConverter (FLINK-29938)
             tableSchema = createTableSchema(recordType);
         }
 
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverterTest.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
similarity index 89%
rename from flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverterTest.java
rename to flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
index 57fb55b..258ab8b 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDBEnhancedElementConverterTest.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverterTest.java
@@ -27,12 +27,12 @@ import static org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType.
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
-class DynamoDBEnhancedElementConverterTest {
+class DynamoDbBeanElementConverterTest {
 
     @Test
     void testBadType() {
         assertThatExceptionOfType(IllegalArgumentException.class)
-                .isThrownBy(() -> new DynamoDBEnhancedElementConverter<>(Integer.class))
+                .isThrownBy(() -> new DynamoDbBeanElementConverter<>(Integer.class))
                 .withMessageContaining(
                         "A DynamoDb bean class must be annotated with @DynamoDbBean");
     }
@@ -40,7 +40,7 @@ class DynamoDBEnhancedElementConverterTest {
     @Test
     void testConvertOrderToDynamoDbWriteRequest() {
         ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
-                new DynamoDBEnhancedElementConverter<>(Order.class);
+                new DynamoDbBeanElementConverter<>(Order.class);
         Order order = new Order("orderId", 1, 2.0);
 
         DynamoDbWriteRequest actual = elementConverter.apply(order, null);
@@ -55,7 +55,7 @@ class DynamoDBEnhancedElementConverterTest {
     @Test
     void testConvertOrderToDynamoDbWriteRequestWithIgnoresNull() {
         ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
-                new DynamoDBEnhancedElementConverter<>(Order.class, true);
+                new DynamoDbBeanElementConverter<>(Order.class, true);
         Order order = new Order(null, 1, 2.0);
 
         DynamoDbWriteRequest actual = elementConverter.apply(order, null);
@@ -66,7 +66,7 @@ class DynamoDBEnhancedElementConverterTest {
     @Test
     void testConvertOrderToDynamoDbWriteRequestWritesNull() {
         ElementConverter<Order, DynamoDbWriteRequest> elementConverter =
-                new DynamoDBEnhancedElementConverter<>(Order.class, false);
+                new DynamoDbBeanElementConverter<>(Order.class, false);
         Order order = new Order(null, 1, 2.0);
 
         DynamoDbWriteRequest actual = elementConverter.apply(order, null);
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDbUsingEnhancedElementConverter.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java
similarity index 90%
rename from flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDbUsingEnhancedElementConverter.java
rename to flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java
index f255e8e..f352a41 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDbUsingEnhancedElementConverter.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java
@@ -20,7 +20,7 @@ package org.apache.flink.connector.dynamodb.sink.examples;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.dynamodb.sink.DynamoDBEnhancedElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbBeanElementConverter;
 import org.apache.flink.connector.dynamodb.sink.DynamoDbSink;
 import org.apache.flink.connector.dynamodb.util.Order;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -32,9 +32,9 @@ import java.util.UUID;
 
 /**
  * An example application demonstrating how to use the {@link DynamoDbSink} to sink into DynamoDb
- * using the {@link DynamoDBEnhancedElementConverter}.
+ * using the {@link DynamoDbBeanElementConverter}.
  */
-public class SinkIntoDynamoDbUsingEnhancedElementConverter {
+public class SinkDynamoDbBeanIntoDynamoDb {
 
     private static final String DYNAMODB_TABLE = "orders";
     private static final String REGION = "us-east-1";
@@ -48,7 +48,7 @@ public class SinkIntoDynamoDbUsingEnhancedElementConverter {
         DynamoDbSink<Order> dynamoDbSink =
                 DynamoDbSink.<Order>builder()
                         .setDestinationTableName(DYNAMODB_TABLE)
-                        .setElementConverter(new DynamoDBEnhancedElementConverter<>(Order.class))
+                        .setElementConverter(new DynamoDbBeanElementConverter<>(Order.class))
                         .setDynamoDbProperties(sinkProperties)
                         .build();
 
diff --git a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
index 81e651c..08a50ec 100644
--- a/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
+++ b/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/Order.java
@@ -1,10 +1,10 @@
 package org.apache.flink.connector.dynamodb.util;
 
-import org.apache.flink.connector.dynamodb.sink.DynamoDBEnhancedElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbBeanElementConverter;
 
 import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
 
-/** A test {@link DynamoDbBean} POJO for use with {@link DynamoDBEnhancedElementConverter}. */
+/** A test {@link DynamoDbBean} POJO for use with {@link DynamoDbBeanElementConverter}. */
 @DynamoDbBean
 public class Order {