You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost when this copy was extracted.)
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/22 17:13:10 UTC
[flink] branch master updated: [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265)
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 91c4d865eab [FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265)
91c4d865eab is described below
commit 91c4d865eabad0f7f1b8c7426d87e86afa06d6f6
Author: Danny Cranmer <da...@apache.org>
AuthorDate: Tue Nov 22 17:13:02 2022 +0000
[FLINK-29938][Connectors/Base] Add open() Method to AsyncSink ElementConverter (#21265)
---
.../sink/KinesisFirehoseSinkElementConverter.java | 49 ++++++++--------
.../KinesisFirehoseSinkElementConverterTest.java | 28 +++++++++
.../sink/KinesisStreamsSinkElementConverter.java | 50 ++++++++--------
.../KinesisStreamsSinkElementConverterTest.java | 57 +++++++++++++++++++
.../base/sink/writer/AsyncSinkWriter.java | 1 +
.../base/sink/writer/ElementConverter.java | 5 ++
.../base/sink/writer/AsyncSinkWriterTest.java | 66 +++++++++++++---------
.../base/sink/writer/TestElementConverter.java | 43 ++++++++++++++
8 files changed, 222 insertions(+), 77 deletions(-)
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
index b90db3387b9..f6e83fef2be 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.firehose.sink;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.metrics.MetricGroup;
@@ -39,7 +40,8 @@ import software.amazon.awssdk.services.firehose.model.Record;
@Internal
public class KinesisFirehoseSinkElementConverter<InputT>
implements ElementConverter<InputT, Record> {
- private boolean schemaOpened = false;
+
+ private static final long serialVersionUID = 1L;
/** A serialization schema to specify how the input element should be serialized. */
private final SerializationSchema<InputT> serializationSchema;
@@ -48,37 +50,34 @@ public class KinesisFirehoseSinkElementConverter<InputT>
this.serializationSchema = serializationSchema;
}
+ @Override
+ public void open(Sink.InitContext context) {
+ try {
+ serializationSchema.open(
+ new SerializationSchema.InitializationContext() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ return new UnregisteredMetricsGroup();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return SimpleUserCodeClassLoader.create(
+ KinesisFirehoseSinkElementConverter.class.getClassLoader());
+ }
+ });
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+ }
+ }
+
@Override
public Record apply(InputT element, SinkWriter.Context context) {
- checkOpened();
return Record.builder()
.data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
.build();
}
- private void checkOpened() {
- if (!schemaOpened) {
- try {
- serializationSchema.open(
- new SerializationSchema.InitializationContext() {
- @Override
- public MetricGroup getMetricGroup() {
- return new UnregisteredMetricsGroup();
- }
-
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- return SimpleUserCodeClassLoader.create(
- KinesisFirehoseSinkElementConverter.class.getClassLoader());
- }
- });
- schemaOpened = true;
- } catch (Exception e) {
- throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
- }
- }
- }
-
public static <InputT> Builder<InputT> builder() {
return new Builder<>();
}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
index ccb45822c75..dbec4709d38 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverterTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.connector.firehose.sink;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -25,6 +26,7 @@ import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.firehose.model.Record;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
/** Covers construction and sanity checking of {@link KinesisFirehoseSinkElementConverter}. */
@@ -51,4 +53,30 @@ class KinesisFirehoseSinkElementConverterTest {
byte[] serializedString = (new SimpleStringSchema()).serialize(testString);
assertThat(serializedRecord.data()).isEqualTo(SdkBytes.fromByteArray(serializedString));
}
+
+ @Test
+ void elementConverterOpenInvokesSerializationSchemaOpen() {
+ OpenTestingSerializationSchema serializationSchema = new OpenTestingSerializationSchema();
+
+ KinesisFirehoseSinkElementConverter.<String>builder()
+ .setSerializationSchema(serializationSchema)
+ .build()
+ .open(null);
+
+ assertThat(serializationSchema.openCalled).isTrue();
+ }
+
+ private static class OpenTestingSerializationSchema implements SerializationSchema<String> {
+ private boolean openCalled;
+
+ @Override
+ public void open(SerializationSchema.InitializationContext context) throws Exception {
+ openCalled = true;
+ }
+
+ @Override
+ public byte[] serialize(String element) {
+ return element.getBytes(UTF_8);
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
index a7e441128f2..3a644cba241 100644
--- a/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
@@ -19,6 +19,7 @@ package org.apache.flink.connector.kinesis.sink;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.metrics.MetricGroup;
@@ -40,6 +41,8 @@ import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
public class KinesisStreamsSinkElementConverter<InputT>
implements ElementConverter<InputT, PutRecordsRequestEntry> {
+ private static final long serialVersionUID = 1L;
+
/** A serialization schema to specify how the input element should be serialized. */
private final SerializationSchema<InputT> serializationSchema;
@@ -48,8 +51,6 @@ public class KinesisStreamsSinkElementConverter<InputT>
*/
private final PartitionKeyGenerator<InputT> partitionKeyGenerator;
- private boolean schemaOpened = false;
-
private KinesisStreamsSinkElementConverter(
SerializationSchema<InputT> serializationSchema,
PartitionKeyGenerator<InputT> partitionKeyGenerator) {
@@ -57,38 +58,35 @@ public class KinesisStreamsSinkElementConverter<InputT>
this.partitionKeyGenerator = partitionKeyGenerator;
}
+ @Override
+ public void open(Sink.InitContext context) {
+ try {
+ serializationSchema.open(
+ new SerializationSchema.InitializationContext() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ return new UnregisteredMetricsGroup();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return SimpleUserCodeClassLoader.create(
+ KinesisStreamsSinkElementConverter.class.getClassLoader());
+ }
+ });
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+ }
+ }
+
@Override
public PutRecordsRequestEntry apply(InputT element, SinkWriter.Context context) {
- checkOpened();
return PutRecordsRequestEntry.builder()
.data(SdkBytes.fromByteArray(serializationSchema.serialize(element)))
.partitionKey(partitionKeyGenerator.apply(element))
.build();
}
- private void checkOpened() {
- if (!schemaOpened) {
- try {
- serializationSchema.open(
- new SerializationSchema.InitializationContext() {
- @Override
- public MetricGroup getMetricGroup() {
- return new UnregisteredMetricsGroup();
- }
-
- @Override
- public UserCodeClassLoader getUserCodeClassLoader() {
- return SimpleUserCodeClassLoader.create(
- KinesisStreamsSinkElementConverter.class.getClassLoader());
- }
- });
- schemaOpened = true;
- } catch (Exception e) {
- throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
- }
- }
- }
-
public static <InputT> Builder<InputT> builder() {
return new Builder<>();
}
diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverterTest.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverterTest.java
new file mode 100644
index 00000000000..cd6444c8094
--- /dev/null
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverterTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class KinesisStreamsSinkElementConverterTest {
+
+ @Test
+ void elementConverterOpenInvokesSerializationSchemaOpen() {
+ OpenTestingSerializationSchema serializationSchema = new OpenTestingSerializationSchema();
+
+ KinesisStreamsSinkElementConverter.<String>builder()
+ .setSerializationSchema(serializationSchema)
+ .setPartitionKeyGenerator(record -> UUID.randomUUID().toString())
+ .build()
+ .open(null);
+
+ assertThat(serializationSchema.openCalled).isTrue();
+ }
+
+ private static class OpenTestingSerializationSchema implements SerializationSchema<String> {
+ private boolean openCalled;
+
+ @Override
+ public void open(SerializationSchema.InitializationContext context) throws Exception {
+ openCalled = true;
+ }
+
+ @Override
+ public byte[] serialize(String element) {
+ return element.getBytes(UTF_8);
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index f6c2cc86e3b..7dea96df7b5 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -303,6 +303,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
},
"A fatal exception occurred in the sink that cannot be recovered from or should not be retried.");
+ elementConverter.open(context);
initializeState(states);
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
index de9f6e0da03..d70a27e43d9 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.base.sink.writer;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import java.io.Serializable;
@@ -33,4 +34,8 @@ import java.io.Serializable;
@PublicEvolving
public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
RequestEntryT apply(InputT element, SinkWriter.Context context);
+
+ default void open(Sink.InitContext context) {
+ // No-op default implementation
+ }
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index bd25179b4ca..ac4d9447e15 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -73,6 +73,20 @@ public class AsyncSinkWriterTest {
}
}
+ @Test
+ public void testElementConvertOpenIsInvoked() {
+ TestElementConverter elementConverter = new TestElementConverter();
+ assertThat(elementConverter.getOpenCallCount()).isEqualTo(0);
+
+ new AsyncSinkWriterImplBuilder()
+ .elementConverter(elementConverter)
+ .context(sinkInitContext)
+ .build();
+
+ // The open() method should be called in the AsyncSinkWriter constructor
+ assertThat(elementConverter.getOpenCallCount()).isEqualTo(1);
+ }
+
@Test
public void testNumberOfRecordsIsAMultipleOfBatchSizeResultsInThatNumberOfRecordsBeingWritten()
throws IOException, InterruptedException {
@@ -1025,29 +1039,7 @@ public class AsyncSinkWriterTest {
private final int delay;
private AsyncSinkWriterImpl(
- Sink.InitContext context,
- int maxBatchSize,
- int maxInFlightRequests,
- int maxBufferedRequests,
- long maxBatchSizeInBytes,
- long maxTimeInBufferMS,
- long maxRecordSizeInBytes,
- boolean simulateFailures,
- int delay) {
- this(
- context,
- maxBatchSize,
- maxInFlightRequests,
- maxBufferedRequests,
- maxBatchSizeInBytes,
- maxTimeInBufferMS,
- maxRecordSizeInBytes,
- simulateFailures,
- delay,
- Collections.emptyList());
- }
-
- private AsyncSinkWriterImpl(
+ ElementConverter<String, Integer> elementConverter,
Sink.InitContext context,
int maxBatchSize,
int maxInFlightRequests,
@@ -1060,7 +1052,7 @@ public class AsyncSinkWriterTest {
List<BufferedRequestState<Integer>> bufferedState) {
super(
- (elem, ctx) -> Integer.parseInt(elem),
+ elementConverter,
context,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(maxBatchSize)
@@ -1183,6 +1175,8 @@ public class AsyncSinkWriterTest {
/** A builder for {@link AsyncSinkWriterImpl}. */
private class AsyncSinkWriterImplBuilder {
+ private ElementConverter<String, Integer> elementConverter =
+ (elem, ctx) -> Integer.parseInt(elem);
private boolean simulateFailures = false;
private int delay = 0;
private Sink.InitContext context;
@@ -1193,6 +1187,12 @@ public class AsyncSinkWriterTest {
private long maxTimeInBufferMS = 1_000;
private long maxRecordSizeInBytes = maxBatchSizeInBytes;
+ private AsyncSinkWriterImplBuilder elementConverter(
+ ElementConverter<String, Integer> elementConverter) {
+ this.elementConverter = elementConverter;
+ return this;
+ }
+
private AsyncSinkWriterImplBuilder context(Sink.InitContext context) {
this.context = context;
return this;
@@ -1240,6 +1240,7 @@ public class AsyncSinkWriterTest {
private AsyncSinkWriterImpl build() {
return new AsyncSinkWriterImpl(
+ elementConverter,
context,
maxBatchSize,
maxInFlightRequests,
@@ -1248,12 +1249,14 @@ public class AsyncSinkWriterTest {
maxTimeInBufferMS,
maxRecordSizeInBytes,
simulateFailures,
- delay);
+ delay,
+ Collections.emptyList());
}
private AsyncSinkWriterImpl buildWithState(
List<BufferedRequestState<Integer>> bufferedState) {
return new AsyncSinkWriterImpl(
+ elementConverter,
context,
maxBatchSize,
maxInFlightRequests,
@@ -1283,7 +1286,18 @@ public class AsyncSinkWriterTest {
CountDownLatch blockedThreadLatch,
CountDownLatch delayedStartLatch,
boolean blockForLimitedTime) {
- super(context, 3, maxInFlightRequests, 20, 100, 100, 100, false, 0);
+ super(
+ (elem, ctx) -> Integer.parseInt(elem),
+ context,
+ 3,
+ maxInFlightRequests,
+ 20,
+ 100,
+ 100,
+ 100,
+ false,
+ 0,
+ Collections.emptyList());
this.blockedThreadLatch = blockedThreadLatch;
this.delayedStartLatch = delayedStartLatch;
this.blockForLimitedTime = blockForLimitedTime;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestElementConverter.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestElementConverter.java
new file mode 100644
index 00000000000..fe9d2afaeab
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestElementConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+
+/** A test implementation of {@link ElementConverter} used to verify the open() method. */
+public class TestElementConverter implements ElementConverter<String, Integer> {
+
+ private static final long serialVersionUID = 1L;
+ private int openCallCount;
+
+ @Override
+ public void open(Sink.InitContext context) {
+ openCallCount++;
+ }
+
+ @Override
+ public Integer apply(String element, SinkWriter.Context context) {
+ return openCallCount;
+ }
+
+ public int getOpenCallCount() {
+ return openCallCount;
+ }
+}