You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/02/15 10:22:20 UTC

[flink] branch master updated: [FLINK-25574][connectors/async] Migrate AsyncSink connectors to decomposed SinkV2

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 423143c  [FLINK-25574][connectors/async] Migrate AsyncSink connectors to decomposed SinkV2
423143c is described below

commit 423143c1a9dcfba2c8ddc08f4c785451b82802be
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Wed Feb 2 19:59:02 2022 +0100

    [FLINK-25574][connectors/async] Migrate AsyncSink connectors to decomposed SinkV2
---
 .../kinesis/sink/KinesisDataStreamsSink.java       | 36 ++++++++++++++----
 .../KinesisDataStreamsSinkElementConverter.java    |  2 +-
 .../kinesis/sink/KinesisDataStreamsSinkWriter.java |  5 ++-
 .../kinesis/table/KinesisDynamicSink.java          |  5 ++-
 .../table/KinesisDynamicTableSinkFactoryTest.java  | 12 +++---
 .../firehose/sink/KinesisFirehoseSink.java         | 36 +++++++++++++-----
 .../sink/KinesisFirehoseSinkElementConverter.java  |  2 +-
 .../firehose/sink/KinesisFirehoseSinkWriter.java   |  5 ++-
 .../firehose/table/KinesisFirehoseDynamicSink.java |  4 +-
 .../sink/KinesisFirehoseSinkWriterTest.java        |  7 +---
 .../KinesisFirehoseDynamicTableFactoryTest.java    |  8 ++--
 .../flink/connector/base/sink/AsyncSinkBase.java   | 28 +-------------
 .../base/sink/writer/AsyncSinkWriter.java          | 27 +++++++-------
 .../base/sink/writer/ElementConverter.java         |  2 +-
 .../connector/base/sink/ArrayListAsyncSink.java    | 43 +++++++++++++++++-----
 .../base/sink/writer/AsyncSinkWriterTest.java      | 33 ++++++++---------
 .../base/sink/writer/TestSinkInitContext.java      | 17 +++++++--
 17 files changed, 158 insertions(+), 114 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
index e4b9779..6252bfe 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java
@@ -19,7 +19,6 @@ package org.apache.flink.connector.kinesis.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.connector.base.sink.AsyncSinkBase;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -28,8 +27,9 @@ import org.apache.flink.util.Preconditions;
 
 import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
 
-import java.util.List;
-import java.util.Optional;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Properties;
 
 /**
@@ -113,8 +113,8 @@ public class KinesisDataStreamsSink<InputT> extends AsyncSinkBase<InputT, PutRec
 
     @Internal
     @Override
-    public SinkWriter<InputT, Void, BufferedRequestState<PutRecordsRequestEntry>> createWriter(
-            InitContext context, List<BufferedRequestState<PutRecordsRequestEntry>> states) {
+    public StatefulSinkWriter<InputT, BufferedRequestState<PutRecordsRequestEntry>> createWriter(
+            InitContext context) throws IOException {
         return new KinesisDataStreamsSinkWriter<>(
                 getElementConverter(),
                 context,
@@ -127,13 +127,33 @@ public class KinesisDataStreamsSink<InputT> extends AsyncSinkBase<InputT, PutRec
                 failOnError,
                 streamName,
                 kinesisClientProperties,
-                states);
+                Collections.emptyList());
     }
 
     @Internal
     @Override
-    public Optional<SimpleVersionedSerializer<BufferedRequestState<PutRecordsRequestEntry>>>
+    public SimpleVersionedSerializer<BufferedRequestState<PutRecordsRequestEntry>>
             getWriterStateSerializer() {
-        return Optional.of(new KinesisDataStreamsStateSerializer());
+        return new KinesisDataStreamsStateSerializer();
+    }
+
+    @Override
+    public StatefulSinkWriter<InputT, BufferedRequestState<PutRecordsRequestEntry>> restoreWriter(
+            InitContext context,
+            Collection<BufferedRequestState<PutRecordsRequestEntry>> recoveredState)
+            throws IOException {
+        return new KinesisDataStreamsSinkWriter<>(
+                getElementConverter(),
+                context,
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                failOnError,
+                streamName,
+                kinesisClientProperties,
+                recoveredState);
     }
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
index 2080f2e..2873603 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
@@ -19,7 +19,7 @@ package org.apache.flink.connector.kinesis.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.util.Preconditions;
 
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
index 50566dc..98ee077 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.connector.kinesis.sink;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
@@ -37,6 +37,7 @@ import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -111,7 +112,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRe
             boolean failOnError,
             String streamName,
             Properties kinesisClientProperties,
-            List<BufferedRequestState<PutRecordsRequestEntry>> states) {
+            Collection<BufferedRequestState<PutRecordsRequestEntry>> states) {
         super(
                 elementConverter,
                 context,
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
index c55ebe7..361db54 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java
@@ -28,7 +28,7 @@ import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -114,7 +114,8 @@ public class KinesisDynamicSink extends AsyncDynamicTableSink<PutRecordsRequestE
 
         Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError);
         addAsyncOptionsToSinkBuilder(builder);
-        return SinkProvider.of(builder.build());
+        KinesisDataStreamsSink<RowData> kdsSink = builder.build();
+        return SinkV2Provider.of(kdsSink);
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
index 38ad281..dfe829e 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java
@@ -18,14 +18,14 @@
 
 package org.apache.flink.connector.kinesis.table;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.TableOptionsBuilder;
 import org.apache.flink.table.factories.TestFormatFactory;
@@ -84,7 +84,7 @@ public class KinesisDynamicTableSinkFactoryTest extends TestLogger {
         // verify the produced sink
         DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
                 actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
-        Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+        Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
         Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
     }
 
@@ -141,7 +141,7 @@ public class KinesisDynamicTableSinkFactoryTest extends TestLogger {
         // verify the produced sink
         DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
                 actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
-        Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+        Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
         Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
     }
 
@@ -170,7 +170,7 @@ public class KinesisDynamicTableSinkFactoryTest extends TestLogger {
         // verify the produced sink
         DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
                 actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
-        Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+        Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
         Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
     }
 
@@ -204,7 +204,7 @@ public class KinesisDynamicTableSinkFactoryTest extends TestLogger {
         // verify the produced sink
         DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
                 actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
-        Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+        Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
         Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class);
     }
 
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
index 76fa0f7..6f9ed54 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSink.java
@@ -18,7 +18,6 @@
 package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.connector.base.sink.AsyncSinkBase;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -27,8 +26,9 @@ import org.apache.flink.util.Preconditions;
 
 import software.amazon.awssdk.services.firehose.model.Record;
 
-import java.util.List;
-import java.util.Optional;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Properties;
 
 /**
@@ -92,8 +92,8 @@ public class KinesisFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> {
     }
 
     @Override
-    public SinkWriter<InputT, Void, BufferedRequestState<Record>> createWriter(
-            InitContext context, List<BufferedRequestState<Record>> states) {
+    public StatefulSinkWriter<InputT, BufferedRequestState<Record>> createWriter(
+            InitContext context) throws IOException {
         return new KinesisFirehoseSinkWriter<>(
                 getElementConverter(),
                 context,
@@ -106,12 +106,30 @@ public class KinesisFirehoseSink<InputT> extends AsyncSinkBase<InputT, Record> {
                 failOnError,
                 deliveryStreamName,
                 firehoseClientProperties,
-                states);
+                Collections.emptyList());
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<BufferedRequestState<Record>>>
-            getWriterStateSerializer() {
-        return Optional.of(new KinesisFirehoseStateSerializer());
+    public StatefulSinkWriter<InputT, BufferedRequestState<Record>> restoreWriter(
+            InitContext context, Collection<BufferedRequestState<Record>> recoveredState)
+            throws IOException {
+        return new KinesisFirehoseSinkWriter<>(
+                getElementConverter(),
+                context,
+                getMaxBatchSize(),
+                getMaxInFlightRequests(),
+                getMaxBufferedRequests(),
+                getMaxBatchSizeInBytes(),
+                getMaxTimeInBufferMS(),
+                getMaxRecordSizeInBytes(),
+                failOnError,
+                deliveryStreamName,
+                firehoseClientProperties,
+                recoveredState);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<BufferedRequestState<Record>> getWriterStateSerializer() {
+        return new KinesisFirehoseStateSerializer();
     }
 }
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
index cca749c..dc72584 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkElementConverter.java
@@ -19,7 +19,7 @@ package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.util.Preconditions;
 
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index 47ce6e3..89d37d3 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -18,7 +18,7 @@
 package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
 import org.apache.flink.connector.aws.util.AWSGeneralUtil;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
@@ -38,6 +38,7 @@ import software.amazon.awssdk.services.firehose.model.Record;
 import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -131,7 +132,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record>
             boolean failOnError,
             String deliveryStreamName,
             Properties firehoseClientProperties,
-            List<BufferedRequestState<Record>> initialStates) {
+            Collection<BufferedRequestState<Record>> initialStates) {
         super(
                 elementConverter,
                 context,
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
index 2fe7d0e..5b6358b 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java
@@ -27,7 +27,7 @@ import org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkBuilder;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
@@ -104,7 +104,7 @@ public class KinesisFirehoseDynamicSink extends AsyncDynamicTableSink<Record> {
         Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError);
         super.addAsyncOptionsToSinkBuilder(builder);
 
-        return SinkProvider.of(builder.build());
+        return SinkV2Provider.of(builder.build());
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
index 8773127..d8dc5d4 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -18,9 +18,8 @@
 package org.apache.flink.connector.firehose.sink;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.aws.config.AWSConfigConstants;
-import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 
@@ -32,7 +31,6 @@ import software.amazon.awssdk.services.firehose.model.Record;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Properties;
 
 import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_ENDPOINT;
@@ -95,8 +93,7 @@ public class KinesisFirehoseSinkWriterTest {
                         true,
                         "test-stream",
                         prop);
-        SinkWriter<String, Void, BufferedRequestState<Record>> writer =
-                kinesisFirehoseSink.createWriter(ctx, new ArrayList<>());
+        SinkWriter<String> writer = kinesisFirehoseSink.createWriter(ctx);
 
         for (int i = 0; i < 12; i++) {
             writer.write("data_bytes", null);
diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
index 7f039c9..8c45883 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java
@@ -18,13 +18,13 @@
 
 package org.apache.flink.connector.firehose.table;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.connector.firehose.sink.KinesisFirehoseSink;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.TableOptionsBuilder;
 import org.apache.flink.table.factories.TestFormatFactory;
@@ -69,7 +69,7 @@ public class KinesisFirehoseDynamicTableFactoryTest extends TestLogger {
         // verify the produced sink
         DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
                 actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
-        Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+        Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
         Assertions.assertThat(sinkFunction).isInstanceOf(KinesisFirehoseSink.class);
     }
 
@@ -96,7 +96,7 @@ public class KinesisFirehoseDynamicTableFactoryTest extends TestLogger {
         // verify the produced sink
         DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider =
                 actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
-        Sink<RowData, ?, ?, ?> sinkFunction = ((SinkProvider) sinkFunctionProvider).createSink();
+        Sink<RowData> sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink();
         Assertions.assertThat(sinkFunction).isInstanceOf(KinesisFirehoseSink.class);
     }
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
index 872e951..0d93460 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
@@ -18,16 +18,12 @@
 package org.apache.flink.connector.base.sink;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
-import java.util.Optional;
 
 /**
  * A generic sink for destinations that provide an async client to persist data.
@@ -49,7 +45,7 @@ import java.util.Optional;
  */
 @PublicEvolving
 public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
-        implements Sink<InputT, Void, BufferedRequestState<RequestEntryT>, Void> {
+        implements StatefulSink<InputT, BufferedRequestState<RequestEntryT>> {
 
     private final ElementConverter<InputT, RequestEntryT> elementConverter;
     private final int maxBatchSize;
@@ -79,26 +75,6 @@ public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
         this.maxRecordSizeInBytes = maxRecordSizeInBytes;
     }
 
-    @Override
-    public Optional<Committer<Void>> createCommitter() {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<GlobalCommitter<Void, Void>> createGlobalCommitter() {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<Void>> getCommittableSerializer() {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
-        return Optional.empty();
-    }
-
     protected ElementConverter<InputT, RequestEntryT> getElementConverter() {
         return elementConverter;
     }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 0457b08..14ac74f 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -19,8 +19,9 @@ package org.apache.flink.connector.base.sink.writer;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.util.Preconditions;
@@ -29,6 +30,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
@@ -49,10 +51,10 @@ import java.util.function.Consumer;
  */
 @PublicEvolving
 public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
-        implements SinkWriter<InputT, Void, BufferedRequestState<RequestEntryT>> {
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
 
     private final MailboxExecutor mailboxExecutor;
-    private final Sink.ProcessingTimeService timeService;
+    private final ProcessingTimeService timeService;
 
     /* The timestamp of the previous batch of records was sent from this sink. */
     private long lastSendTimestamp = 0;
@@ -227,7 +229,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
             long maxBatchSizeInBytes,
             long maxTimeInBufferMS,
             long maxRecordSizeInBytes,
-            List<BufferedRequestState<RequestEntryT>> states) {
+            Collection<BufferedRequestState<RequestEntryT>> states) {
         this.elementConverter = elementConverter;
         this.mailboxExecutor = context.getMailboxExecutor();
         this.timeService = context.getProcessingTimeService();
@@ -273,15 +275,14 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
     }
 
     private void registerCallback() {
-        Sink.ProcessingTimeService.ProcessingTimeCallback ptc =
+        ProcessingTimeService.ProcessingTimeCallback ptc =
                 instant -> {
                     existsActiveTimerCallback = false;
                     while (!bufferedRequestEntries.isEmpty()) {
                         flush();
                     }
                 };
-        timeService.registerProcessingTimer(
-                timeService.getCurrentProcessingTime() + maxTimeInBufferMS, ptc);
+        timeService.registerTimer(timeService.getCurrentProcessingTime() + maxTimeInBufferMS, ptc);
         existsActiveTimerCallback = true;
     }
 
@@ -408,15 +409,13 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      * <p>To this end, all in-flight requests need to completed before proceeding with the commit.
      */
     @Override
-    public List<Void> prepareCommit(boolean flush) {
+    public void flush(boolean flush) {
         while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) {
             mailboxExecutor.tryYield();
             if (flush) {
                 flush();
             }
         }
-
-        return Collections.emptyList();
     }
 
     /**
@@ -426,11 +425,11 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      * a failure/restart of the application.
      */
     @Override
-    public List<BufferedRequestState<RequestEntryT>> snapshotState() {
+    public List<BufferedRequestState<RequestEntryT>> snapshotState(long checkpointId) {
         return Collections.singletonList(new BufferedRequestState<>((bufferedRequestEntries)));
     }
 
-    protected void initialize(List<BufferedRequestState<RequestEntryT>> states) {
+    protected void initialize(Collection<BufferedRequestState<RequestEntryT>> states) {
         if (states.isEmpty()) {
             return;
         }
@@ -440,7 +439,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
                     "Writer failed to initialize due to multiple initial states.");
         }
 
-        BufferedRequestState<RequestEntryT> state = states.get(0);
+        BufferedRequestState<RequestEntryT> state = states.iterator().next();
         this.bufferedRequestEntries.addAll(state.getBufferedRequestEntries());
 
         for (RequestEntryWrapper<RequestEntryT> wrapper : bufferedRequestEntries) {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
index 94334e0..de9f6e0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
@@ -18,7 +18,7 @@
 package org.apache.flink.connector.base.sink.writer;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 
 import java.io.Serializable;
 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
index a768083..0e68b1a 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
@@ -17,14 +17,17 @@
 
 package org.apache.flink.connector.base.sink;
 
-import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
-import java.util.Optional;
 import java.util.function.Consumer;
 
 /** Dummy destination that records write events. */
@@ -52,11 +55,8 @@ public class ArrayListAsyncSink extends AsyncSinkBase<String, Integer> {
     }
 
     @Override
-    public SinkWriter<String, Void, BufferedRequestState<Integer>> createWriter(
-            InitContext context, List<BufferedRequestState<Integer>> states) {
-        /* SinkWriter implementing {@code submitRequestEntries} that is used to define the persistence
-         * logic into {@code ArrayListDestination}.
-         */
+    public StatefulSinkWriter<String, BufferedRequestState<Integer>> createWriter(
+            InitContext context) throws IOException {
         return new AsyncSinkWriter<String, Integer>(
                 getElementConverter(),
                 context,
@@ -86,8 +86,31 @@ public class ArrayListAsyncSink extends AsyncSinkBase<String, Integer> {
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<BufferedRequestState<Integer>>>
-            getWriterStateSerializer() {
-        return Optional.empty();
+    public StatefulSinkWriter<String, BufferedRequestState<Integer>> restoreWriter(
+            InitContext context, Collection<BufferedRequestState<Integer>> recoveredState)
+            throws IOException {
+        return createWriter(context);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<BufferedRequestState<Integer>> getWriterStateSerializer() {
+        return new AsyncSinkWriterStateSerializer<Integer>() {
+            @Override
+            protected void serializeRequestToStream(Integer request, DataOutputStream out)
+                    throws IOException {
+                out.writeInt(request);
+            }
+
+            @Override
+            protected Integer deserializeRequestFromStream(long requestSize, DataInputStream in)
+                    throws IOException {
+                return in.readInt();
+            }
+
+            @Override
+            public int getVersion() {
+                return 0;
+            }
+        };
     }
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 671f6c6..c7eb95f 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
 import org.junit.Before;
@@ -161,7 +161,7 @@ public class AsyncSinkWriterTest {
         for (int i = 0; i < 23; i++) {
             sink.write(String.valueOf(i));
         }
-        sink.prepareCommit(true);
+        sink.flush(true);
         assertEquals(23, res.size());
     }
 
@@ -177,7 +177,7 @@ public class AsyncSinkWriterTest {
                         .simulateFailures(false)
                         .build();
         sink.write(String.valueOf(0));
-        sink.prepareCommit(true);
+        sink.flush(true);
         assertEquals(1, res.size());
     }
 
@@ -200,7 +200,6 @@ public class AsyncSinkWriterTest {
 
         sink.write("75");
         assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(sink));
-
         assertEquals(3, res.size());
     }
 
@@ -221,7 +220,7 @@ public class AsyncSinkWriterTest {
         sink.write("95");
         sink.write("955");
         assertThatBufferStatesAreEqual(sink.wrapRequests(95, 955), getWriterState(sink));
-        sink.prepareCommit(true);
+        sink.flush(true);
         assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(sink));
     }
 
@@ -321,7 +320,7 @@ public class AsyncSinkWriterTest {
                 sink, "535", Arrays.asList(25, 55, 965, 75, 95, 45), Arrays.asList(35, 535));
 
         // Checkpoint occurs
-        sink.prepareCommit(true);
+        sink.flush(true);
 
         // Everything is saved
         assertEquals(Arrays.asList(25, 55, 965, 75, 95, 45, 955, 550, 35, 535), res);
@@ -356,7 +355,7 @@ public class AsyncSinkWriterTest {
         sink.write("505");
         assertTrue(res.contains(550));
         assertTrue(res.contains(645));
-        sink.prepareCommit(true);
+        sink.flush(true);
         assertTrue(res.contains(545));
         assertTrue(res.contains(535));
         assertTrue(res.contains(515));
@@ -468,7 +467,7 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(0));
         sink.write(String.valueOf(1));
         sink.write(String.valueOf(2));
-        sink.prepareCommit(false);
+        sink.flush(false);
         assertEquals(0, res.size());
     }
 
@@ -626,9 +625,9 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(2)); // flushing -- request should have [225,0,1], [225] fails,
         // buffer has [2]
         assertEquals(2, res.size());
-        sink.prepareCommit(false); // inflight should be added to  buffer still [225, 2]
+        sink.flush(false); // inflight should be added to  buffer still [225, 2]
         assertEquals(2, res.size());
-        sink.prepareCommit(true); // buffer now flushed []
+        sink.flush(true); // buffer now flushed []
         assertEquals(Arrays.asList(0, 1, 225, 2), res);
     }
 
@@ -654,9 +653,9 @@ public class AsyncSinkWriterTest {
         assertEquals(2, res.size()); // Request was [225, 1, 2], element 225 failed
 
         // buffer should be [3] with [225] inflight
-        sink.prepareCommit(false); // Buffer: [225,3] - > 8/110; 2/10 elements; 0 inflight
+        sink.flush(false); // Buffer: [225,3] - > 8/110; 2/10 elements; 0 inflight
         assertEquals(2, res.size()); //
-        List<BufferedRequestState<Integer>> states = sink.snapshotState();
+        List<BufferedRequestState<Integer>> states = sink.snapshotState(1);
         AsyncSinkWriterImpl newSink =
                 new AsyncSinkWriterImplBuilder()
                         .context(sinkInitContext)
@@ -691,8 +690,8 @@ public class AsyncSinkWriterTest {
                         .build();
 
         sink.write(String.valueOf(225)); // Buffer: 100/110B; 1/10 elements; 0 inflight
-        sink.prepareCommit(false);
-        List<BufferedRequestState<Integer>> states = sink.snapshotState();
+        sink.flush(false);
+        List<BufferedRequestState<Integer>> states = sink.snapshotState(1);
         assertThatExceptionOfType(IllegalStateException.class)
                 .isThrownBy(
                         () ->
@@ -729,7 +728,7 @@ public class AsyncSinkWriterTest {
         sink.write("1"); // A timer is registered here to elapse at t=100
         assertEquals(0, res.size());
         tpts.setCurrentTime(10L);
-        sink.prepareCommit(true);
+        sink.flush(true);
         assertEquals(1, res.size());
         tpts.setCurrentTime(20L); // At t=20, we write a new element that should not trigger another
         sink.write("2"); // timer to be registered. If it is, it should elapse at t=120s.
@@ -791,7 +790,7 @@ public class AsyncSinkWriterTest {
         tpts.setCurrentTime(0L);
         sink.write("1");
         tpts.setCurrentTime(50L);
-        sink.prepareCommit(true);
+        sink.flush(true);
         assertEquals(1, res.size());
         tpts.setCurrentTime(200L);
     }
@@ -928,7 +927,7 @@ public class AsyncSinkWriterTest {
 
     private BufferedRequestState<Integer> getWriterState(
             AsyncSinkWriter<String, Integer> sinkWriter) {
-        List<BufferedRequestState<Integer>> states = sinkWriter.snapshotState();
+        List<BufferedRequestState<Integer>> states = sinkWriter.snapshotState(1);
         assertEquals(states.size(), 1);
         return states.get(0);
     }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index fba2711..076da61 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -18,7 +18,9 @@
 package org.apache.flink.connector.base.sink.writer;
 
 import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
@@ -37,6 +39,7 @@ import org.apache.flink.util.function.ThrowingRunnable;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
 
 /** A mock implementation of a {@code Sink.InitContext} to be used in sink unit tests. */
 public class TestSinkInitContext implements Sink.InitContext {
@@ -85,18 +88,19 @@ public class TestSinkInitContext implements Sink.InitContext {
     }
 
     @Override
-    public Sink.ProcessingTimeService getProcessingTimeService() {
-        return new Sink.ProcessingTimeService() {
+    public ProcessingTimeService getProcessingTimeService() {
+        return new ProcessingTimeService() {
             @Override
             public long getCurrentProcessingTime() {
                 return processingTimeService.getCurrentProcessingTime();
             }
 
             @Override
-            public void registerProcessingTimer(
+            public ScheduledFuture<?> registerTimer(
                     long time, ProcessingTimeCallback processingTimerCallback) {
                 processingTimeService.registerTimer(
                         time, processingTimerCallback::onProcessingTime);
+                return null;
             }
         };
     }
@@ -121,6 +125,11 @@ public class TestSinkInitContext implements Sink.InitContext {
         return OptionalLong.empty();
     }
 
+    @Override
+    public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
+        return null;
+    }
+
     public TestProcessingTimeService getTestProcessingTimeService() {
         return processingTimeService;
     }