You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/22 17:13:10 UTC

[flink] branch master updated: [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265)

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 91c4d865eab [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265)
91c4d865eab is described below

commit 91c4d865eabad0f7f1b8c7426d87e86afa06d6f6
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Tue Nov 22 17:13:02 2022 +0000

    [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265)
---
 .../sink/KinesisFirehoseSinkElementConverter.java  | 49 ++++++++--------
 .../KinesisFirehoseSinkElementConverterTest.java   | 28 +++++++++
 .../sink/KinesisStreamsSinkElementConverter.java   | 50 ++++++++--------
 .../KinesisStreamsSinkElementConverterTest.java    | 57 +++++++++++++++++++
 .../base/sink/writer/AsyncSinkWriter.java          |  1 +
 .../base/sink/writer/ElementConverter.java         |  5 ++
 .../base/sink/writer/AsyncSinkWriterTest.java      | 66 +++++++++++++---------
 .../base/sink/writer/TestElementConverter.java     | 43 ++++++++++++++
 8 files changed, 222 insertions(+), 77 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
index b90db3387b9..f6e83fef2be 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.metrics.MetricGroup;
@@ -39,7 +40,8 @@ import software.amazon.awssdk.services.firehose.model.Record;
 @Internal
 public class KinesisFirehoseSinkElementConverter<InputT>
         implements ElementConverter<InputT, Record> {
-    private boolean schemaOpened = false;
+
+    private static final long serialVersionUID = 1L;
 
     /** A serialization schema to specify how the input element should be serialized. */
     private final SerializationSchema<InputT> serializationSchema;
@@ -48,37 +50,34 @@ public class KinesisFirehoseSinkElementConverter<InputT>
         this.serializationSchema = serializationSchema;
     }
 
+    @Override
+    public void open(Sink.InitContext context) {
+        try {
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return new UnregisteredMetricsGroup();
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return SimpleUserCodeClassLoader.create(
+                                    KinesisFirehoseSinkElementConverter.class.getClassLoader());
+                        }
+                    });
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+        }
+    }
+
     @Override
     public Record apply(InputT element, SinkWriter.Context context) {
-        checkOpened();
         return Record.builder()
                 .data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
                 .build();
     }
 
-    private void checkOpened() {
-        if (!schemaOpened) {
-            try {
-                serializationSchema.open(
-                        new SerializationSchema.InitializationContext() {
-                            @Override
-                            public MetricGroup getMetricGroup() {
-                                return new UnregisteredMetricsGroup();
-                            }
-
-                            @Override
-                            public UserCodeClassLoader getUserCodeClassLoader() {
-                                return SimpleUserCodeClassLoader.create(
-                                        KinesisFirehoseSinkElementConverter.class.getClassLoader());
-                            }
-                        });
-                schemaOpened = true;
-            } catch (Exception e) {
-                throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
-            }
-        }
-    }
-
     public static <InputT> Builder<InputT> builder() {
         return new Builder<>();
     }
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
index ccb45822c75..dbec4709d38 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.firehose.sink;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 
@@ -25,6 +26,7 @@ import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.firehose.model.Record;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Covers construction and sanity checking of {@link KinesisFirehoseSinkElementConverter}. */
@@ -51,4 +53,30 @@ class KinesisFirehoseSinkElementConverterTest {
         byte[] serializedString = (new SimpleStringSchema()).serialize(testString);
         assertThat(serializedRecord.data()).isEqualTo(SdkBytes.fromByteArray(serializedString));
     }
+
+    @Test
+    void elementConverterOpenInvokesSerializationSchemaOpen() {
+        OpenTestingSerializationSchema serializationSchema = new OpenTestingSerializationSchema();
+
+        KinesisFirehoseSinkElementConverter.<String>builder()
+                .setSerializationSchema(serializationSchema)
+                .build()
+                .open(null);
+
+        assertThat(serializationSchema.openCalled).isTrue();
+    }
+
+    private static class OpenTestingSerializationSchema implements SerializationSchema<String> {
+        private boolean openCalled;
+
+        @Override
+        public void open(SerializationSchema.InitializationContext context) throws Exception {
+            openCalled = true;
+        }
+
+        @Override
+        public byte[] serialize(String element) {
+            return element.getBytes(UTF_8);
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
index a7e441128f2..3a644cba241 100644
--- a/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.kinesis.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.metrics.MetricGroup;
@@ -40,6 +41,8 @@ import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
 public class KinesisStreamsSinkElementConverter<InputT>
         implements ElementConverter<InputT, PutRecordsRequestEntry> {
 
+    private static final long serialVersionUID = 1L;
+
     /** A serialization schema to specify how the input element should be serialized. */
     private final SerializationSchema<InputT> serializationSchema;
 
@@ -48,8 +51,6 @@ public class KinesisStreamsSinkElementConverter<InputT>
      */
     private final PartitionKeyGenerator<InputT> partitionKeyGenerator;
 
-    private boolean schemaOpened = false;
-
     private KinesisStreamsSinkElementConverter(
             SerializationSchema<InputT> serializationSchema,
             PartitionKeyGenerator<InputT> partitionKeyGenerator) {
@@ -57,38 +58,35 @@ public class KinesisStreamsSinkElementConverter<InputT>
         this.partitionKeyGenerator = partitionKeyGenerator;
     }
 
+    @Override
+    public void open(Sink.InitContext context) {
+        try {
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return new UnregisteredMetricsGroup();
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return SimpleUserCodeClassLoader.create(
+                                    KinesisStreamsSinkElementConverter.class.getClassLoader());
+                        }
+                    });
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+        }
+    }
+
     @Override
     public PutRecordsRequestEntry apply(InputT element, SinkWriter.Context context) {
-        checkOpened();
         return PutRecordsRequestEntry.builder()
                 .data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
                 .partitionKey(partitionKeyGenerator.apply(element))
                 .build();
     }
 
-    private void checkOpened() {
-        if (!schemaOpened) {
-            try {
-                serializationSchema.open(
-                        new SerializationSchema.InitializationContext() {
-                            @Override
-                            public MetricGroup getMetricGroup() {
-                                return new UnregisteredMetricsGroup();
-                            }
-
-                            @Override
-                            public UserCodeClassLoader getUserCodeClassLoader() {
-                                return SimpleUserCodeClassLoader.create(
-                                        KinesisStreamsSinkElementConverter.class.getClassLoader());
-                            }
-                        });
-                schemaOpened = true;
-            } catch (Exception e) {
-                throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
-            }
-        }
-    }
-
     public static <InputT> Builder<InputT> builder() {
         return new Builder<>();
     }
diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverterTest.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverterTest.java
new file mode 100644
index 00000000000..cd6444c8094
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverterTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class KinesisStreamsSinkElementConverterTest {
+
+    @Test
+    void elementConverterOpenInvokesSerializationSchemaOpen() {
+        OpenTestingSerializationSchema serializationSchema = new OpenTestingSerializationSchema();
+
+        KinesisStreamsSinkElementConverter.<String>builder()
+                .setSerializationSchema(serializationSchema)
+                .setPartitionKeyGenerator(record -> UUID.randomUUID().toString())
+                .build()
+                .open(null);
+
+        assertThat(serializationSchema.openCalled).isTrue();
+    }
+
+    private static class OpenTestingSerializationSchema implements SerializationSchema<String> {
+        private boolean openCalled;
+
+        @Override
+        public void open(SerializationSchema.InitializationContext context) throws Exception {
+            openCalled = true;
+        }
+
+        @Override
+        public byte[] serialize(String element) {
+            return element.getBytes(UTF_8);
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index f6c2cc86e3b..7dea96df7b5 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -303,6 +303,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
                                 },
                                 "A fatal exception occurred in the sink that cannot be recovered from or should not be retried.");
 
+        elementConverter.open(context);
         initializeState(states);
     }
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
index de9f6e0da03..d70a27e43d9 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
@@ -18,6 +18,7 @@
 package org.apache.flink.connector.base.sink.writer;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 
 import java.io.Serializable;
@@ -33,4 +34,8 @@ import java.io.Serializable;
 @PublicEvolving
 public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
     RequestEntryT apply(InputT element, SinkWriter.Context context);
+
+    default void open(Sink.InitContext context) {
+        // No-op default implementation
+    }
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index bd25179b4ca..ac4d9447e15 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -73,6 +73,20 @@ public class AsyncSinkWriterTest {
         }
     }
 
+    @Test
+    public void testElementConvertOpenIsInvoked() {
+        TestElementConverter elementConverter = new TestElementConverter();
+        assertThat(elementConverter.getOpenCallCount()).isEqualTo(0);
+
+        new AsyncSinkWriterImplBuilder()
+                .elementConverter(elementConverter)
+                .context(sinkInitContext)
+                .build();
+
+        // The open() method should be called in the AsyncSinkWriter constructor
+        assertThat(elementConverter.getOpenCallCount()).isEqualTo(1);
+    }
+
     @Test
     public void testNumberOfRecordsIsAMultipleOfBatchSizeResultsInThatNumberOfRecordsBeingWritten()
             throws IOException, InterruptedException {
@@ -1025,29 +1039,7 @@ public class AsyncSinkWriterTest {
         private final int delay;
 
         private AsyncSinkWriterImpl(
-                Sink.InitContext context,
-                int maxBatchSize,
-                int maxInFlightRequests,
-                int maxBufferedRequests,
-                long maxBatchSizeInBytes,
-                long maxTimeInBufferMS,
-                long maxRecordSizeInBytes,
-                boolean simulateFailures,
-                int delay) {
-            this(
-                    context,
-                    maxBatchSize,
-                    maxInFlightRequests,
-                    maxBufferedRequests,
-                    maxBatchSizeInBytes,
-                    maxTimeInBufferMS,
-                    maxRecordSizeInBytes,
-                    simulateFailures,
-                    delay,
-                    Collections.emptyList());
-        }
-
-        private AsyncSinkWriterImpl(
+                ElementConverter<String, Integer> elementConverter,
                 Sink.InitContext context,
                 int maxBatchSize,
                 int maxInFlightRequests,
@@ -1060,7 +1052,7 @@ public class AsyncSinkWriterTest {
                 List<BufferedRequestState<Integer>> bufferedState) {
 
             super(
-                    (elem, ctx) -> Integer.parseInt(elem),
+                    elementConverter,
                     context,
                     AsyncSinkWriterConfiguration.builder()
                             .setMaxBatchSize(maxBatchSize)
@@ -1183,6 +1175,8 @@ public class AsyncSinkWriterTest {
     /** A builder for {@link AsyncSinkWriterImpl}. */
     private class AsyncSinkWriterImplBuilder {
 
+        private ElementConverter<String, Integer> elementConverter =
+                (elem, ctx) -> Integer.parseInt(elem);
         private boolean simulateFailures = false;
         private int delay = 0;
         private Sink.InitContext context;
@@ -1193,6 +1187,12 @@ public class AsyncSinkWriterTest {
         private long maxTimeInBufferMS = 1_000;
         private long maxRecordSizeInBytes = maxBatchSizeInBytes;
 
+        private AsyncSinkWriterImplBuilder elementConverter(
+                ElementConverter<String, Integer> elementConverter) {
+            this.elementConverter = elementConverter;
+            return this;
+        }
+
         private AsyncSinkWriterImplBuilder context(Sink.InitContext context) {
             this.context = context;
             return this;
@@ -1240,6 +1240,7 @@ public class AsyncSinkWriterTest {
 
         private AsyncSinkWriterImpl build() {
             return new AsyncSinkWriterImpl(
+                    elementConverter,
                     context,
                     maxBatchSize,
                     maxInFlightRequests,
@@ -1248,12 +1249,14 @@ public class AsyncSinkWriterTest {
                     maxTimeInBufferMS,
                     maxRecordSizeInBytes,
                     simulateFailures,
-                    delay);
+                    delay,
+                    Collections.emptyList());
         }
 
         private AsyncSinkWriterImpl buildWithState(
                 List<BufferedRequestState<Integer>> bufferedState) {
             return new AsyncSinkWriterImpl(
+                    elementConverter,
                     context,
                     maxBatchSize,
                     maxInFlightRequests,
@@ -1283,7 +1286,18 @@ public class AsyncSinkWriterTest {
                 CountDownLatch blockedThreadLatch,
                 CountDownLatch delayedStartLatch,
                 boolean blockForLimitedTime) {
-            super(context, 3, maxInFlightRequests, 20, 100, 100, 100, false, 0);
+            super(
+                    (elem, ctx) -> Integer.parseInt(elem),
+                    context,
+                    3,
+                    maxInFlightRequests,
+                    20,
+                    100,
+                    100,
+                    100,
+                    false,
+                    0,
+                    Collections.emptyList());
             this.blockedThreadLatch = blockedThreadLatch;
             this.delayedStartLatch = delayedStartLatch;
             this.blockForLimitedTime = blockForLimitedTime;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestElementConverter.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestElementConverter.java
new file mode 100644
index 00000000000..fe9d2afaeab
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestElementConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+
+/** A test implementation of {@link ElementConverter} used to verify the open() method. */
+public class TestElementConverter implements ElementConverter<String, Integer> {
+
+    private static final long serialVersionUID = 1L;
+    private int openCallCount;
+
+    @Override
+    public void open(Sink.InitContext context) {
+        openCallCount++;
+    }
+
+    @Override
+    public Integer apply(String element, SinkWriter.Context context) {
+        return openCallCount;
+    }
+
+    public int getOpenCallCount() {
+        return openCallCount;
+    }
+}