You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/15 10:22:20 UTC
[flink] branch master updated: [FLINK-25574][connectors/async] Migrate AsyncSink connectors to decomposed SinkV2
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 423143c [FLINK-25574][connectors/async] Migrate AsyncSink connectors to decomposed SinkV2
423143c is described below
commit 423143c1a9dcfba2c8ddc08f4c785451b82802be
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Wed Feb 2 19:59:02 2022 +0100
[FLINK-25574][connectors/async] Migrate AsyncSink connectors to decomposed SinkV2
---
.../kinesis/sink/KinesisDataStreamsSink.java | 36 ++++++++++++++----
.../KinesisDataStreamsSinkElementConverter.java | 2 +-
.../kinesis/sink/KinesisDataStreamsSinkWriter.java | 5 ++-
.../kinesis/table/KinesisDynamicSink.java | 5 ++-
.../table/KinesisDynamicTableSinkFactoryTest.java | 12 +++---
.../firehose/sink/KinesisFirehoseSink.java | 36 +++++++++++++-----
.../sink/KinesisFirehoseSinkElementConverter.java | 2 +-
.../firehose/sink/KinesisFirehoseSinkWriter.java | 5 ++-
.../firehose/table/KinesisFirehoseDynamicSink.java | 4 +-
.../sink/KinesisFirehoseSinkWriterTest.java | 7 +---
.../KinesisFirehoseDynamicTableFactoryTest.java | 8 ++--
.../flink/connector/base/sink/AsyncSinkBase.java | 28 +-------------
.../base/sink/writer/AsyncSinkWriter.java | 27 +++++++-------
.../base/sink/writer/ElementConverter.java | 2 +-
.../connector/base/sink/ArrayListAsyncSink.java | 43 +++++++++++++++++-----
.../base/sink/writer/AsyncSinkWriterTest.java | 33 ++++++++---------
.../base/sink/writer/TestSinkInitContext.java | 17 +++++++--
17 files changed, 158 insertions(+), 114 deletions(-)
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
index e4b9779..6252bfe 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
@@ -19,7 +19,6 @@ package org.apache.flink.connector.kinesis.sink;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -28,8 +27,9 @@ import org.apache.flink.util.Preconditions;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
-import java.util.List;
-import java.util.Optional;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Properties;
/**
@@ -113,8 +113,8 @@ public class KinesisDataStreamsSink<InputT> extends AsyncSinkBase<InputT, PutRec
@Internal
@Override
- public SinkWriter<InputT, Void, BufferedRequestState<PutRecordsRequestEntry>> createWriter(
- InitContext context, List<BufferedRequestState<PutRecordsRequestEntry>> states) {
+ public StatefulSinkWriter<InputT, BufferedRequestState<PutRecordsRequestEntry>> createWriter(
+ InitContext context) throws IOException {
return new KinesisDataStreamsSinkWriter<>(
getElementConverter(),
context,
@@ -127,13 +127,33 @@ public class KinesisDataStreamsSink<InputT> extends AsyncSinkBase<InputT, PutRec
failOnError,
streamName,
kinesisClientProperties,
- states);
+ Collections.emptyList());
}
@Internal
@Override
- public Optional<SimpleVersionedSerializer<BufferedRequestState<PutRecordsRequestEntry>>>
+ public SimpleVersionedSerializer<BufferedRequestState<PutRecordsRequestEntry>>
getWriterStateSerializer() {
- return Optional.of(new KinesisDataStreamsStateSerializer());
+ return new KinesisDataStreamsStateSerializer();
+ }
+
+ @Override
+ public StatefulSinkWriter<InputT, BufferedRequestState<PutRecordsRequestEntry>> restoreWriter(
+ InitContext context,
+ Collection<BufferedRequestState<PutRecordsRequestEntry>> recoveredState)
+ throws IOException {
+ return new KinesisDataStreamsSinkWriter<>(
+ getElementConverter(),
+ context,
+ getMaxBatchSize(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ getMaxBatchSizeInBytes(),
+ getMaxTimeInBufferMS(),
+ getMaxRecordSizeInBytes(),
+ failOnError,
+ streamName,
+ kinesisClientProperties,
+ recoveredState);
}
}
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
index 2080f2e..2873603 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
@@ -19,7 +19,7 @@ package org.apache.flink.connector.kinesis.sink;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.util.Preconditions;
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
index 50566dc..98ee077 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
@@ -17,7 +17,7 @@
package org.apache.flink.connector.kinesis.sink;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
@@ -37,6 +37,7 @@ import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -111,7 +112,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
boolean failOnError,
String streamName,
Properties kinesisClientProperties,
- List<BufferedRequestState<PutRecordsRequestEntry>> states) {
+ Collection<BufferedRequestState<PutRecordsRequestEntry>> states) {
super(
elementConverter,
context,
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
index c55ebe7..361db54 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
@@ -28,7 +28,7 @@ import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
@@ -114,7 +114,8 @@ public class KinesisDynamicSink extends AsyncDynamicTableSink<PutRecordsRequestE
Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError);
addAsyncOptionsToSinkBuilder(builder);
- return SinkProvider.of(builder.build());
+ KinesisDataStreamsSink<RowData> kdsSink = builder.build();
+ return SinkV2Provider.of(kdsSink);
}
@Override
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
index 38ad281..dfe829e 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
@@ -18,14 +18,14 @@
package org.apache.flink.connector.kinesis.table;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.TableOptionsBuilder;
import org.apache.flink.table.factories.TestFormatFactory;
@@ -84,7 +84,7 @@ public class KinesisDynamicTableSinkFactoryTest extends TestLogger {
// verify the produced sink
DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
- Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+ Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
}
@@ -141,7 +141,7 @@ public class KinesisDynamicTableSinkFactoryTest extends TestLogger {
// verify the produced sink
DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
- Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+ Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
}
@@ -170,7 +170,7 @@ public class KinesisDynamicTableSinkFactoryTest extends TestLogger {
// verify the produced sink
DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
- Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+ Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
}
@@ -204,7 +204,7 @@ public class KinesisDynamicTableSinkFactoryTest extends TestLogger {
// verify the produced sink
DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
- Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+ Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
index 76fa0f7..6f9ed54 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.firehose.sink;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -27,8 +26,9 @@ import org.apache.flink.util.Preconditions;
import software.amazon.awssdk.services.firehose.model.Record;
-import java.util.List;
-import java.util.Optional;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Properties;
/**
@@ -92,8 +92,8 @@ public class KinesisFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> {
}
@Override
- public SinkWriter<InputT, Void, BufferedRequestState<Record>> createWriter(
- InitContext context, List<BufferedRequestState<Record>> states) {
+ public StatefulSinkWriter<InputT, BufferedRequestState<Record>> createWriter(
+ InitContext context) throws IOException {
return new KinesisFirehoseSinkWriter<>(
getElementConverter(),
context,
@@ -106,12 +106,30 @@ public class KinesisFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> {
failOnError,
deliveryStreamName,
firehoseClientProperties,
- states);
+ Collections.emptyList());
}
@Override
- public Optional<SimpleVersionedSerializer<BufferedRequestState<Record>>>
- getWriterStateSerializer() {
- return Optional.of(new KinesisFirehoseStateSerializer());
+ public StatefulSinkWriter<InputT, BufferedRequestState<Record>> restoreWriter(
+ InitContext context, Collection<BufferedRequestState<Record>> recoveredState)
+ throws IOException {
+ return new KinesisFirehoseSinkWriter<>(
+ getElementConverter(),
+ context,
+ getMaxBatchSize(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ getMaxBatchSizeInBytes(),
+ getMaxTimeInBufferMS(),
+ getMaxRecordSizeInBytes(),
+ failOnError,
+ deliveryStreamName,
+ firehoseClientProperties,
+ recoveredState);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<BufferedRequestState<Record>> getWriterStateSerializer() {
+ return new KinesisFirehoseStateSerializer();
}
}
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
index cca749c..dc72584 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -19,7 +19,7 @@ package org.apache.flink.connector.firehose.sink;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.util.Preconditions;
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index 47ce6e3..89d37d3 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -18,7 +18,7 @@
package org.apache.flink.connector.firehose.sink;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
@@ -38,6 +38,7 @@ import software.amazon.awssdk.services.firehose.model.Record;
import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -131,7 +132,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
boolean failOnError,
String deliveryStreamName,
Properties firehoseClientProperties,
- List<BufferedRequestState<Record>> initialStates) {
+ Collection<BufferedRequestState<Record>> initialStates) {
super(
elementConverter,
context,
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
index 2fe7d0e..5b6358b 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
@@ -27,7 +27,7 @@ import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkBuilder;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
@@ -104,7 +104,7 @@ public class KinesisFirehoseDynamicSink extends AsyncDynamicTableSink<Record> {
Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError);
super.addAsyncOptionsToSinkBuilder(builder);
- return SinkProvider.of(builder.build());
+ return SinkV2Provider.of(builder.build());
}
@Override
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
index 8773127..d8dc5d4 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -18,9 +18,8 @@
package org.apache.flink.connector.firehose.sink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
@@ -32,7 +31,6 @@ import software.amazon.awssdk.services.firehose.model.Record;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Properties;
import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
@@ -95,8 +93,7 @@ public class KinesisFirehoseSinkWriterTest {
true,
"test-stream",
prop);
- SinkWriter<String, Void, BufferedRequestState<Record>> writer =
- kinesisFirehoseSink.createWriter(ctx, new ArrayList<>());
+ SinkWriter<String> writer = kinesisFirehoseSink.createWriter(ctx);
for (int i = 0; i < 12; i++) {
writer.write("data_bytes", null);
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
index 7f039c9..8c45883 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
@@ -18,13 +18,13 @@
package org.apache.flink.connector.firehose.table;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.TableOptionsBuilder;
import org.apache.flink.table.factories.TestFormatFactory;
@@ -69,7 +69,7 @@ public class KinesisFirehoseDynamicTableFactoryTest extends TestLogger {
// verify the produced sink
DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
- Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+ Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisFirehoseSink.class);
}
@@ -96,7 +96,7 @@ public class KinesisFirehoseDynamicTableFactoryTest extends TestLogger {
// verify the produced sink
DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
- Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+ Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
Assertions.assertThat(sinkFunction).isInstanceOf(KinesisFirehoseSink.class);
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
index 872e951..0d93460 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
@@ -18,16 +18,12 @@
package org.apache.flink.connector.base.sink;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
-import java.util.Optional;
/**
* A generic sink for destinations that provide an async client to persist data.
@@ -49,7 +45,7 @@ import java.util.Optional;
*/
@PublicEvolving
public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
- implements Sink<InputT, Void, BufferedRequestState<RequestEntryT>, Void> {
+ implements StatefulSink<InputT, BufferedRequestState<RequestEntryT>> {
private final ElementConverter<InputT, RequestEntryT> elementConverter;
private final int maxBatchSize;
@@ -79,26 +75,6 @@ public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
this.maxRecordSizeInBytes = maxRecordSizeInBytes;
}
- @Override
- public Optional<Committer<Void>> createCommitter() {
- return Optional.empty();
- }
-
- @Override
- public Optional<GlobalCommitter<Void, Void>> createGlobalCommitter() {
- return Optional.empty();
- }
-
- @Override
- public Optional<SimpleVersionedSerializer<Void>> getCommittableSerializer() {
- return Optional.empty();
- }
-
- @Override
- public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
- return Optional.empty();
- }
-
protected ElementConverter<InputT, RequestEntryT> getElementConverter() {
return elementConverter;
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 0457b08..14ac74f 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -19,8 +19,9 @@ package org.apache.flink.connector.base.sink.writer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Preconditions;
@@ -29,6 +30,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
@@ -49,10 +51,10 @@ import java.util.function.Consumer;
*/
@PublicEvolving
public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
- implements SinkWriter<InputT, Void, BufferedRequestState<RequestEntryT>> {
+ implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
private final MailboxExecutor mailboxExecutor;
- private final Sink.ProcessingTimeService timeService;
+ private final ProcessingTimeService timeService;
/* The timestamp at which the previous batch of records was sent from this sink. */
private long lastSendTimestamp = 0;
@@ -227,7 +229,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
- List<BufferedRequestState<RequestEntryT>> states) {
+ Collection<BufferedRequestState<RequestEntryT>> states) {
this.elementConverter = elementConverter;
this.mailboxExecutor = context.getMailboxExecutor();
this.timeService = context.getProcessingTimeService();
@@ -273,15 +275,14 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
}
private void registerCallback() {
- Sink.ProcessingTimeService.ProcessingTimeCallback ptc =
+ ProcessingTimeService.ProcessingTimeCallback ptc =
instant -> {
existsActiveTimerCallback = false;
while (!bufferedRequestEntries.isEmpty()) {
flush();
}
};
- timeService.registerProcessingTimer(
- timeService.getCurrentProcessingTime() + maxTimeInBufferMS, ptc);
+ timeService.registerTimer(timeService.getCurrentProcessingTime() + maxTimeInBufferMS, ptc);
existsActiveTimerCallback = true;
}
@@ -408,15 +409,13 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
* <p>To this end, all in-flight requests need to be completed before proceeding with the commit.
*/
@Override
- public List<Void> prepareCommit(boolean flush) {
+ public void flush(boolean flush) {
while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) {
mailboxExecutor.tryYield();
if (flush) {
flush();
}
}
-
- return Collections.emptyList();
}
/**
@@ -426,11 +425,11 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
* a failure/restart of the application.
*/
@Override
- public List<BufferedRequestState<RequestEntryT>> snapshotState() {
+ public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
return Collections.singletonList(new BufferedRequestState<>((bufferedRequestEntries)));
}
- protected void initialize(List<BufferedRequestState<RequestEntryT>> states) {
+ protected void initialize(Collection<BufferedRequestState<RequestEntryT>> states) {
if (states.isEmpty()) {
return;
}
@@ -440,7 +439,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
"Writer failed to initialize due to multiple initial states.");
}
- BufferedRequestState<RequestEntryT> state = states.get(0);
+ BufferedRequestState<RequestEntryT> state = states.iterator().next();
this.bufferedRequestEntries.addAll(state.getBufferedRequestEntries());
for (RequestEntryWrapper<RequestEntryT> wrapper : bufferedRequestEntries) {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
index 94334e0..de9f6e0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
@@ -18,7 +18,7 @@
package org.apache.flink.connector.base.sink.writer;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
import java.io.Serializable;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
index a768083..0e68b1a 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
@@ -17,14 +17,17 @@
package org.apache.flink.connector.base.sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
-import java.util.Optional;
import java.util.function.Consumer;
/** Dummy destination that records write events. */
@@ -52,11 +55,8 @@ public class ArrayListAsyncSink extends AsyncSinkBase<String, Integer> {
}
@Override
- public SinkWriter<String, Void, BufferedRequestState<Integer>> createWriter(
- InitContext context, List<BufferedRequestState<Integer>> states) {
- /* SinkWriter implementing {@code submitRequestEntries} that is used to define the persistence
- * logic into {@code ArrayListDestination}.
- */
+ public StatefulSinkWriter<String, BufferedRequestState<Integer>> createWriter(
+ InitContext context) throws IOException {
return new AsyncSinkWriter<String, Integer>(
getElementConverter(),
context,
@@ -86,8 +86,31 @@ public class ArrayListAsyncSink extends AsyncSinkBase<String, Integer> {
}
@Override
- public Optional<SimpleVersionedSerializer<BufferedRequestState<Integer>>>
- getWriterStateSerializer() {
- return Optional.empty();
+ public StatefulSinkWriter<String, BufferedRequestState<Integer>> restoreWriter(
+ InitContext context, Collection<BufferedRequestState<Integer>> recoveredState)
+ throws IOException {
+ return createWriter(context);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<BufferedRequestState<Integer>> getWriterStateSerializer() {
+ return new AsyncSinkWriterStateSerializer<Integer>() {
+ @Override
+ protected void serializeRequestToStream(Integer request, DataOutputStream out)
+ throws IOException {
+ out.writeInt(request);
+ }
+
+ @Override
+ protected Integer deserializeRequestFromStream(long requestSize, DataInputStream in)
+ throws IOException {
+ return in.readInt();
+ }
+
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+ };
}
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 671f6c6..c7eb95f 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -17,7 +17,7 @@
package org.apache.flink.connector.base.sink.writer;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.junit.Before;
@@ -161,7 +161,7 @@ public class AsyncSinkWriterTest {
for (int i = 0; i < 23; i++) {
sink.write(String.valueOf(i));
}
- sink.prepareCommit(true);
+ sink.flush(true);
assertEquals(23, res.size());
}
@@ -177,7 +177,7 @@ public class AsyncSinkWriterTest {
.simulateFailures(false)
.build();
sink.write(String.valueOf(0));
- sink.prepareCommit(true);
+ sink.flush(true);
assertEquals(1, res.size());
}
@@ -200,7 +200,6 @@ public class AsyncSinkWriterTest {
sink.write("75");
assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(sink));
-
assertEquals(3, res.size());
}
@@ -221,7 +220,7 @@ public class AsyncSinkWriterTest {
sink.write("95");
sink.write("955");
assertThatBufferStatesAreEqual(sink.wrapRequests(95, 955), getWriterState(sink));
- sink.prepareCommit(true);
+ sink.flush(true);
assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(sink));
}
@@ -321,7 +320,7 @@ public class AsyncSinkWriterTest {
sink, "535", Arrays.asList(25, 55, 965, 75, 95, 45), Arrays.asList(35, 535));
// Checkpoint occurs
- sink.prepareCommit(true);
+ sink.flush(true);
// Everything is saved
assertEquals(Arrays.asList(25, 55, 965, 75, 95, 45, 955, 550, 35, 535), res);
@@ -356,7 +355,7 @@ public class AsyncSinkWriterTest {
sink.write("505");
assertTrue(res.contains(550));
assertTrue(res.contains(645));
- sink.prepareCommit(true);
+ sink.flush(true);
assertTrue(res.contains(545));
assertTrue(res.contains(535));
assertTrue(res.contains(515));
@@ -468,7 +467,7 @@ public class AsyncSinkWriterTest {
sink.write(String.valueOf(0));
sink.write(String.valueOf(1));
sink.write(String.valueOf(2));
- sink.prepareCommit(false);
+ sink.flush(false);
assertEquals(0, res.size());
}
@@ -626,9 +625,9 @@ public class AsyncSinkWriterTest {
sink.write(String.valueOf(2)); // flushing -- request should have [225,0,1], [225] fails,
// buffer has [2]
assertEquals(2, res.size());
- sink.prepareCommit(false); // inflight should be added to buffer still [225, 2]
+ sink.flush(false); // inflight should be added to buffer still [225, 2]
assertEquals(2, res.size());
- sink.prepareCommit(true); // buffer now flushed []
+ sink.flush(true); // buffer now flushed []
assertEquals(Arrays.asList(0, 1, 225, 2), res);
}
@@ -654,9 +653,9 @@ public class AsyncSinkWriterTest {
assertEquals(2, res.size()); // Request was [225, 1, 2], element 225 failed
// buffer should be [3] with [225] inflight
- sink.prepareCommit(false); // Buffer: [225,3] - > 8/110; 2/10 elements; 0 inflight
+ sink.flush(false); // Buffer: [225,3] - > 8/110; 2/10 elements; 0 inflight
assertEquals(2, res.size()); //
- List<BufferedRequestState<Integer>> states = sink.snapshotState();
+ List<BufferedRequestState<Integer>> states = sink.snapshotState(1);
AsyncSinkWriterImpl newSink =
new AsyncSinkWriterImplBuilder()
.context(sinkInitContext)
@@ -691,8 +690,8 @@ public class AsyncSinkWriterTest {
.build();
sink.write(String.valueOf(225)); // Buffer: 100/110B; 1/10 elements; 0 inflight
- sink.prepareCommit(false);
- List<BufferedRequestState<Integer>> states = sink.snapshotState();
+ sink.flush(false);
+ List<BufferedRequestState<Integer>> states = sink.snapshotState(1);
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(
() ->
@@ -729,7 +728,7 @@ public class AsyncSinkWriterTest {
sink.write("1"); // A timer is registered here to elapse at t=100
assertEquals(0, res.size());
tpts.setCurrentTime(10L);
- sink.prepareCommit(true);
+ sink.flush(true);
assertEquals(1, res.size());
tpts.setCurrentTime(20L); // At t=20, we write a new element that should not trigger another
sink.write("2"); // timer to be registered. If it is, it should elapse at t=120s.
@@ -791,7 +790,7 @@ public class AsyncSinkWriterTest {
tpts.setCurrentTime(0L);
sink.write("1");
tpts.setCurrentTime(50L);
- sink.prepareCommit(true);
+ sink.flush(true);
assertEquals(1, res.size());
tpts.setCurrentTime(200L);
}
@@ -928,7 +927,7 @@ public class AsyncSinkWriterTest {
private BufferedRequestState<Integer> getWriterState(
AsyncSinkWriter<String, Integer> sinkWriter) {
- List<BufferedRequestState<Integer>> states = sinkWriter.snapshotState();
+ List<BufferedRequestState<Integer>> states = sinkWriter.snapshotState(1);
assertEquals(states.size(), 1);
return states.get(0);
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index fba2711..076da61 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -18,7 +18,9 @@
package org.apache.flink.connector.base.sink.writer;
import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
@@ -37,6 +39,7 @@ import org.apache.flink.util.function.ThrowingRunnable;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
/** A mock implementation of a {@code Sink.InitContext} to be used in sink unit tests. */
public class TestSinkInitContext implements Sink.InitContext {
@@ -85,18 +88,19 @@ public class TestSinkInitContext implements Sink.InitContext {
}
@Override
- public Sink.ProcessingTimeService getProcessingTimeService() {
- return new Sink.ProcessingTimeService() {
+ public ProcessingTimeService getProcessingTimeService() {
+ return new ProcessingTimeService() {
@Override
public long getCurrentProcessingTime() {
return processingTimeService.getCurrentProcessingTime();
}
@Override
- public void registerProcessingTimer(
+ public ScheduledFuture<?> registerTimer(
long time, ProcessingTimeCallback processingTimerCallback) {
processingTimeService.registerTimer(
time, processingTimerCallback::onProcessingTime);
+ return null;
}
};
}
@@ -121,6 +125,11 @@ public class TestSinkInitContext implements Sink.InitContext {
return OptionalLong.empty();
}
+ @Override
+ public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
+ return null;
+ }
+
public TestProcessingTimeService getTestProcessingTimeService() {
return processingTimeService;
}