You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/09 08:17:56 UTC

[flink] branch master updated (7cb00d1 -> 890b83c)

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 7cb00d1  [FLINK-25329][runtime] Use cache in memory graph store and support memory graph store in session cluster
     new 46788bb  [hotfix][core] Complete the scala API for the new sink
     new 890b83c  [FLINK-25572][connectors/filesystem] Update File Sink to use decomposed interfaces

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/connector/file/sink/FileSink.java | 51 ++++++++--------
 .../file/sink/committer/FileCommitter.java         | 14 ++---
 .../connector/file/sink/writer/FileWriter.java     | 34 +++++++----
 .../file/sink/writer/FileWriterBucket.java         |  4 +-
 .../file/sink/committer/FileCommitterTest.java     | 55 ++++++++++-------
 ...leWriterBucketStateSerializerMigrationTest.java | 13 ++--
 .../connector/file/sink/writer/FileWriterTest.java | 36 +++++------
 .../connector/kafka/sink/KafkaCommitterTest.java   | 55 +++--------------
 .../connector/sink2/mocks/MockCommitRequest.java   | 69 ++++++++++++++++++++++
 .../flink/streaming/api/scala/DataStream.scala     | 12 +++-
 .../table/planner/factories/TestFileFactory.java   |  6 +-
 11 files changed, 206 insertions(+), 143 deletions(-)
 create mode 100644 flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java

[flink] 01/02: [hotfix][core] Complete the scala API for the new sink

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 46788bbabed543cbfff329b16b3f517231856c65
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Feb 9 15:29:52 2022 +0800

    [hotfix][core] Complete the scala API for the new sink
---
 .../org/apache/flink/streaming/api/scala/DataStream.scala    | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index f6a7bd9..2fd7522 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.operators.{ResourceSpec, SlotSharingGroup}
 import org.apache.flink.api.common.serialization.SerializationSchema
 import org.apache.flink.api.common.state.MapStateDescriptor
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.connector.sink.Sink
+import org.apache.flink.api.connector.sink2.Sink
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
@@ -1154,7 +1154,15 @@ class DataStream[T](stream: JavaStream[T]) {
    * will be executed once the StreamExecutionEnvironment.execute(...)
    * method is called.
    */
-  def sinkTo(sink: Sink[T, _, _, _]): DataStreamSink[T] = stream.sinkTo(sink)
+  def sinkTo(sink: org.apache.flink.api.connector.sink.Sink[T, _, _, _]): DataStreamSink[T] =
+    stream.sinkTo(sink)
+
+  /**
+   * Adds the given sink to this DataStream. Only streams with sinks added
+   * will be executed once the StreamExecutionEnvironment.execute(...)
+   * method is called.
+   */
+  def sinkTo(sink: Sink[T]): DataStreamSink[T] = stream.sinkTo(sink)
 
   /**
    * Triggers the distributed execution of the streaming dataflow and returns an iterator over the

[flink] 02/02: [FLINK-25572][connectors/filesystem] Update File Sink to use decomposed interfaces

Posted by ga...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 890b83ca2c5d64f6707e83952270cfca05159227
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Feb 9 15:23:15 2022 +0800

    [FLINK-25572][connectors/filesystem] Update File Sink to use decomposed interfaces
    
    This closes #18642.
---
 .../apache/flink/connector/file/sink/FileSink.java | 51 ++++++++--------
 .../file/sink/committer/FileCommitter.java         | 14 ++---
 .../connector/file/sink/writer/FileWriter.java     | 34 +++++++----
 .../file/sink/writer/FileWriterBucket.java         |  4 +-
 .../file/sink/committer/FileCommitterTest.java     | 55 ++++++++++-------
 ...leWriterBucketStateSerializerMigrationTest.java | 13 ++--
 .../connector/file/sink/writer/FileWriterTest.java | 36 +++++------
 .../connector/kafka/sink/KafkaCommitterTest.java   | 55 +++--------------
 .../connector/sink2/mocks/MockCommitRequest.java   | 69 ++++++++++++++++++++++
 .../table/planner/factories/TestFileFactory.java   |  6 +-
 10 files changed, 196 insertions(+), 141 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
index 920a11b..566037f 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
@@ -23,10 +23,10 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.connector.file.sink.committer.FileCommitter;
 import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
 import org.apache.flink.connector.file.sink.writer.FileWriter;
@@ -52,8 +52,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -108,7 +106,10 @@ import static org.apache.flink.util.Preconditions.checkState;
  *     written to its output
  */
 @Experimental
-public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBucketState, Void> {
+public class FileSink<IN>
+        implements StatefulSink<IN, FileWriterBucketState>,
+                TwoPhaseCommittingSink<IN, FileSinkCommittable>,
+                WithCompatibleState {
 
     private final BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder;
 
@@ -117,17 +118,23 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
     }
 
     @Override
-    public SinkWriter<IN, FileSinkCommittable, FileWriterBucketState> createWriter(
-            InitContext context, List<FileWriterBucketState> states) throws IOException {
+    public FileWriter<IN> createWriter(InitContext context) throws IOException {
+        return bucketsBuilder.createWriter(context);
+    }
+
+    @Override
+    public StatefulSinkWriter<IN, FileWriterBucketState> restoreWriter(
+            InitContext context, Collection<FileWriterBucketState> recoveredState)
+            throws IOException {
         FileWriter<IN> writer = bucketsBuilder.createWriter(context);
-        writer.initializeState(states);
+        writer.initializeState(recoveredState);
         return writer;
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<FileWriterBucketState>> getWriterStateSerializer() {
+    public SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer() {
         try {
-            return Optional.of(bucketsBuilder.getWriterStateSerializer());
+            return bucketsBuilder.getWriterStateSerializer();
         } catch (IOException e) {
             // it's not optimal that we have to do this but creating the serializers for the
             // FileSink requires (among other things) a call to FileSystem.get() which declares
@@ -137,14 +144,14 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
     }
 
     @Override
-    public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
-        return Optional.of(bucketsBuilder.createCommitter());
+    public Committer<FileSinkCommittable> createCommitter() throws IOException {
+        return bucketsBuilder.createCommitter();
     }
 
     @Override
-    public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer() {
+    public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() {
         try {
-            return Optional.of(bucketsBuilder.getCommittableSerializer());
+            return bucketsBuilder.getCommittableSerializer();
         } catch (IOException e) {
             // it's not optimal that we have to do this but creating the serializers for the
             // FileSink requires (among other things) a call to FileSystem.get() which declares
@@ -154,17 +161,7 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
     }
 
     @Override
-    public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
-        return Optional.empty();
-    }
-
-    @Override
-    public Collection<String> getCompatibleStateNames() {
+    public Collection<String> getCompatibleWriterStateNames() {
         // StreamingFileSink
         return Collections.singleton("bucket-states");
     }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
index ccd3018..c72b399 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
@@ -19,14 +19,13 @@
 package org.apache.flink.connector.file.sink.committer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
+import java.util.Collection;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -48,9 +47,10 @@ public class FileCommitter implements Committer<FileSinkCommittable> {
     }
 
     @Override
-    public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables)
-            throws IOException {
-        for (FileSinkCommittable committable : committables) {
+    public void commit(Collection<CommitRequest<FileSinkCommittable>> requests)
+            throws IOException, InterruptedException {
+        for (CommitRequest<FileSinkCommittable> request : requests) {
+            FileSinkCommittable committable = request.getCommittable();
             if (committable.hasPendingFile()) {
                 // We should always use commitAfterRecovery which contains additional checks.
                 bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
@@ -61,8 +61,6 @@ public class FileCommitter implements Committer<FileSinkCommittable> {
                         committable.getInProgressFileToCleanup());
             }
         }
-
-        return Collections.emptyList();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index 15aebd0..fad3b50 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -20,8 +20,10 @@ package org.apache.flink.connector.file.sink.writer;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.core.fs.Path;
@@ -39,6 +41,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -58,8 +61,10 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 @Internal
 public class FileWriter<IN>
-        implements SinkWriter<IN, FileSinkCommittable, FileWriterBucketState>,
-                Sink.ProcessingTimeService.ProcessingTimeCallback {
+        implements StatefulSinkWriter<IN, FileWriterBucketState>,
+                TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, FileSinkCommittable>,
+                SinkWriter<IN>,
+                ProcessingTimeService.ProcessingTimeCallback {
 
     private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
 
@@ -75,7 +80,7 @@ public class FileWriter<IN>
 
     private final RollingPolicy<IN, String> rollingPolicy;
 
-    private final Sink.ProcessingTimeService processingTimeService;
+    private final ProcessingTimeService processingTimeService;
 
     private final long bucketCheckInterval;
 
@@ -89,6 +94,8 @@ public class FileWriter<IN>
 
     private final Counter recordsOutCounter;
 
+    private boolean endOfInput;
+
     /**
      * A constructor creating a new empty bucket manager.
      *
@@ -107,7 +114,7 @@ public class FileWriter<IN>
             final BucketWriter<IN, String> bucketWriter,
             final RollingPolicy<IN, String> rollingPolicy,
             final OutputFileConfig outputFileConfig,
-            final Sink.ProcessingTimeService processingTimeService,
+            final ProcessingTimeService processingTimeService,
             final long bucketCheckInterval) {
 
         this.basePath = checkNotNull(basePath);
@@ -148,7 +155,7 @@ public class FileWriter<IN>
      * @throws IOException if anything goes wrong during retrieving the state or
      *     restoring/committing of any in-progress/pending part files
      */
-    public void initializeState(List<FileWriterBucketState> bucketStates) throws IOException {
+    public void initializeState(Collection<FileWriterBucketState> bucketStates) throws IOException {
         checkNotNull(bucketStates, "The retrieved state was null.");
 
         for (FileWriterBucketState state : bucketStates) {
@@ -179,7 +186,7 @@ public class FileWriter<IN>
     }
 
     @Override
-    public void write(IN element, Context context) throws IOException {
+    public void write(IN element, Context context) throws IOException, InterruptedException {
         // setting the values in the bucketer context
         bucketerContext.update(
                 context.timestamp(),
@@ -193,7 +200,12 @@ public class FileWriter<IN>
     }
 
     @Override
-    public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+    public void flush(boolean endOfInput) throws IOException, InterruptedException {
+        this.endOfInput = endOfInput;
+    }
+
+    @Override
+    public Collection<FileSinkCommittable> prepareCommit() throws IOException {
         List<FileSinkCommittable> committables = new ArrayList<>();
 
         // Every time before we prepare commit, we first check and remove the inactive
@@ -206,7 +218,7 @@ public class FileWriter<IN>
             if (!entry.getValue().isActive()) {
                 activeBucketIt.remove();
             } else {
-                committables.addAll(entry.getValue().prepareCommit(flush));
+                committables.addAll(entry.getValue().prepareCommit(endOfInput));
             }
         }
 
@@ -263,7 +275,7 @@ public class FileWriter<IN>
     private void registerNextBucketInspectionTimer() {
         final long nextInspectionTime =
                 processingTimeService.getCurrentProcessingTime() + bucketCheckInterval;
-        processingTimeService.registerProcessingTimer(nextInspectionTime, this);
+        processingTimeService.registerTimer(nextInspectionTime, this);
     }
 
     /**
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
index cec8648..3f5f5f9 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
@@ -191,9 +191,9 @@ class FileWriterBucket<IN> {
         inProgressPart.write(element, currentTime);
     }
 
-    List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+    List<FileSinkCommittable> prepareCommit(boolean endOfInput) throws IOException {
         if (inProgressPart != null
-                && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart) || flush)) {
+                && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart) || endOfInput)) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug(
                         "Closing in-progress part file for bucket id={} on checkpoint.", bucketId);
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
index 935d170..0b07370 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.file.sink.committer;
 
+import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
+import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
 import org.apache.flink.connector.file.sink.utils.NoOpBucketWriter;
@@ -28,9 +30,11 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -43,15 +47,16 @@ public class FileCommitterTest {
         StubBucketWriter stubBucketWriter = new StubBucketWriter();
         FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
 
-        FileSinkCommittable fileSinkCommittable =
-                new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable());
-        List<FileSinkCommittable> toRetry =
-                fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
+        MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
+                new MockCommitRequest<>(
+                        new FileSinkCommittable(
+                                new FileSinkTestUtils.TestPendingFileRecoverable()));
+        fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
 
         assertEquals(1, stubBucketWriter.getRecoveredPendingFiles().size());
         assertEquals(0, stubBucketWriter.getNumCleanUp());
         assertTrue(stubBucketWriter.getRecoveredPendingFiles().get(0).isCommitted());
-        assertEquals(0, toRetry.size());
+        assertEquals(0, fileSinkCommittable.getNumberOfRetries());
     }
 
     @Test
@@ -59,14 +64,15 @@ public class FileCommitterTest {
         StubBucketWriter stubBucketWriter = new StubBucketWriter();
         FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
 
-        FileSinkCommittable fileSinkCommittable =
-                new FileSinkCommittable(new FileSinkTestUtils.TestInProgressFileRecoverable());
-        List<FileSinkCommittable> toRetry =
-                fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
+        MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
+                new MockCommitRequest<>(
+                        new FileSinkCommittable(
+                                new FileSinkTestUtils.TestInProgressFileRecoverable()));
+        fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
 
         assertEquals(0, stubBucketWriter.getRecoveredPendingFiles().size());
         assertEquals(1, stubBucketWriter.getNumCleanUp());
-        assertEquals(0, toRetry.size());
+        assertEquals(0, fileSinkCommittable.getNumberOfRetries());
     }
 
     @Test
@@ -74,23 +80,28 @@ public class FileCommitterTest {
         StubBucketWriter stubBucketWriter = new StubBucketWriter();
         FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
 
-        List<FileSinkCommittable> committables =
-                Arrays.asList(
-                        new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()),
-                        new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()),
-                        new FileSinkCommittable(
-                                new FileSinkTestUtils.TestInProgressFileRecoverable()),
-                        new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()),
-                        new FileSinkCommittable(
-                                new FileSinkTestUtils.TestInProgressFileRecoverable()));
-        List<FileSinkCommittable> toRetry = fileCommitter.commit(committables);
+        Collection<CommitRequest<FileSinkCommittable>> committables =
+                Stream.of(
+                                new FileSinkCommittable(
+                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                new FileSinkCommittable(
+                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                new FileSinkCommittable(
+                                        new FileSinkTestUtils.TestInProgressFileRecoverable()),
+                                new FileSinkCommittable(
+                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                new FileSinkCommittable(
+                                        new FileSinkTestUtils.TestInProgressFileRecoverable()))
+                        .map(MockCommitRequest::new)
+                        .collect(Collectors.toList());
+        fileCommitter.commit(committables);
 
         assertEquals(3, stubBucketWriter.getRecoveredPendingFiles().size());
         assertEquals(2, stubBucketWriter.getNumCleanUp());
         stubBucketWriter
                 .getRecoveredPendingFiles()
                 .forEach(pendingFile -> assertTrue(pendingFile.isCommitted()));
-        assertEquals(0, toRetry.size());
+        assertTrue(committables.stream().allMatch(c -> c.getNumberOfRetries() == 0));
     }
 
     // ------------------------------- Mock Classes --------------------------------
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
index 65f15f6..228b9c9 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.connector.file.sink.writer;
 
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
+import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.connector.file.sink.committer.FileCommitter;
@@ -169,7 +171,7 @@ public class FileWriterBucketStateSerializerMigrationTest {
     }
 
     @Test
-    public void testSerializationFull() throws IOException {
+    public void testSerializationFull() throws IOException, InterruptedException {
         testDeserializationFull(true, "full");
     }
 
@@ -180,12 +182,12 @@ public class FileWriterBucketStateSerializerMigrationTest {
     }
 
     @Test
-    public void testSerializationNullInProgress() throws IOException {
+    public void testSerializationNullInProgress() throws IOException, InterruptedException {
         testDeserializationFull(false, "full-no-in-progress");
     }
 
     private void testDeserializationFull(final boolean withInProgress, final String scenarioName)
-            throws IOException {
+            throws IOException, InterruptedException {
 
         final BucketStatePathResolver pathResolver =
                 new BucketStatePathResolver(BASE_PATH, previousVersion);
@@ -221,7 +223,10 @@ public class FileWriterBucketStateSerializerMigrationTest {
 
             // simulates we commit the recovered pending files on the first checkpoint
             bucket.snapshotState();
-            List<FileSinkCommittable> committables = bucket.prepareCommit(false);
+            Collection<CommitRequest<FileSinkCommittable>> committables =
+                    bucket.prepareCommit(false).stream()
+                            .map(MockCommitRequest::new)
+                            .collect(Collectors.toList());
             FileCommitter committer = new FileCommitter(createBucketWriter());
             committer.commit(committables);
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index a8fe439..e521f61 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.connector.file.sink.writer;
 
+import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
 import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
@@ -53,12 +53,14 @@ import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Queue;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -92,7 +94,7 @@ public class FileWriterTest {
         fileWriter.write("test2", new ContextImpl());
         fileWriter.write("test3", new ContextImpl());
 
-        List<FileSinkCommittable> committables = fileWriter.prepareCommit(false);
+        Collection<FileSinkCommittable> committables = fileWriter.prepareCommit();
         assertEquals(3, committables.size());
     }
 
@@ -112,7 +114,7 @@ public class FileWriterTest {
         fileWriter.write("test3", new ContextImpl());
         assertEquals(3, fileWriter.getActiveBuckets().size());
 
-        fileWriter.prepareCommit(false);
+        fileWriter.prepareCommit();
         List<FileWriterBucketState> states = fileWriter.snapshotState(1L);
         assertEquals(3, states.size());
 
@@ -145,7 +147,7 @@ public class FileWriterTest {
         firstFileWriter.write("test2", new ContextImpl());
         firstFileWriter.write("test3", new ContextImpl());
 
-        firstFileWriter.prepareCommit(false);
+        firstFileWriter.prepareCommit();
         List<FileWriterBucketState> firstState = firstFileWriter.snapshotState(1L);
 
         FileWriter<String> secondFileWriter =
@@ -157,7 +159,7 @@ public class FileWriterTest {
         secondFileWriter.write("test1", new ContextImpl());
         secondFileWriter.write("test2", new ContextImpl());
 
-        secondFileWriter.prepareCommit(false);
+        secondFileWriter.prepareCommit();
         List<FileWriterBucketState> secondState = secondFileWriter.snapshotState(1L);
 
         List<FileWriterBucketState> mergedState = new ArrayList<>();
@@ -197,17 +199,17 @@ public class FileWriterTest {
                         path, OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));
 
         fileWriter.write("test", new ContextImpl());
-        fileWriter.prepareCommit(false);
+        fileWriter.prepareCommit();
         fileWriter.snapshotState(1L);
 
         // No more records and another call to prepareCommit will makes it inactive
-        fileWriter.prepareCommit(false);
+        fileWriter.prepareCommit();
 
         assertTrue(fileWriter.getActiveBuckets().isEmpty());
     }
 
     @Test
-    public void testOnProcessingTime() throws IOException, InterruptedException {
+    public void testOnProcessingTime() throws Exception {
         File outDir = TEMP_FOLDER.newFolder();
         Path path = new Path(outDir.toURI());
 
@@ -247,7 +249,7 @@ public class FileWriterTest {
 
         // Close, pre-commit & clear all the pending records.
         processingTimeService.advanceTo(30);
-        fileWriter.prepareCommit(false);
+        fileWriter.prepareCommit();
 
         // Test timer re-registration.
         fileWriter.write("test1", new ContextImpl());
@@ -278,7 +280,7 @@ public class FileWriterTest {
     }
 
     @Test
-    public void testNumberRecordsOutCounter() throws IOException {
+    public void testNumberRecordsOutCounter() throws IOException, InterruptedException {
         final OperatorIOMetricGroup operatorIOMetricGroup =
                 UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
         File outDir = TEMP_FOLDER.newFolder();
@@ -350,8 +352,7 @@ public class FileWriterTest {
         }
     }
 
-    private static class ManuallyTriggeredProcessingTimeService
-            implements Sink.ProcessingTimeService {
+    private static class ManuallyTriggeredProcessingTimeService implements ProcessingTimeService {
 
         private long now;
 
@@ -364,20 +365,21 @@ public class FileWriterTest {
         }
 
         @Override
-        public void registerProcessingTimer(
+        public ScheduledFuture<?> registerTimer(
                 long time, ProcessingTimeCallback processingTimeCallback) {
             if (time <= now) {
                 try {
                     processingTimeCallback.onProcessingTime(now);
-                } catch (IOException | InterruptedException e) {
+                } catch (Exception e) {
                     ExceptionUtils.rethrow(e);
                 }
             } else {
                 timers.add(new Tuple2<>(time, processingTimeCallback));
             }
+            return null;
         }
 
-        public void advanceTo(long time) throws IOException, InterruptedException {
+        public void advanceTo(long time) throws Exception {
             if (time > now) {
                 now = time;
 
@@ -464,7 +466,7 @@ public class FileWriterTest {
             BucketAssigner<String, String> bucketAssigner,
             RollingPolicy<String, String> rollingPolicy,
             OutputFileConfig outputFileConfig,
-            Sink.ProcessingTimeService processingTimeService,
+            ProcessingTimeService processingTimeService,
             long bucketCheckInterval)
             throws IOException {
         return new FileWriter<>(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
index 157699f..30bbdca7 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.connector.kafka.sink;
 
-import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
 import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -50,8 +50,8 @@ public class KafkaCommitterTest {
                         new FlinkKafkaInternalProducer<>(properties, TRANSACTIONAL_ID);
                 Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
                         new Recyclable<>(producer, p -> {})) {
-            final MockCommitRequest request =
-                    new MockCommitRequest(
+            final MockCommitRequest<KafkaCommittable> request =
+                    new MockCommitRequest<>(
                             new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
 
             producer.resumeTransaction(PRODUCER_ID, EPOCH);
@@ -73,12 +73,13 @@ public class KafkaCommitterTest {
                 Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
                         new Recyclable<>(producer, p -> {})) {
             // will fail because transaction not started
-            final MockCommitRequest request =
-                    new MockCommitRequest(
+            final MockCommitRequest<KafkaCommittable> request =
+                    new MockCommitRequest<>(
                             new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
             committer.commit(Collections.singletonList(request));
-            assertThat(request.failedWithUnknownReason).isInstanceOf(IllegalStateException.class);
-            assertThat(request.failedWithUnknownReason.getMessage())
+            assertThat(request.getFailedWithUnknownReason())
+                    .isInstanceOf(IllegalStateException.class);
+            assertThat(request.getFailedWithUnknownReason().getMessage())
                     .contains("Transaction was not started");
             assertThat(recyclable.isRecycled()).isTrue();
         }
@@ -93,44 +94,4 @@ public class KafkaCommitterTest {
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         return properties;
     }
-
-    private static class MockCommitRequest implements Committer.CommitRequest<KafkaCommittable> {
-
-        private final KafkaCommittable committable;
-        private int retries = 0;
-        Throwable failedWithUnknownReason;
-
-        MockCommitRequest(KafkaCommittable committable) {
-            this.committable = committable;
-        }
-
-        @Override
-        public KafkaCommittable getCommittable() {
-            return committable;
-        }
-
-        @Override
-        public int getNumberOfRetries() {
-            return retries;
-        }
-
-        @Override
-        public void signalFailedWithKnownReason(Throwable t) {}
-
-        @Override
-        public void signalFailedWithUnknownReason(Throwable t) {
-            failedWithUnknownReason = t;
-        }
-
-        @Override
-        public void retryLater() {
-            retries++;
-        }
-
-        @Override
-        public void updateAndRetryLater(KafkaCommittable committable) {}
-
-        @Override
-        public void signalAlreadyCommitted() {}
-    }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java
new file mode 100644
index 0000000..75a652a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2.mocks;
+
+import org.apache.flink.api.connector.sink2.Committer;
+
+/**
+ * A simple {@link Committer.CommitRequest} used for testing.
+ *
+ * @param <CommT> committable type
+ */
+public class MockCommitRequest<CommT> implements Committer.CommitRequest<CommT> {
+
+    private final CommT committable;
+    private int retries = 0;
+    private Throwable failedWithUnknownReason;
+
+    public MockCommitRequest(CommT committable) {
+        this.committable = committable;
+    }
+
+    @Override
+    public CommT getCommittable() {
+        return committable;
+    }
+
+    @Override
+    public int getNumberOfRetries() {
+        return retries;
+    }
+
+    @Override
+    public void signalFailedWithKnownReason(Throwable t) {}
+
+    @Override
+    public void signalFailedWithUnknownReason(Throwable t) {
+        failedWithUnknownReason = t;
+    }
+
+    @Override
+    public void retryLater() {
+        retries++;
+    }
+
+    @Override
+    public void updateAndRetryLater(CommT committable) {}
+
+    @Override
+    public void signalAlreadyCommitted() {}
+
+    public Throwable getFailedWithUnknownReason() {
+        return failedWithUnknownReason;
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java
index 8949160..626a395 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -56,7 +56,7 @@ import java.util.Set;
 
 /**
  * Test implementation of {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory} that
- * creates a file source and sink based on {@link SourceProvider} and {@link SinkProvider}.
+ * creates a file source and sink based on {@link SourceProvider} and {@link SinkV2Provider}.
  */
 public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
 
@@ -158,7 +158,7 @@ public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableS
         public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
             final FileSink<RowData> fileSink =
                     FileSink.forRowFormat(path, new RowDataEncoder()).build();
-            return SinkProvider.of(fileSink);
+            return SinkV2Provider.of(fileSink);
         }
 
         @Override