You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/09 08:17:58 UTC

[flink] 02/02: [FLINK-25572][connectors/filesystem] Update File Sink to use decomposed interfaces

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 890b83ca2c5d64f6707e83952270cfca05159227
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Feb 9 15:23:15 2022 +0800

    [FLINK-25572][connectors/filesystem] Update File Sink to use decomposed interfaces
    
    This closes #18642.
---
 .../apache/flink/connector/file/sink/FileSink.java | 51 ++++++++--------
 .../file/sink/committer/FileCommitter.java         | 14 ++---
 .../connector/file/sink/writer/FileWriter.java     | 34 +++++++----
 .../file/sink/writer/FileWriterBucket.java         |  4 +-
 .../file/sink/committer/FileCommitterTest.java     | 55 ++++++++++-------
 ...leWriterBucketStateSerializerMigrationTest.java | 13 ++--
 .../connector/file/sink/writer/FileWriterTest.java | 36 +++++------
 .../connector/kafka/sink/KafkaCommitterTest.java   | 55 +++--------------
 .../connector/sink2/mocks/MockCommitRequest.java   | 69 ++++++++++++++++++++++
 .../table/planner/factories/TestFileFactory.java   |  6 +-
 10 files changed, 196 insertions(+), 141 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
index 920a11b..566037f 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
@@ -23,10 +23,10 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.connector.file.sink.committer.FileCommitter;
 import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
 import org.apache.flink.connector.file.sink.writer.FileWriter;
@@ -52,8 +52,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -108,7 +106,10 @@ import static org.apache.flink.util.Preconditions.checkState;
  *     written to its output
  */
 @Experimental
-public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBucketState, Void> {
+public class FileSink<IN>
+        implements StatefulSink<IN, FileWriterBucketState>,
+                TwoPhaseCommittingSink<IN, FileSinkCommittable>,
+                WithCompatibleState {
 
     private final BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder;
 
@@ -117,17 +118,23 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
     }
 
     @Override
-    public SinkWriter<IN, FileSinkCommittable, FileWriterBucketState> createWriter(
-            InitContext context, List<FileWriterBucketState> states) throws IOException {
+    public FileWriter<IN> createWriter(InitContext context) throws IOException {
+        return bucketsBuilder.createWriter(context);
+    }
+
+    @Override
+    public StatefulSinkWriter<IN, FileWriterBucketState> restoreWriter(
+            InitContext context, Collection<FileWriterBucketState> recoveredState)
+            throws IOException {
         FileWriter<IN> writer = bucketsBuilder.createWriter(context);
-        writer.initializeState(states);
+        writer.initializeState(recoveredState);
         return writer;
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<FileWriterBucketState>> getWriterStateSerializer() {
+    public SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer() {
         try {
-            return Optional.of(bucketsBuilder.getWriterStateSerializer());
+            return bucketsBuilder.getWriterStateSerializer();
         } catch (IOException e) {
             // it's not optimal that we have to do this but creating the serializers for the
             // FileSink requires (among other things) a call to FileSystem.get() which declares
@@ -137,14 +144,14 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
     }
 
     @Override
-    public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
-        return Optional.of(bucketsBuilder.createCommitter());
+    public Committer<FileSinkCommittable> createCommitter() throws IOException {
+        return bucketsBuilder.createCommitter();
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer() {
+    public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() {
         try {
-            return Optional.of(bucketsBuilder.getCommittableSerializer());
+            return bucketsBuilder.getCommittableSerializer();
         } catch (IOException e) {
             // it's not optimal that we have to do this but creating the serializers for the
             // FileSink requires (among other things) a call to FileSystem.get() which declares
@@ -154,17 +161,7 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
     }
 
     @Override
-    public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
-        return Optional.empty();
-    }
-
-    @Override
-    public Collection<String> getCompatibleStateNames() {
+    public Collection<String> getCompatibleWriterStateNames() {
         // StreamingFileSink
         return Collections.singleton("bucket-states");
     }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
index ccd3018..c72b399 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
@@ -19,14 +19,13 @@
 package org.apache.flink.connector.file.sink.committer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
+import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -48,9 +47,10 @@ public class FileCommitter implements Committer<FileSinkCommittable> {
     }
 
     @Override
-    public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables)
-            throws IOException {
-        for (FileSinkCommittable committable : committables) {
+    public void commit(Collection<CommitRequest<FileSinkCommittable>> requests)
+            throws IOException, InterruptedException {
+        for (CommitRequest<FileSinkCommittable> request : requests) {
+            FileSinkCommittable committable = request.getCommittable();
             if (committable.hasPendingFile()) {
                 // We should always use commitAfterRecovery which contains additional checks.
                 bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
@@ -61,8 +61,6 @@ public class FileCommitter implements Committer<FileSinkCommittable> {
                         committable.getInProgressFileToCleanup());
             }
         }
-
-        return Collections.emptyList();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index 15aebd0..fad3b50 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -20,8 +20,10 @@ package org.apache.flink.connector.file.sink.writer;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.core.fs.Path;
@@ -39,6 +41,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -58,8 +61,10 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 @Internal
 public class FileWriter<IN>
-        implements SinkWriter<IN, FileSinkCommittable, FileWriterBucketState>,
-                Sink.ProcessingTimeService.ProcessingTimeCallback {
+        implements StatefulSinkWriter<IN, FileWriterBucketState>,
+                TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, FileSinkCommittable>,
+                SinkWriter<IN>,
+                ProcessingTimeService.ProcessingTimeCallback {
 
     private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
 
@@ -75,7 +80,7 @@ public class FileWriter<IN>
 
     private final RollingPolicy<IN, String> rollingPolicy;
 
-    private final Sink.ProcessingTimeService processingTimeService;
+    private final ProcessingTimeService processingTimeService;
 
     private final long bucketCheckInterval;
 
@@ -89,6 +94,8 @@ public class FileWriter<IN>
 
     private final Counter recordsOutCounter;
 
+    private boolean endOfInput;
+
     /**
      * A constructor creating a new empty bucket manager.
      *
@@ -107,7 +114,7 @@ public class FileWriter<IN>
             final BucketWriter<IN, String> bucketWriter,
             final RollingPolicy<IN, String> rollingPolicy,
             final OutputFileConfig outputFileConfig,
-            final Sink.ProcessingTimeService processingTimeService,
+            final ProcessingTimeService processingTimeService,
             final long bucketCheckInterval) {
 
         this.basePath = checkNotNull(basePath);
@@ -148,7 +155,7 @@ public class FileWriter<IN>
      * @throws IOException if anything goes wrong during retrieving the state or
      *     restoring/committing of any in-progress/pending part files
      */
-    public void initializeState(List<FileWriterBucketState> bucketStates) throws IOException {
+    public void initializeState(Collection<FileWriterBucketState> bucketStates) throws IOException {
         checkNotNull(bucketStates, "The retrieved state was null.");
 
         for (FileWriterBucketState state : bucketStates) {
@@ -179,7 +186,7 @@ public class FileWriter<IN>
     }
 
     @Override
-    public void write(IN element, Context context) throws IOException {
+    public void write(IN element, Context context) throws IOException, InterruptedException {
         // setting the values in the bucketer context
         bucketerContext.update(
                 context.timestamp(),
@@ -193,7 +200,12 @@ public class FileWriter<IN>
     }
 
     @Override
-    public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+    public void flush(boolean endOfInput) throws IOException, InterruptedException {
+        this.endOfInput = endOfInput;
+    }
+
+    @Override
+    public Collection<FileSinkCommittable> prepareCommit() throws IOException {
         List<FileSinkCommittable> committables = new ArrayList<>();
 
         // Every time before we prepare commit, we first check and remove the inactive
@@ -206,7 +218,7 @@ public class FileWriter<IN>
             if (!entry.getValue().isActive()) {
                 activeBucketIt.remove();
             } else {
-                committables.addAll(entry.getValue().prepareCommit(flush));
+                committables.addAll(entry.getValue().prepareCommit(endOfInput));
             }
         }
 
@@ -263,7 +275,7 @@ public class FileWriter<IN>
     private void registerNextBucketInspectionTimer() {
         final long nextInspectionTime =
                 processingTimeService.getCurrentProcessingTime() + bucketCheckInterval;
-        processingTimeService.registerProcessingTimer(nextInspectionTime, this);
+        processingTimeService.registerTimer(nextInspectionTime, this);
     }
 
     /**
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
index cec8648..3f5f5f9 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
@@ -191,9 +191,9 @@ class FileWriterBucket<IN> {
         inProgressPart.write(element, currentTime);
     }
 
-    List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+    List<FileSinkCommittable> prepareCommit(boolean endOfInput) throws IOException {
         if (inProgressPart != null
-                && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart) || flush)) {
+                && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart) || endOfInput)) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug(
                         "Closing in-progress part file for bucket id={} on checkpoint.", bucketId);
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
index 935d170..0b07370 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.file.sink.committer;
 
+import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
+import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
 import org.apache.flink.connector.file.sink.utils.NoOpBucketWriter;
@@ -28,9 +30,11 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -43,15 +47,16 @@ public class FileCommitterTest {
         StubBucketWriter stubBucketWriter = new StubBucketWriter();
         FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
 
-        FileSinkCommittable fileSinkCommittable =
-                new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable());
-        List<FileSinkCommittable> toRetry =
-                fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
+        MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
+                new MockCommitRequest<>(
+                        new FileSinkCommittable(
+                                new FileSinkTestUtils.TestPendingFileRecoverable()));
+        fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
 
         assertEquals(1, stubBucketWriter.getRecoveredPendingFiles().size());
         assertEquals(0, stubBucketWriter.getNumCleanUp());
         assertTrue(stubBucketWriter.getRecoveredPendingFiles().get(0).isCommitted());
-        assertEquals(0, toRetry.size());
+        assertEquals(0, fileSinkCommittable.getNumberOfRetries());
     }
 
     @Test
@@ -59,14 +64,15 @@ public class FileCommitterTest {
         StubBucketWriter stubBucketWriter = new StubBucketWriter();
         FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
 
-        FileSinkCommittable fileSinkCommittable =
-                new FileSinkCommittable(new FileSinkTestUtils.TestInProgressFileRecoverable());
-        List<FileSinkCommittable> toRetry =
-                fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
+        MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
+                new MockCommitRequest<>(
+                        new FileSinkCommittable(
+                                new FileSinkTestUtils.TestInProgressFileRecoverable()));
+        fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
 
         assertEquals(0, stubBucketWriter.getRecoveredPendingFiles().size());
         assertEquals(1, stubBucketWriter.getNumCleanUp());
-        assertEquals(0, toRetry.size());
+        assertEquals(0, fileSinkCommittable.getNumberOfRetries());
     }
 
     @Test
@@ -74,23 +80,28 @@ public class FileCommitterTest {
         StubBucketWriter stubBucketWriter = new StubBucketWriter();
         FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
 
-        List<FileSinkCommittable> committables =
-                Arrays.asList(
-                        new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()),
-                        new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()),
-                        new FileSinkCommittable(
-                                new FileSinkTestUtils.TestInProgressFileRecoverable()),
-                        new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()),
-                        new FileSinkCommittable(
-                                new FileSinkTestUtils.TestInProgressFileRecoverable()));
-        List<FileSinkCommittable> toRetry = fileCommitter.commit(committables);
+        Collection<CommitRequest<FileSinkCommittable>> committables =
+                Stream.of(
+                                new FileSinkCommittable(
+                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                new FileSinkCommittable(
+                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                new FileSinkCommittable(
+                                        new FileSinkTestUtils.TestInProgressFileRecoverable()),
+                                new FileSinkCommittable(
+                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                new FileSinkCommittable(
+                                        new FileSinkTestUtils.TestInProgressFileRecoverable()))
+                        .map(MockCommitRequest::new)
+                        .collect(Collectors.toList());
+        fileCommitter.commit(committables);
 
         assertEquals(3, stubBucketWriter.getRecoveredPendingFiles().size());
         assertEquals(2, stubBucketWriter.getNumCleanUp());
         stubBucketWriter
                 .getRecoveredPendingFiles()
                 .forEach(pendingFile -> assertTrue(pendingFile.isCommitted()));
-        assertEquals(0, toRetry.size());
+        assertTrue(committables.stream().allMatch(c -> c.getNumberOfRetries() == 0));
     }
 
     // ------------------------------- Mock Classes --------------------------------
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
index 65f15f6..228b9c9 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.connector.file.sink.writer;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
+import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.connector.file.sink.committer.FileCommitter;
@@ -169,7 +171,7 @@ public class FileWriterBucketStateSerializerMigrationTest {
     }
 
     @Test
-    public void testSerializationFull() throws IOException {
+    public void testSerializationFull() throws IOException, InterruptedException {
         testDeserializationFull(true, "full");
     }
 
@@ -180,12 +182,12 @@ public class FileWriterBucketStateSerializerMigrationTest {
     }
 
     @Test
-    public void testSerializationNullInProgress() throws IOException {
+    public void testSerializationNullInProgress() throws IOException, InterruptedException {
         testDeserializationFull(false, "full-no-in-progress");
     }
 
     private void testDeserializationFull(final boolean withInProgress, final String scenarioName)
-            throws IOException {
+            throws IOException, InterruptedException {
 
         final BucketStatePathResolver pathResolver =
                 new BucketStatePathResolver(BASE_PATH, previousVersion);
@@ -221,7 +223,10 @@ public class FileWriterBucketStateSerializerMigrationTest {
 
             // simulates we commit the recovered pending files on the first checkpoint
             bucket.snapshotState();
-            List<FileSinkCommittable> committables = bucket.prepareCommit(false);
+            Collection<CommitRequest<FileSinkCommittable>> committables =
+                    bucket.prepareCommit(false).stream()
+                            .map(MockCommitRequest::new)
+                            .collect(Collectors.toList());
             FileCommitter committer = new FileCommitter(createBucketWriter());
             committer.commit(committables);
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index a8fe439..e521f61 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.file.sink.writer;
 
+import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
@@ -53,12 +53,14 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -92,7 +94,7 @@ public class FileWriterTest {
         fileWriter.write("test2", new ContextImpl());
         fileWriter.write("test3", new ContextImpl());
 
-        List<FileSinkCommittable> committables = fileWriter.prepareCommit(false);
+        Collection<FileSinkCommittable> committables = fileWriter.prepareCommit();
         assertEquals(3, committables.size());
     }
 
@@ -112,7 +114,7 @@ public class FileWriterTest {
         fileWriter.write("test3", new ContextImpl());
         assertEquals(3, fileWriter.getActiveBuckets().size());
 
-        fileWriter.prepareCommit(false);
+        fileWriter.prepareCommit();
         List<FileWriterBucketState> states = fileWriter.snapshotState(1L);
         assertEquals(3, states.size());
 
@@ -145,7 +147,7 @@ public class FileWriterTest {
         firstFileWriter.write("test2", new ContextImpl());
         firstFileWriter.write("test3", new ContextImpl());
 
-        firstFileWriter.prepareCommit(false);
+        firstFileWriter.prepareCommit();
         List<FileWriterBucketState> firstState = firstFileWriter.snapshotState(1L);
 
         FileWriter<String> secondFileWriter =
@@ -157,7 +159,7 @@ public class FileWriterTest {
         secondFileWriter.write("test1", new ContextImpl());
         secondFileWriter.write("test2", new ContextImpl());
 
-        secondFileWriter.prepareCommit(false);
+        secondFileWriter.prepareCommit();
         List<FileWriterBucketState> secondState = secondFileWriter.snapshotState(1L);
 
         List<FileWriterBucketState> mergedState = new ArrayList<>();
@@ -197,17 +199,17 @@ public class FileWriterTest {
                         path, OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));
 
         fileWriter.write("test", new ContextImpl());
-        fileWriter.prepareCommit(false);
+        fileWriter.prepareCommit();
         fileWriter.snapshotState(1L);
 
         // No more records and another call to prepareCommit will makes it inactive
-        fileWriter.prepareCommit(false);
+        fileWriter.prepareCommit();
 
         assertTrue(fileWriter.getActiveBuckets().isEmpty());
     }
 
     @Test
-    public void testOnProcessingTime() throws IOException, InterruptedException {
+    public void testOnProcessingTime() throws Exception {
         File outDir = TEMP_FOLDER.newFolder();
         Path path = new Path(outDir.toURI());
 
@@ -247,7 +249,7 @@ public class FileWriterTest {
 
         // Close, pre-commit & clear all the pending records.
         processingTimeService.advanceTo(30);
-        fileWriter.prepareCommit(false);
+        fileWriter.prepareCommit();
 
         // Test timer re-registration.
         fileWriter.write("test1", new ContextImpl());
@@ -278,7 +280,7 @@ public class FileWriterTest {
     }
 
     @Test
-    public void testNumberRecordsOutCounter() throws IOException {
+    public void testNumberRecordsOutCounter() throws IOException, InterruptedException {
         final OperatorIOMetricGroup operatorIOMetricGroup =
                 UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
         File outDir = TEMP_FOLDER.newFolder();
@@ -350,8 +352,7 @@ public class FileWriterTest {
         }
     }
 
-    private static class ManuallyTriggeredProcessingTimeService
-            implements Sink.ProcessingTimeService {
+    private static class ManuallyTriggeredProcessingTimeService implements ProcessingTimeService {
 
         private long now;
 
@@ -364,20 +365,21 @@ public class FileWriterTest {
         }
 
         @Override
-        public void registerProcessingTimer(
+        public ScheduledFuture<?> registerTimer(
                 long time, ProcessingTimeCallback processingTimeCallback) {
             if (time <= now) {
                 try {
                     processingTimeCallback.onProcessingTime(now);
-                } catch (IOException | InterruptedException e) {
+                } catch (Exception e) {
                     ExceptionUtils.rethrow(e);
                 }
             } else {
                 timers.add(new Tuple2<>(time, processingTimeCallback));
             }
+            return null;
         }
 
-        public void advanceTo(long time) throws IOException, InterruptedException {
+        public void advanceTo(long time) throws Exception {
             if (time > now) {
                 now = time;
 
@@ -464,7 +466,7 @@ public class FileWriterTest {
             BucketAssigner<String, String> bucketAssigner,
             RollingPolicy<String, String> rollingPolicy,
             OutputFileConfig outputFileConfig,
-            Sink.ProcessingTimeService processingTimeService,
+            ProcessingTimeService processingTimeService,
             long bucketCheckInterval)
             throws IOException {
         return new FileWriter<>(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
index 157699f..30bbdca7 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -50,8 +50,8 @@ public class KafkaCommitterTest {
                         new FlinkKafkaInternalProducer<>(properties, TRANSACTIONAL_ID);
                 Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
                         new Recyclable<>(producer, p -> {})) {
-            final MockCommitRequest request =
-                    new MockCommitRequest(
+            final MockCommitRequest<KafkaCommittable> request =
+                    new MockCommitRequest<>(
                             new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
 
             producer.resumeTransaction(PRODUCER_ID, EPOCH);
@@ -73,12 +73,13 @@ public class KafkaCommitterTest {
                 Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
                         new Recyclable<>(producer, p -> {})) {
             // will fail because transaction not started
-            final MockCommitRequest request =
-                    new MockCommitRequest(
+            final MockCommitRequest<KafkaCommittable> request =
+                    new MockCommitRequest<>(
                             new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
             committer.commit(Collections.singletonList(request));
-            assertThat(request.failedWithUnknownReason).isInstanceOf(IllegalStateException.class);
-            assertThat(request.failedWithUnknownReason.getMessage())
+            assertThat(request.getFailedWithUnknownReason())
+                    .isInstanceOf(IllegalStateException.class);
+            assertThat(request.getFailedWithUnknownReason().getMessage())
                     .contains("Transaction was not started");
             assertThat(recyclable.isRecycled()).isTrue();
         }
@@ -93,44 +94,4 @@ public class KafkaCommitterTest {
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         return properties;
     }
-
-    private static class MockCommitRequest implements Committer.CommitRequest<KafkaCommittable> {
-
-        private final KafkaCommittable committable;
-        private int retries = 0;
-        Throwable failedWithUnknownReason;
-
-        MockCommitRequest(KafkaCommittable committable) {
-            this.committable = committable;
-        }
-
-        @Override
-        public KafkaCommittable getCommittable() {
-            return committable;
-        }
-
-        @Override
-        public int getNumberOfRetries() {
-            return retries;
-        }
-
-        @Override
-        public void signalFailedWithKnownReason(Throwable t) {}
-
-        @Override
-        public void signalFailedWithUnknownReason(Throwable t) {
-            failedWithUnknownReason = t;
-        }
-
-        @Override
-        public void retryLater() {
-            retries++;
-        }
-
-        @Override
-        public void updateAndRetryLater(KafkaCommittable committable) {}
-
-        @Override
-        public void signalAlreadyCommitted() {}
-    }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java
new file mode 100644
index 0000000..75a652a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2.mocks;
+
+import org.apache.flink.api.connector.sink2.Committer;
+
+/**
+ * A test {@link Committer.CommitRequest} that tracks retries and unknown-reason failures.
+ *
+ * @param <CommT> committable type
+ */
+public class MockCommitRequest<CommT> implements Committer.CommitRequest<CommT> {
+
+    private final CommT committable;
+    private int retries = 0; // incremented by each retryLater() call
+    private Throwable failedWithUnknownReason; // null until signalFailedWithUnknownReason()
+
+    public MockCommitRequest(CommT committable) {
+        this.committable = committable;
+    }
+
+    @Override
+    public CommT getCommittable() {
+        return committable;
+    }
+
+    @Override
+    public int getNumberOfRetries() {
+        return retries;
+    }
+
+    @Override
+    public void signalFailedWithKnownReason(Throwable t) {} // no-op: argument is discarded
+
+    @Override
+    public void signalFailedWithUnknownReason(Throwable t) {
+        failedWithUnknownReason = t; // overwrites any previously stored throwable
+    }
+
+    @Override
+    public void retryLater() {
+        retries++;
+    }
+
+    @Override
+    public void updateAndRetryLater(CommT committable) {} // no-op: retries is not incremented
+
+    @Override
+    public void signalAlreadyCommitted() {} // no-op
+
+    public Throwable getFailedWithUnknownReason() { // accessor for test assertions
+        return failedWithUnknownReason;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java
index 8949160..626a395 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -56,7 +56,7 @@ import java.util.Set;
 
 /**
  * Test implementation of {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory} that
- * creates a file source and sink based on {@link SourceProvider} and {@link SinkProvider}.
+ * creates a file source and sink based on {@link SourceProvider} and {@link SinkV2Provider}.
  */
 public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
@@ -158,7 +158,7 @@ public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableS
         public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
             final FileSink<RowData> fileSink =
                     FileSink.forRowFormat(path, new RowDataEncoder()).build();
-            return SinkProvider.of(fileSink);
+            return SinkV2Provider.of(fileSink);
         }
 
         @Override