You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/01 01:15:41 UTC

[flink-table-store] branch master updated: [FLINK-28299] Get rid of Sink v2

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new e5a37fd0 [FLINK-28299] Get rid of Sink v2
e5a37fd0 is described below

commit e5a37fd07aa7996b037f88e48b1a19ba5b6ab89d
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Jul 1 09:15:36 2022 +0800

    [FLINK-28299] Get rid of Sink v2
    
    This closes #183
---
 .../CatalogLock.java}                              |  29 +-
 .../table/store/connector/sink/Committable.java    |   8 +-
 .../connector/sink/CommittableSerializer.java      |  14 +-
 .../store/connector/sink/CommittableTypeInfo.java  |  95 ++++++
 .../GlobalCommitter.java => Committer.java}        |  27 +-
 ...mmitterOperator.java => CommitterOperator.java} |  78 +++--
 .../store/connector/sink/FlinkSinkBuilder.java     |  19 +-
 .../connector/sink/PrepareCommitOperator.java      |  63 ++++
 ...oreGlobalCommitter.java => StoreCommitter.java} |  14 +-
 .../store/connector/sink/StoreCompactOperator.java |  66 ++++
 .../store/connector/sink/StoreLocalCommitter.java  |  69 -----
 .../table/store/connector/sink/StoreSink.java      | 175 +++--------
 .../store/connector/sink/StoreSinkCompactor.java   |  69 -----
 .../store/connector/sink/StoreSinkWriter.java      | 152 ----------
 .../store/connector/sink/StoreWriteOperator.java   | 233 ++++++++++++++
 .../table/store/connector/sink/TableStoreSink.java |  44 +--
 .../sink/global/GlobalCommitterOperator.java       |  76 -----
 .../sink/global/GlobalCommittingSink.java          |  46 ---
 .../global/GlobalCommittingSinkTranslator.java     |  78 -----
 .../sink/global/LocalCommitterOperator.java        | 205 -------------
 .../store/connector/source/TableStoreSource.java   |  25 +-
 .../connector/sink/CommittableSerializerTest.java  |  21 +-
 .../store/connector/sink/LogStoreSinkITCase.java   |   9 +-
 .../table/store/connector/sink/StoreSinkTest.java  | 335 ---------------------
 .../table/store/connector/sink/TestFileStore.java  | 278 -----------------
 .../store/connector/sink/TestFileStoreTable.java   | 110 -------
 .../sink/global/GlobalCommitterOperatorTest.java   | 278 -----------------
 .../sink/global/LocalCommitterOperatorTest.java    | 255 ----------------
 .../flink/table/store/log/LogInitContext.java      |  87 ------
 .../flink/table/store/log/LogSinkProvider.java     |  28 +-
 .../table/store/log/LogStoreTableFactory.java      |  12 +-
 .../flink/table/store/log/LogWriteCallback.java    |   2 +-
 .../table/store/table/FileStoreTableFactory.java   |   8 +
 .../sink/LogSinkFunction.java}                     |  21 +-
 .../store/kafka/KafkaLogSerializationSchema.java   |  10 +-
 .../table/store/kafka/KafkaLogSinkProvider.java    |  32 +-
 .../table/store/kafka/KafkaLogStoreFactory.java    |  20 +-
 .../flink/table/store/kafka/KafkaSinkFunction.java |  80 +++++
 .../flink/table/store/kafka/KafkaLogITCase.java    |  28 +-
 .../store/kafka/KafkaLogSerializationTest.java     |   4 +-
 .../flink/table/store/kafka/KafkaLogTestUtils.java |   8 +-
 .../table/store/kafka/TestOffsetsLogSink.java      | 105 -------
 42 files changed, 738 insertions(+), 2578 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java
similarity index 54%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java
index f4266d82..08b5b520 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/StatefulPrecommittingSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CatalogLock.java
@@ -16,18 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.connector;
+package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.connector.sink.Committable;
+import org.apache.flink.annotation.Internal;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.concurrent.Callable;
 
 /**
- * The base interface for file store sink writers.
- *
- * @param <WriterStateT> The type of the writer's state.
+ * An interface that allows source and sink to use a global lock for transaction-related work.
  */
-public interface StatefulPrecommittingSinkWriter<WriterStateT>
-        extends StatefulSink.StatefulSinkWriter<RowData, WriterStateT>,
-                TwoPhaseCommittingSink.PrecommittingSinkWriter<RowData, Committable> {}
+@Internal
+public interface CatalogLock extends Closeable {
+
+    /** Run with catalog lock. The caller should tell catalog the database and table name. */
+    <T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;
+
+    /** Factory to create {@link CatalogLock}. */
+    interface Factory extends Serializable {
+        CatalogLock create();
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
index cea1b359..158287a7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committable.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-/** Committable produced by {@link StoreSinkWriter}. */
+/** Committable produced by {@link PrepareCommitOperator}. */
 public class Committable {
 
     private final Kind kind;
@@ -41,9 +41,7 @@ public class Committable {
     enum Kind {
         FILE((byte) 0),
 
-        LOG((byte) 1),
-
-        LOG_OFFSET((byte) 2);
+        LOG_OFFSET((byte) 1);
 
         private final byte value;
 
@@ -60,8 +58,6 @@ public class Committable {
                 case 0:
                     return FILE;
                 case 1:
-                    return LOG;
-                case 2:
                     return LOG_OFFSET;
                 default:
                     throw new UnsupportedOperationException(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
index 9f6dc1d1..b5e2ed77 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableSerializer.java
@@ -29,13 +29,8 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
 
     private final FileCommittableSerializer fileCommittableSerializer;
 
-    private final SimpleVersionedSerializer<Object> logCommittableSerializer;
-
-    public CommittableSerializer(
-            FileCommittableSerializer fileCommittableSerializer,
-            SimpleVersionedSerializer<Object> logCommittableSerializer) {
+    public CommittableSerializer(FileCommittableSerializer fileCommittableSerializer) {
         this.fileCommittableSerializer = fileCommittableSerializer;
-        this.logCommittableSerializer = logCommittableSerializer;
     }
 
     @Override
@@ -54,10 +49,6 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
                         fileCommittableSerializer.serialize(
                                 (FileCommittable) committable.wrappedCommittable());
                 break;
-            case LOG:
-                version = logCommittableSerializer.getVersion();
-                wrapped = logCommittableSerializer.serialize(committable.wrappedCommittable());
-                break;
             case LOG_OFFSET:
                 version = 1;
                 wrapped = ((LogOffsetCommittable) committable.wrappedCommittable()).toBytes();
@@ -86,9 +77,6 @@ public class CommittableSerializer implements SimpleVersionedSerializer<Committa
             case FILE:
                 wrappedCommittable = fileCommittableSerializer.deserialize(version, wrapped);
                 break;
-            case LOG:
-                wrappedCommittable = logCommittableSerializer.deserialize(version, wrapped);
-                break;
             case LOG_OFFSET:
                 wrappedCommittable = LogOffsetCommittable.fromBytes(wrapped);
                 break;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableTypeInfo.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableTypeInfo.java
new file mode 100644
index 00000000..dcc2cd67
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommittableTypeInfo.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+
+/** Type information of {@link Committable}. */
+public class CommittableTypeInfo extends TypeInformation<Committable> {
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    public Class<Committable> getTypeClass() {
+        return Committable.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<Committable> createSerializer(ExecutionConfig config) {
+        // no copy, so that data from writer is directly going into committer while chaining
+        return new SimpleVersionedSerializerTypeSerializerProxy<Committable>(
+                () -> new CommittableSerializer(new FileCommittableSerializer())) {
+            @Override
+            public Committable copy(Committable from) {
+                return from;
+            }
+
+            @Override
+            public Committable copy(Committable from, Committable reuse) {
+                return from;
+            }
+        };
+    }
+
+    @Override
+    public int hashCode() {
+        return 0;
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof CommittableTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof CommittableTypeInfo;
+    }
+
+    @Override
+    public String toString() {
+        return "CommittableTypeInfo";
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committer.java
similarity index 56%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committer.java
index 90eee7be..b7d97d55 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/Committer.java
@@ -17,29 +17,30 @@
  *
  */
 
-package org.apache.flink.table.store.connector.sink.global;
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 
 import java.io.IOException;
 import java.util.List;
 
 /**
- * The {@code GlobalCommitter} is responsible for creating and committing an aggregated committable,
- * which we call global committable (see {@link #combine}).
- *
- * <p>The {@code GlobalCommitter} runs with parallelism equal to 1.
+ * The {@code Committer} is responsible for creating and committing an aggregated committable, which
+ * we call the global committable (see {@link #combine}).
  *
- * @param <CommT> The type of information needed to commit data staged by the sink
- * @param <GlobalCommT> The type of the aggregated committable
+ * <p>The {@code Committer} runs with parallelism equal to 1.
  */
-public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {
+public interface Committer extends AutoCloseable {
 
     /** Find out which global committables need to be retried when recovering from the failure. */
-    List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables)
-            throws IOException;
+    List<ManifestCommittable> filterRecoveredCommittables(
+            List<ManifestCommittable> globalCommittables) throws IOException;
 
     /** Compute an aggregated committable from a list of committables. */
-    GlobalCommT combine(long checkpointId, List<CommT> committables) throws IOException;
+    ManifestCommittable combine(long checkpointId, List<Committable> committables)
+            throws IOException;
 
-    /** Commits the given {@link GlobalCommT}. */
-    void commit(List<GlobalCommT> globalCommittables) throws IOException, InterruptedException;
+    /** Commits the given {@link ManifestCommittable}. */
+    void commit(List<ManifestCommittable> globalCommittables)
+            throws IOException, InterruptedException;
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
similarity index 68%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
index fad3e8df..fa7f60a7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/AbstractCommitterOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/CommitterOperator.java
@@ -15,23 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.connector.sink.global;
+package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.util.function.SerializableSupplier;
 
 import java.util.ArrayDeque;
@@ -41,16 +39,16 @@ import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
-/** An operator that processes committables of a {@link Sink}. */
-public abstract class AbstractCommitterOperator<IN, CommT>
-        extends AbstractStreamOperator<CommittableMessage<IN>>
-        implements OneInputStreamOperator<CommittableMessage<IN>, CommittableMessage<IN>>,
-                BoundedOneInput {
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Committer operator to commit {@link Committable}. */
+public class CommitterOperator extends AbstractStreamOperator<Committable>
+        implements OneInputStreamOperator<Committable, Committable>, BoundedOneInput {
 
     private static final long serialVersionUID = 1L;
 
     /** Record all the inputs until commit. */
-    private final Deque<IN> inputs = new ArrayDeque<>();
+    private final Deque<Committable> inputs = new ArrayDeque<>();
 
     /** The operator's state descriptor. */
     private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
@@ -58,43 +56,64 @@ public abstract class AbstractCommitterOperator<IN, CommT>
                     "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
 
     /** Group the committable by the checkpoint id. */
-    private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+    private final NavigableMap<Long, ManifestCommittable> committablesPerCheckpoint;
 
     /** The committable's serializer. */
-    private final SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer;
+    private final SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
+            committableSerializer;
 
     /** The operator's state. */
-    private ListState<CommT> streamingCommitterState;
+    private ListState<ManifestCommittable> streamingCommitterState;
+
+    private final SerializableSupplier<Committer> committerFactory;
 
-    public AbstractCommitterOperator(
-            SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
+    /**
+     * Aggregate committables to global committables and commit the global committables to the
+     * external system.
+     */
+    private Committer committer;
+
+    public CommitterOperator(
+            SerializableSupplier<Committer> committerFactory,
+            SerializableSupplier<SimpleVersionedSerializer<ManifestCommittable>>
+                    committableSerializer) {
         this.committableSerializer = committableSerializer;
         this.committablesPerCheckpoint = new TreeMap<>();
+        this.committerFactory = checkNotNull(committerFactory);
         setChainingStrategy(ChainingStrategy.ALWAYS);
     }
 
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
+        committer = committerFactory.get();
         streamingCommitterState =
                 new SimpleVersionedListState<>(
                         context.getOperatorStateStore()
                                 .getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
                         committableSerializer.get());
-        List<CommT> restored = new ArrayList<>();
+        List<ManifestCommittable> restored = new ArrayList<>();
         streamingCommitterState.get().forEach(restored::add);
         streamingCommitterState.clear();
         commit(true, restored);
     }
 
-    public abstract void commit(boolean isRecover, List<CommT> committables) throws Exception;
+    public void commit(boolean isRecover, List<ManifestCommittable> committables) throws Exception {
+        if (isRecover) {
+            committables = committer.filterRecoveredCommittables(committables);
+        }
+        committer.commit(committables);
+    }
 
-    public abstract List<CommT> toCommittables(long checkpoint, List<IN> inputs) throws Exception;
+    public ManifestCommittable toCommittables(long checkpoint, List<Committable> inputs)
+            throws Exception {
+        return committer.combine(checkpoint, inputs);
+    }
 
     @Override
     public void snapshotState(StateSnapshotContext context) throws Exception {
         super.snapshotState(context);
-        List<IN> poll = pollInputs();
+        List<Committable> poll = pollInputs();
         if (poll.size() > 0) {
             committablesPerCheckpoint.put(
                     context.getCheckpointId(), toCommittables(context.getCheckpointId(), poll));
@@ -102,10 +121,8 @@ public abstract class AbstractCommitterOperator<IN, CommT>
         streamingCommitterState.update(committables(committablesPerCheckpoint));
     }
 
-    private List<CommT> committables(NavigableMap<Long, List<CommT>> map) {
-        List<CommT> committables = new ArrayList<>();
-        map.values().forEach(committables::addAll);
-        return committables;
+    private List<ManifestCommittable> committables(NavigableMap<Long, ManifestCommittable> map) {
+        return new ArrayList<>(map.values());
     }
 
     @Override
@@ -118,7 +135,7 @@ public abstract class AbstractCommitterOperator<IN, CommT>
         // 5. this.notifyCheckpointComplete(5)
         // So we should submit all the data in the endInput in order to avoid disordered commits.
         long checkpointId = Long.MAX_VALUE;
-        List<IN> poll = pollInputs();
+        List<Committable> poll = pollInputs();
         if (!poll.isEmpty()) {
             committablesPerCheckpoint.put(checkpointId, toCommittables(checkpointId, poll));
         }
@@ -132,19 +149,16 @@ public abstract class AbstractCommitterOperator<IN, CommT>
     }
 
     private void commitUpToCheckpoint(long checkpointId) throws Exception {
-        NavigableMap<Long, List<CommT>> headMap =
+        NavigableMap<Long, ManifestCommittable> headMap =
                 committablesPerCheckpoint.headMap(checkpointId, true);
         commit(false, committables(headMap));
         headMap.clear();
     }
 
     @Override
-    public void processElement(StreamRecord<CommittableMessage<IN>> element) {
+    public void processElement(StreamRecord<Committable> element) {
         output.collect(element);
-        CommittableMessage<IN> message = element.getValue();
-        if (message instanceof CommittableWithLineage) {
-            this.inputs.add(((CommittableWithLineage<IN>) message).getCommittable());
-        }
+        this.inputs.add(element.getValue());
     }
 
     @Override
@@ -154,8 +168,8 @@ public abstract class AbstractCommitterOperator<IN, CommT>
         super.close();
     }
 
-    private List<IN> pollInputs() {
-        List<IN> poll = new ArrayList<>(this.inputs);
+    private List<Committable> pollInputs() {
+        List<Committable> poll = new ArrayList<>(this.inputs);
         this.inputs.clear();
         return poll;
     }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
index 8215d475..91141ece 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSinkBuilder.java
@@ -22,15 +22,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.table.catalog.CatalogLock;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.connector.TableStoreFactoryOptions;
-import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSinkTranslator;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
-import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
 
 import javax.annotation.Nullable;
 
@@ -46,7 +44,7 @@ public class FlinkSinkBuilder {
     private DataStream<RowData> input;
     @Nullable private CatalogLock.Factory lockFactory;
     @Nullable private Map<String, String> overwritePartition;
-    @Nullable private LogSinkProvider logSinkProvider;
+    @Nullable private LogSinkFunction logSinkFunction;
     @Nullable private Integer parallelism;
 
     public FlinkSinkBuilder(ObjectIdentifier tableIdentifier, FileStoreTable table) {
@@ -70,8 +68,8 @@ public class FlinkSinkBuilder {
         return this;
     }
 
-    public FlinkSinkBuilder withLogSinkProvider(LogSinkProvider logSinkProvider) {
-        this.logSinkProvider = logSinkProvider;
+    public FlinkSinkBuilder withLogSinkFunction(@Nullable LogSinkFunction logSinkFunction) {
+        this.logSinkFunction = logSinkFunction;
         return this;
     }
 
@@ -101,16 +99,15 @@ public class FlinkSinkBuilder {
             partitioned.setParallelism(parallelism);
         }
 
-        StoreSink<?, ?> sink =
-                new StoreSink<>(
+        StoreSink sink =
+                new StoreSink(
                         tableIdentifier,
                         table,
                         conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED),
                         getCompactPartSpec(),
                         lockFactory,
                         overwritePartition,
-                        logSinkProvider);
-        return GlobalCommittingSinkTranslator.translate(
-                new DataStream<>(input.getExecutionEnvironment(), partitioned), sink);
+                        logSinkFunction);
+        return sink.sinkTo(new DataStream<>(input.getExecutionEnvironment(), partitioned));
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
new file mode 100644
index 00000000..7d686d9e
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Prepare commit operator to emit {@link Committable}s. */
+public abstract class PrepareCommitOperator extends AbstractStreamOperator<Committable>
+        implements OneInputStreamOperator<RowData, Committable>, BoundedOneInput {
+
+    private boolean endOfInput = false;
+
+    public PrepareCommitOperator() {
+        setChainingStrategy(ChainingStrategy.ALWAYS);
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {}
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        if (!endOfInput) {
+            emitCommittables();
+        }
+        // no records are expected to be emitted after endOfInput
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        endOfInput = true;
+        emitCommittables();
+    }
+
+    private void emitCommittables() throws IOException {
+        prepareCommit().forEach(committable -> output.collect(new StreamRecord<>(committable)));
+    }
+
+    protected abstract List<Committable> prepareCommit() throws IOException;
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
similarity index 83%
rename from flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
rename to flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
index eaa2c9c9..0d35d4ad 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.table.catalog.CatalogLock;
-import org.apache.flink.table.store.connector.sink.global.GlobalCommitter;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.TableCommit;
@@ -29,14 +27,14 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 
-/** {@link GlobalCommitter} for dynamic store. */
-public class StoreGlobalCommitter implements GlobalCommitter<Committable, ManifestCommittable> {
+/** {@link Committer} for dynamic store. */
+public class StoreCommitter implements Committer {
 
     private final TableCommit commit;
 
     @Nullable private final CatalogLock lock;
 
-    public StoreGlobalCommitter(TableCommit commit, @Nullable CatalogLock lock) {
+    public StoreCommitter(TableCommit commit, @Nullable CatalogLock lock) {
         this.commit = commit;
         this.lock = lock;
     }
@@ -55,8 +53,7 @@ public class StoreGlobalCommitter implements GlobalCommitter<Committable, Manife
     }
 
     @Override
-    public ManifestCommittable combine(long checkpointId, List<Committable> committables)
-            throws IOException {
+    public ManifestCommittable combine(long checkpointId, List<Committable> committables) {
         ManifestCommittable fileCommittable = new ManifestCommittable(String.valueOf(checkpointId));
         for (Committable committable : committables) {
             switch (committable.kind()) {
@@ -70,9 +67,6 @@ public class StoreGlobalCommitter implements GlobalCommitter<Committable, Manife
                             (LogOffsetCommittable) committable.wrappedCommittable();
                     fileCommittable.addLogOffset(offset.bucket(), offset.offset());
                     break;
-                case LOG:
-                    // log should be committed in local committer
-                    break;
             }
         }
         return fileCommittable;
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
new file mode 100644
index 00000000..8ad6bf07
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.TableCompact;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** A dedicated operator for manually triggered compaction. */
+public class StoreCompactOperator extends PrepareCommitOperator {
+
+    private final FileStoreTable table;
+
+    @Nullable private final Map<String, String> compactPartitionSpec;
+
+    private TableCompact compact;
+
+    public StoreCompactOperator(
+            FileStoreTable table, @Nullable Map<String, String> compactPartitionSpec) {
+        this.table = table;
+        this.compactPartitionSpec = compactPartitionSpec;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        int task = getRuntimeContext().getIndexOfThisSubtask();
+        int numTask = getRuntimeContext().getNumberOfParallelSubtasks();
+        compact = table.newCompact();
+        compact.withPartitions(
+                compactPartitionSpec == null ? Collections.emptyMap() : compactPartitionSpec);
+        compact.withFilter(
+                (partition, bucket) -> task == Math.abs(Objects.hash(partition, bucket) % numTask));
+    }
+
+    @Override
+    protected List<Committable> prepareCommit() throws IOException {
+        return compact.compact().stream()
+                .map(c -> new Committable(Committable.Kind.FILE, c))
+                .collect(Collectors.toList());
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java
deleted file mode 100644
index d3bda6ab..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreLocalCommitter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.api.connector.sink2.Committer;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Objects;
-
-import static org.apache.flink.table.store.connector.sink.global.LocalCommitterOperator.convertCommitRequest;
-
-/** Store local {@link Committer} to commit log sink. */
-public class StoreLocalCommitter<LogCommT> implements Committer<Committable> {
-
-    @Nullable private final Committer<LogCommT> logCommitter;
-
-    public StoreLocalCommitter(@Nullable Committer<LogCommT> logCommitter) {
-        this.logCommitter = logCommitter;
-    }
-
-    @Override
-    public void commit(Collection<CommitRequest<Committable>> requests)
-            throws IOException, InterruptedException {
-        List<CommitRequest<LogCommT>> logRequests = new ArrayList<>();
-        for (CommitRequest<Committable> request : requests) {
-            if (request.getCommittable().kind() == Committable.Kind.LOG) {
-                //noinspection unchecked
-                logRequests.add(
-                        convertCommitRequest(
-                                request,
-                                committable -> (LogCommT) committable.wrappedCommittable(),
-                                committable -> new Committable(Committable.Kind.LOG, committable)));
-            }
-        }
-
-        if (logRequests.size() > 0) {
-            Objects.requireNonNull(logCommitter, "logCommitter should not be null.");
-            logCommitter.commit(logRequests);
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (logCommitter != null) {
-            logCommitter.close();
-        }
-    }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index eb60b409..3a8ed3a3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -18,45 +18,33 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
-import org.apache.flink.table.store.connector.sink.global.GlobalCommittingSink;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.operation.Lock;
-import org.apache.flink.table.store.log.LogInitContext;
-import org.apache.flink.table.store.log.LogSinkProvider;
-import org.apache.flink.table.store.log.LogWriteCallback;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.SinkRecord;
-import org.apache.flink.table.store.table.sink.TableCompact;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Collection;
-import java.util.Collections;
+import java.io.Serializable;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.Callable;
-import java.util.function.Consumer;
 
-/** {@link Sink} of dynamic store. */
-public class StoreSink<WriterStateT, LogCommT>
-        implements StatefulSink<RowData, WriterStateT>,
-                GlobalCommittingSink<RowData, Committable, ManifestCommittable> {
+/** Sink of dynamic store. */
+public class StoreSink implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    private static final String WRITER_NAME = "Writer";
+
+    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
+
     private final ObjectIdentifier tableIdentifier;
 
     private final FileStoreTable table;
@@ -69,7 +57,7 @@ public class StoreSink<WriterStateT, LogCommT>
 
     @Nullable private final Map<String, String> overwritePartition;
 
-    @Nullable private final LogSinkProvider logSinkProvider;
+    @Nullable private final LogSinkFunction logSinkFunction;
 
     public StoreSink(
             ObjectIdentifier tableIdentifier,
@@ -78,100 +66,24 @@ public class StoreSink<WriterStateT, LogCommT>
             @Nullable Map<String, String> compactPartitionSpec,
             @Nullable CatalogLock.Factory lockFactory,
             @Nullable Map<String, String> overwritePartition,
-            @Nullable LogSinkProvider logSinkProvider) {
+            @Nullable LogSinkFunction logSinkFunction) {
         this.tableIdentifier = tableIdentifier;
         this.table = table;
         this.compactionTask = compactionTask;
         this.compactPartitionSpec = compactPartitionSpec;
         this.lockFactory = lockFactory;
         this.overwritePartition = overwritePartition;
-        this.logSinkProvider = logSinkProvider;
-    }
-
-    @Override
-    public StatefulPrecommittingSinkWriter<WriterStateT> createWriter(InitContext initContext)
-            throws IOException {
-        return restoreWriter(initContext, null);
+        this.logSinkFunction = logSinkFunction;
     }
 
-    @Override
-    public StatefulPrecommittingSinkWriter<WriterStateT> restoreWriter(
-            InitContext initContext, Collection<WriterStateT> states) throws IOException {
+    private OneInputStreamOperator<RowData, Committable> createWriteOperator() {
         if (compactionTask) {
-            return createCompactWriter(initContext);
-        }
-        SinkWriter<SinkRecord> logWriter = null;
-        LogWriteCallback logCallback = null;
-        if (logSinkProvider != null) {
-            logCallback = new LogWriteCallback();
-            Consumer<?> metadataConsumer = logSinkProvider.createMetadataConsumer(logCallback);
-            LogInitContext logInitContext = new LogInitContext(initContext, metadataConsumer);
-            Sink<SinkRecord> logSink = logSinkProvider.createSink();
-            logWriter =
-                    states == null
-                            ? logSink.createWriter(logInitContext)
-                            : ((StatefulSink<SinkRecord, WriterStateT>) logSink)
-                                    .restoreWriter(logInitContext, states);
-        }
-        return new StoreSinkWriter<>(
-                table.newWrite().withOverwrite(overwritePartition != null), logWriter, logCallback);
-    }
-
-    private StoreSinkCompactor<WriterStateT> createCompactWriter(InitContext initContext) {
-        int task = initContext.getSubtaskId();
-        int numTask = initContext.getNumberOfParallelSubtasks();
-        TableCompact tableCompact = table.newCompact();
-        tableCompact.withPartitions(
-                compactPartitionSpec == null ? Collections.emptyMap() : compactPartitionSpec);
-        tableCompact.withFilter(
-                (partition, bucket) -> task == Math.abs(Objects.hash(partition, bucket) % numTask));
-        return new StoreSinkCompactor<>(tableCompact);
-    }
-
-    @Override
-    public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
-        return logSinkProvider == null
-                ? new NoOutputSerializer<>()
-                : ((StatefulSink<SinkRecord, WriterStateT>) logSinkProvider.createSink())
-                        .getWriterStateSerializer();
-    }
-
-    @Nullable
-    private Committer<LogCommT> logCommitter() {
-        if (logSinkProvider != null) {
-            Sink<SinkRecord> sink = logSinkProvider.createSink();
-            if (sink instanceof TwoPhaseCommittingSink) {
-                try {
-                    return ((TwoPhaseCommittingSink<SinkRecord, LogCommT>) sink).createCommitter();
-                } catch (IOException e) {
-                    throw new UncheckedIOException(e);
-                }
-            }
+            return new StoreCompactOperator(table, compactPartitionSpec);
         }
-
-        return null;
+        return new StoreWriteOperator(table, overwritePartition, logSinkFunction);
     }
 
-    @Nullable
-    private SimpleVersionedSerializer<LogCommT> logCommitSerializer() {
-        if (logSinkProvider != null) {
-            Sink<SinkRecord> sink = logSinkProvider.createSink();
-            if (sink instanceof TwoPhaseCommittingSink) {
-                return ((TwoPhaseCommittingSink<SinkRecord, LogCommT>) sink)
-                        .getCommittableSerializer();
-            }
-        }
-
-        return null;
-    }
-
-    @Override
-    public Committer<Committable> createCommitter() {
-        return new StoreLocalCommitter<>(logCommitter());
-    }
-
-    @Override
-    public StoreGlobalCommitter createGlobalCommitter() {
+    private StoreCommitter createCommitter() {
         CatalogLock catalogLock;
         Lock lock;
         if (lockFactory == null) {
@@ -191,40 +103,25 @@ public class StoreSink<WriterStateT, LogCommT>
                     };
         }
 
-        return new StoreGlobalCommitter(
+        return new StoreCommitter(
                 table.newCommit().withOverwritePartition(overwritePartition).withLock(lock),
                 catalogLock);
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public SimpleVersionedSerializer<Committable> getCommittableSerializer() {
-        return new CommittableSerializer(
-                fileCommitSerializer(), (SimpleVersionedSerializer<Object>) logCommitSerializer());
-    }
-
-    @Override
-    public ManifestCommittableSerializer getGlobalCommittableSerializer() {
-        return new ManifestCommittableSerializer();
-    }
-
-    private FileCommittableSerializer fileCommitSerializer() {
-        return new FileCommittableSerializer();
-    }
-
-    private static class NoOutputSerializer<T> implements SimpleVersionedSerializer<T> {
-        private NoOutputSerializer() {}
-
-        public int getVersion() {
-            return 1;
-        }
-
-        public byte[] serialize(T obj) {
-            throw new IllegalStateException("Should not serialize anything");
-        }
-
-        public T deserialize(int version, byte[] serialized) {
-            throw new IllegalStateException("Should not deserialize anything");
-        }
+    public DataStreamSink<?> sinkTo(DataStream<RowData> input) {
+        CommittableTypeInfo typeInfo = new CommittableTypeInfo();
+        SingleOutputStreamOperator<Committable> written =
+                input.transform(WRITER_NAME, typeInfo, createWriteOperator())
+                        .setParallelism(input.getParallelism());
+
+        SingleOutputStreamOperator<?> committed =
+                written.transform(
+                                GLOBAL_COMMITTER_NAME,
+                                typeInfo,
+                                new CommitterOperator(
+                                        this::createCommitter, ManifestCommittableSerializer::new))
+                        .setParallelism(1)
+                        .setMaxParallelism(1);
+        return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
deleted file mode 100644
index 4a79a8c6..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
-import org.apache.flink.table.store.table.sink.TableCompact;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
-/** A dedicated {@link SinkWriter} for manual triggered compaction. */
-public class StoreSinkCompactor<WriterStateT>
-        implements StatefulPrecommittingSinkWriter<WriterStateT> {
-
-    private final TableCompact tableCompact;
-
-    public StoreSinkCompactor(TableCompact tableCompact) {
-        this.tableCompact = tableCompact;
-    }
-
-    @Override
-    public void flush(boolean endOfInput) {
-        // nothing to flush
-    }
-
-    @Override
-    public void write(RowData element, Context context) throws IOException, InterruptedException {
-        // nothing to write
-    }
-
-    @Override
-    public void close() throws Exception {
-        // nothing to close
-    }
-
-    @Override
-    public List<WriterStateT> snapshotState(long checkpointId) {
-        // nothing to snapshot
-        return Collections.emptyList();
-    }
-
-    @Override
-    public Collection<Committable> prepareCommit() {
-        return tableCompact.compact().stream()
-                .map(c -> new Committable(Committable.Kind.FILE, c))
-                .collect(Collectors.toList());
-    }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
deleted file mode 100644
index c3c6a7db..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.log.LogWriteCallback;
-import org.apache.flink.table.store.table.sink.AbstractTableWrite;
-import org.apache.flink.table.store.table.sink.FileCommittable;
-import org.apache.flink.table.store.table.sink.SinkRecord;
-import org.apache.flink.table.store.table.sink.SinkRecordConverter;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/** A {@link SinkWriter} for dynamic store. */
-public class StoreSinkWriter<WriterStateT>
-        implements StatefulPrecommittingSinkWriter<WriterStateT> {
-
-    private final TableWrite write;
-
-    @Nullable private final SinkWriter<SinkRecord> logWriter;
-
-    @Nullable private final LogWriteCallback logCallback;
-
-    private final ExecutorService compactExecutor;
-
-    public StoreSinkWriter(
-            TableWrite write,
-            @Nullable SinkWriter<SinkRecord> logWriter,
-            @Nullable LogWriteCallback logCallback) {
-        this.write = write;
-        this.logWriter = logWriter;
-        this.logCallback = logCallback;
-        this.compactExecutor =
-                Executors.newSingleThreadScheduledExecutor(
-                        new ExecutorThreadFactory("compaction-thread"));
-    }
-
-    @Override
-    public void write(RowData rowData, Context context) throws IOException, InterruptedException {
-        SinkRecord record;
-        try {
-            record = write.write(rowData);
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-
-        // write to log store, need to preserve original pk (which includes partition fields)
-        if (logWriter != null) {
-            SinkRecordConverter converter = write.recordConverter();
-            logWriter.write(converter.convertToLogSinkRecord(record), context);
-        }
-    }
-
-    @Override
-    public void flush(boolean endOfInput) throws IOException, InterruptedException {
-        if (logWriter != null) {
-            logWriter.flush(endOfInput);
-        }
-    }
-
-    @Override
-    public List<WriterStateT> snapshotState(long checkpointId) throws IOException {
-        if (logWriter != null && logWriter instanceof StatefulSinkWriter) {
-            return ((StatefulSinkWriter<?, WriterStateT>) logWriter).snapshotState(checkpointId);
-        }
-        return Collections.emptyList();
-    }
-
-    @Override
-    public List<Committable> prepareCommit() throws IOException, InterruptedException {
-        List<Committable> committables = new ArrayList<>();
-        try {
-            for (FileCommittable committable : write.prepareCommit()) {
-                committables.add(new Committable(Committable.Kind.FILE, committable));
-            }
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-
-        if (logWriter != null) {
-            if (logWriter instanceof PrecommittingSinkWriter) {
-                Collection<?> logCommittables =
-                        ((PrecommittingSinkWriter<?, ?>) logWriter).prepareCommit();
-                for (Object logCommittable : logCommittables) {
-                    committables.add(new Committable(Committable.Kind.LOG, logCommittable));
-                }
-            }
-
-            Objects.requireNonNull(logCallback, "logCallback should not be null.");
-            logCallback
-                    .offsets()
-                    .forEach(
-                            (k, v) ->
-                                    committables.add(
-                                            new Committable(
-                                                    Committable.Kind.LOG_OFFSET,
-                                                    new LogOffsetCommittable(k, v))));
-        }
-        return committables;
-    }
-
-    @Override
-    public void close() throws Exception {
-        this.compactExecutor.shutdownNow();
-        write.close();
-
-        if (logWriter != null) {
-            logWriter.close();
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @VisibleForTesting
-    Map<BinaryRowData, Map<Integer, RecordWriter<?>>> writers() {
-        return ((AbstractTableWrite) write).writers();
-    }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
new file mode 100644
index 00000000..6679ca21
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.log.LogWriteCallback;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.TableWrite;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** A {@link PrepareCommitOperator} to write records. */
+public class StoreWriteOperator extends PrepareCommitOperator {
+
+    private final FileStoreTable table;
+
+    @Nullable private final Map<String, String> overwritePartition;
+
+    @Nullable private final LogSinkFunction logSinkFunction;
+
+    private transient SimpleContext sinkContext;
+
+    /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
+    private long currentWatermark = Long.MIN_VALUE;
+
+    private TableWrite write;
+
+    @Nullable private LogWriteCallback logCallback;
+
+    public StoreWriteOperator(
+            FileStoreTable table,
+            @Nullable Map<String, String> overwritePartition,
+            @Nullable LogSinkFunction logSinkFunction) {
+        this.table = table;
+        this.overwritePartition = overwritePartition;
+        this.logSinkFunction = logSinkFunction;
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<Committable>> output) {
+        super.setup(containingTask, config, output);
+        if (logSinkFunction != null) {
+            FunctionUtils.setFunctionRuntimeContext(logSinkFunction, getRuntimeContext());
+        }
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+        if (logSinkFunction != null) {
+            StreamingFunctionUtils.restoreFunctionState(context, logSinkFunction);
+        }
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.write = table.newWrite().withOverwrite(overwritePartition != null);
+        this.sinkContext = new SimpleContext(getProcessingTimeService());
+        if (logSinkFunction != null) {
+            FunctionUtils.openFunction(logSinkFunction, new Configuration());
+            logCallback = new LogWriteCallback();
+            logSinkFunction.setWriteCallback(logCallback);
+        }
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        super.processWatermark(mark);
+        this.currentWatermark = mark.getTimestamp();
+        if (logSinkFunction != null) {
+            logSinkFunction.writeWatermark(
+                    new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
+        }
+    }
+
+    /**
+     * Writes the incoming row through the table writer and, when a log sink function is
+     * configured, also emits the converted record to it.
+     *
+     * @throws IOException wrapping any exception raised by the table writer
+     */
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        // Expose the element timestamp (or its absence) through the sink context.
+        if (element.hasTimestamp()) {
+            sinkContext.timestamp = element.getTimestamp();
+        } else {
+            sinkContext.timestamp = null;
+        }
+
+        final SinkRecord written;
+        try {
+            written = write.write(element.getValue());
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+
+        if (logSinkFunction == null) {
+            return;
+        }
+        // write to log store, need to preserve original pk (which includes partition fields)
+        SinkRecord forLog = write.recordConverter().convertToLogSinkRecord(written);
+        logSinkFunction.invoke(forLog, sinkContext);
+    }
+
+    /**
+     * Snapshots the operator state and, when a log sink function is configured, snapshots that
+     * function's state into the operator state backend.
+     */
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        if (logSinkFunction == null) {
+            return;
+        }
+        StreamingFunctionUtils.snapshotFunctionState(
+                context, getOperatorStateBackend(), logSinkFunction);
+    }
+
+    /** Finishes the base operator first, then the log sink function when one is configured. */
+    @Override
+    public void finish() throws Exception {
+        super.finish();
+        if (logSinkFunction == null) {
+            return;
+        }
+        logSinkFunction.finish();
+    }
+
+    /**
+     * Releases the operator's resources in order: the base operator, then the table writer, then
+     * the log sink function.
+     *
+     * <p>Each step runs inside a {@code finally} block so that a failure in an earlier step does
+     * not prevent the remaining resources from being closed. If several steps fail, the exception
+     * of the last failing step is the one that propagates.
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            super.close();
+        } finally {
+            try {
+                // write is only assigned in open(), so it may still be null here.
+                if (write != null) {
+                    write.close();
+                }
+            } finally {
+                if (logSinkFunction != null) {
+                    FunctionUtils.closeFunction(logSinkFunction);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+
+        if (logSinkFunction instanceof CheckpointListener) {
+            ((CheckpointListener) logSinkFunction).notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        super.notifyCheckpointAborted(checkpointId);
+
+        if (logSinkFunction instanceof CheckpointListener) {
+            ((CheckpointListener) logSinkFunction).notifyCheckpointAborted(checkpointId);
+        }
+    }
+
+    @Override
+    protected List<Committable> prepareCommit() throws IOException {
+        List<Committable> committables = new ArrayList<>();
+        try {
+            for (FileCommittable committable : write.prepareCommit()) {
+                committables.add(new Committable(Committable.Kind.FILE, committable));
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+
+        if (logCallback != null) {
+            logCallback
+                    .offsets()
+                    .forEach(
+                            (k, v) ->
+                                    committables.add(
+                                            new Committable(
+                                                    Committable.Kind.LOG_OFFSET,
+                                                    new LogOffsetCommittable(k, v))));
+        }
+
+        return committables;
+    }
+
+    /**
+     * {@link SinkFunction.Context} passed to the log sink function. Processing time comes from the
+     * supplied {@link ProcessingTimeService}, the watermark from the enclosing operator, and the
+     * timestamp from the {@code timestamp} field, which the enclosing operator assigns directly.
+     */
+    private class SimpleContext implements SinkFunction.Context {
+
+        private final ProcessingTimeService timeService;
+
+        /** Timestamp reported by {@link #timestamp()}; {@code null} means no timestamp. */
+        @Nullable private Long timestamp;
+
+        public SimpleContext(ProcessingTimeService processingTimeService) {
+            this.timeService = processingTimeService;
+        }
+
+        /** Returns the current processing time reported by the processing time service. */
+        @Override
+        public long currentProcessingTime() {
+            return timeService.getCurrentProcessingTime();
+        }
+
+        /** Returns the enclosing operator's current watermark. */
+        @Override
+        public long currentWatermark() {
+            return currentWatermark;
+        }
+
+        /** Returns the timestamp last stored in this context, possibly {@code null}. */
+        @Override
+        public Long timestamp() {
+            return timestamp;
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index 5cf04f8c..b08b81d7 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -18,15 +18,12 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogLock;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.RequireCatalogLock;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
@@ -40,8 +37,7 @@ import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
 import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
 import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
 import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
@@ -50,8 +46,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 /** Table sink to create {@link StoreSink}. */
-public class TableStoreSink
-        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning, RequireCatalogLock {
+public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
 
     private final ObjectIdentifier tableIdentifier;
     private final FileStoreTable table;
@@ -113,41 +108,15 @@ public class TableStoreSink
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         LogSinkProvider logSinkProvider = null;
         if (logStoreTableFactory != null) {
-            logSinkProvider =
-                    logStoreTableFactory.createSinkProvider(
-                            logStoreContext,
-                            new LogStoreTableFactory.SinkContext() {
-                                @Override
-                                public boolean isBounded() {
-                                    return context.isBounded();
-                                }
-
-                                @Override
-                                public <T> TypeInformation<T> createTypeInformation(
-                                        DataType consumedDataType) {
-                                    return context.createTypeInformation(consumedDataType);
-                                }
-
-                                @Override
-                                public <T> TypeInformation<T> createTypeInformation(
-                                        LogicalType consumedLogicalType) {
-                                    return context.createTypeInformation(consumedLogicalType);
-                                }
-
-                                @Override
-                                public DynamicTableSink.DataStructureConverter
-                                        createDataStructureConverter(DataType consumedDataType) {
-                                    return context.createDataStructureConverter(consumedDataType);
-                                }
-                            });
+            logSinkProvider = logStoreTableFactory.createSinkProvider(logStoreContext, context);
         }
 
         Configuration conf = Configuration.fromMap(table.schema().options());
         // Do not sink to log store when overwrite mode
-        final LogSinkProvider finalLogSinkProvider =
+        final LogSinkFunction logSinkFunction =
                 overwrite || conf.get(TableStoreFactoryOptions.COMPACTION_MANUAL_TRIGGERED)
                         ? null
-                        : logSinkProvider;
+                        : (logSinkProvider == null ? null : logSinkProvider.createSink());
         return (DataStreamSinkProvider)
                 (providerContext, dataStream) ->
                         new FlinkSinkBuilder(tableIdentifier, table)
@@ -156,7 +125,7 @@ public class TableStoreSink
                                                 dataStream.getExecutionEnvironment(),
                                                 dataStream.getTransformation()))
                                 .withLockFactory(lockFactory)
-                                .withLogSinkProvider(finalLogSinkProvider)
+                                .withLogSinkFunction(logSinkFunction)
                                 .withOverwritePartition(overwrite ? staticPartitions : null)
                                 .withParallelism(
                                         conf.get(TableStoreFactoryOptions.SINK_PARALLELISM))
@@ -196,7 +165,6 @@ public class TableStoreSink
         this.overwrite = overwrite;
     }
 
-    @Override
     public void setLockFactory(@Nullable CatalogLock.Factory lockFactory) {
         this.lockFactory = lockFactory;
     }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
deleted file mode 100644
index a606bf97..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.util.function.SerializableSupplier;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** An {@link AbstractCommitterOperator} to process global committer. */
-public class GlobalCommitterOperator<CommT, GlobalCommT>
-        extends AbstractCommitterOperator<CommT, GlobalCommT> {
-
-    private static final long serialVersionUID = 1L;
-
-    private final SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory;
-
-    /**
-     * Aggregate committables to global committables and commit the global committables to the
-     * external system.
-     */
-    private GlobalCommitter<CommT, GlobalCommT> committer;
-
-    public GlobalCommitterOperator(
-            SerializableSupplier<GlobalCommitter<CommT, GlobalCommT>> committerFactory,
-            SerializableSupplier<SimpleVersionedSerializer<GlobalCommT>> committableSerializer) {
-        super(committableSerializer);
-        this.committerFactory = checkNotNull(committerFactory);
-    }
-
-    @Override
-    public void initializeState(StateInitializationContext context) throws Exception {
-        committer = committerFactory.get();
-        super.initializeState(context);
-    }
-
-    @Override
-    public void commit(boolean isRecover, List<GlobalCommT> committables)
-            throws IOException, InterruptedException {
-        if (isRecover) {
-            committables = committer.filterRecoveredCommittables(committables);
-        }
-        committer.commit(committables);
-    }
-
-    @Override
-    public List<GlobalCommT> toCommittables(long checkpoint, List<CommT> inputs) throws Exception {
-        return Collections.singletonList(committer.combine(checkpoint, inputs));
-    }
-
-    @Override
-    public void close() throws Exception {
-        committer.close();
-        super.close();
-    }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
deleted file mode 100644
index 90d97b9f..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSink.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-/**
- * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
- * consists of a {@link SinkWriter} that performs the precommits and a {@link GlobalCommitter} that
- * actually commits the data.
- *
- * @param <InputT> The type of the sink's input
- * @param <CommT> The type of the committables.
- * @param <GlobalCommT> The type of the aggregated committable.
- */
-public interface GlobalCommittingSink<InputT, CommT, GlobalCommT>
-        extends TwoPhaseCommittingSink<InputT, CommT> {
-
-    /**
-     * Creates a {@link GlobalCommitter} that permanently makes the previously written data visible
-     * through {@link GlobalCommitter#commit}.
-     */
-    GlobalCommitter<CommT, GlobalCommT> createGlobalCommitter();
-
-    /** Returns the serializer of the global committable type. */
-    SimpleVersionedSerializer<GlobalCommT> getGlobalCommittableSerializer();
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
deleted file mode 100644
index c034276f..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/GlobalCommittingSinkTranslator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-
-/** A translator for the {@link GlobalCommittingSink}. */
-public class GlobalCommittingSinkTranslator {
-
-    private static final String WRITER_NAME = "Writer";
-
-    private static final String LOCAL_COMMITTER_NAME = "Local Committer";
-
-    private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
-
-    public static <T, CommT, GlobalCommT> DataStreamSink<?> translate(
-            DataStream<T> input, GlobalCommittingSink<T, CommT, GlobalCommT> sink) {
-        TypeInformation<CommittableMessage<CommT>> commitType =
-                CommittableMessageTypeInfo.of(sink::getCommittableSerializer);
-
-        SingleOutputStreamOperator<CommittableMessage<CommT>> written =
-                input.transform(WRITER_NAME, commitType, new SinkWriterOperatorFactory<>(sink))
-                        .setParallelism(input.getParallelism());
-
-        SingleOutputStreamOperator<CommittableMessage<CommT>> local =
-                written.transform(
-                                LOCAL_COMMITTER_NAME,
-                                commitType,
-                                new LocalCommitterOperator<>(
-                                        () -> {
-                                            try {
-                                                return sink.createCommitter();
-                                            } catch (IOException e) {
-                                                throw new UncheckedIOException(e);
-                                            }
-                                        },
-                                        sink::getCommittableSerializer))
-                        .setParallelism(written.getParallelism());
-
-        SingleOutputStreamOperator<?> committed =
-                local.global()
-                        .transform(
-                                GLOBAL_COMMITTER_NAME,
-                                commitType,
-                                new GlobalCommitterOperator<>(
-                                        sink::createGlobalCommitter,
-                                        sink::getGlobalCommittableSerializer))
-                        .setParallelism(1)
-                        .setMaxParallelism(1);
-        return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
-    }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java
deleted file mode 100644
index 9e7974ce..00000000
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperator.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
-import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestState;
-import org.apache.flink.util.function.SerializableSupplier;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.function.Function;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/** A {@link AbstractCommitterOperator} to process local committer. */
-public class LocalCommitterOperator<CommT> extends AbstractCommitterOperator<CommT, CommT> {
-
-    private static final long serialVersionUID = 1L;
-
-    private final SerializableSupplier<Committer<CommT>> committerFactory;
-
-    private Committer<CommT> committer;
-
-    public LocalCommitterOperator(
-            SerializableSupplier<Committer<CommT>> committerFactory,
-            SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
-        super(committableSerializer);
-        this.committerFactory = checkNotNull(committerFactory);
-    }
-
-    @Override
-    public void initializeState(StateInitializationContext context) throws Exception {
-        committer = committerFactory.get();
-        super.initializeState(context);
-    }
-
-    @Override
-    public void commit(boolean isRecover, List<CommT> committables)
-            throws IOException, InterruptedException {
-        if (committables.isEmpty()) {
-            return;
-        }
-
-        List<CommitRequestImpl> requests = new ArrayList<>(committables.size());
-        for (CommT comm : committables) {
-            requests.add(new CommitRequestImpl(comm));
-        }
-
-        long sleep = 1000;
-        while (true) {
-            // commit
-            requests.forEach(CommitRequestImpl::setSelected);
-            committer.commit(new ArrayList<>(requests));
-            requests.forEach(CommitRequestImpl::setCommittedIfNoError);
-
-            // drain finished
-            requests.removeIf(CommitRequestImpl::isFinished);
-            if (requests.isEmpty()) {
-                return;
-            }
-
-            //noinspection BusyWait
-            Thread.sleep(sleep);
-            sleep *= 2;
-        }
-    }
-
-    @Override
-    public List<CommT> toCommittables(long checkpoint, List<CommT> inputs) {
-        return inputs;
-    }
-
-    @Override
-    public void close() throws Exception {
-        committer.close();
-        super.close();
-    }
-
-    /** {@link CommitRequest} implementation. */
-    public class CommitRequestImpl implements CommitRequest<CommT> {
-
-        private CommT committable;
-        private int numRetries;
-        private CommitRequestState state;
-
-        private CommitRequestImpl(CommT committable) {
-            this.committable = committable;
-            this.state = CommitRequestState.RECEIVED;
-        }
-
-        private boolean isFinished() {
-            return state.isFinalState();
-        }
-
-        @Override
-        public CommT getCommittable() {
-            return this.committable;
-        }
-
-        @Override
-        public int getNumberOfRetries() {
-            return this.numRetries;
-        }
-
-        @Override
-        public void signalFailedWithKnownReason(Throwable t) {
-            this.state = CommitRequestState.FAILED;
-        }
-
-        @Override
-        public void signalFailedWithUnknownReason(Throwable t) {
-            this.state = CommitRequestState.FAILED;
-            throw new IllegalStateException("Failed to commit " + this.committable, t);
-        }
-
-        @Override
-        public void retryLater() {
-            this.state = CommitRequestState.RETRY;
-            ++this.numRetries;
-        }
-
-        @Override
-        public void updateAndRetryLater(CommT committable) {
-            this.committable = committable;
-            this.retryLater();
-        }
-
-        @Override
-        public void signalAlreadyCommitted() {
-            this.state = CommitRequestState.COMMITTED;
-        }
-
-        void setSelected() {
-            state = CommitRequestState.RECEIVED;
-        }
-
-        void setCommittedIfNoError() {
-            if (state == CommitRequestState.RECEIVED) {
-                state = CommitRequestState.COMMITTED;
-            }
-        }
-    }
-
-    /** Convert a {@link CommitRequest} to another type. */
-    public static <CommT, NewT> CommitRequest<NewT> convertCommitRequest(
-            CommitRequest<CommT> request, Function<CommT, NewT> to, Function<NewT, CommT> from) {
-        return new CommitRequest<NewT>() {
-
-            @Override
-            public NewT getCommittable() {
-                return to.apply(request.getCommittable());
-            }
-
-            @Override
-            public int getNumberOfRetries() {
-                return request.getNumberOfRetries();
-            }
-
-            @Override
-            public void signalFailedWithKnownReason(Throwable throwable) {
-                request.signalFailedWithKnownReason(throwable);
-            }
-
-            @Override
-            public void signalFailedWithUnknownReason(Throwable throwable) {
-                request.signalFailedWithUnknownReason(throwable);
-            }
-
-            @Override
-            public void retryLater() {
-                request.retryLater();
-            }
-
-            @Override
-            public void updateAndRetryLater(NewT committable) {
-                request.updateAndRetryLater(from.apply(committable));
-            }
-
-            @Override
-            public void signalAlreadyCommitted() {
-                request.signalAlreadyCommitted();
-            }
-        };
-    }
-}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index d181e426..6c6ea646 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.connector.source;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -46,7 +45,6 @@ import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
 import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
@@ -119,28 +117,7 @@ public class TableStoreSource
         if (logStoreTableFactory != null) {
             logSourceProvider =
                     logStoreTableFactory.createSourceProvider(
-                            logStoreContext,
-                            new LogStoreTableFactory.SourceContext() {
-                                @Override
-                                public <T> TypeInformation<T> createTypeInformation(
-                                        DataType producedDataType) {
-                                    return scanContext.createTypeInformation(producedDataType);
-                                }
-
-                                @Override
-                                public <T> TypeInformation<T> createTypeInformation(
-                                        LogicalType producedLogicalType) {
-                                    return scanContext.createTypeInformation(producedLogicalType);
-                                }
-
-                                @Override
-                                public DataStructureConverter createDataStructureConverter(
-                                        DataType producedDataType) {
-                                    return scanContext.createDataStructureConverter(
-                                            producedDataType);
-                                }
-                            },
-                            projectFields);
+                            logStoreContext, scanContext, projectFields);
         }
 
         FlinkSourceBuilder sourceBuilder =
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
index 98db8f5b..03b5e614 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.connector.sink;
 
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.streaming.runtime.operators.sink.TestSink;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 
@@ -36,10 +34,7 @@ public class CommittableSerializerTest {
 
     private final FileCommittableSerializer fileSerializer = new FileCommittableSerializer();
 
-    private final CommittableSerializer serializer =
-            new CommittableSerializer(
-                    fileSerializer,
-                    (SimpleVersionedSerializer) TestSink.StringCommittableSerializer.INSTANCE);
+    private final CommittableSerializer serializer = new CommittableSerializer(fileSerializer);
 
     @Test
     public void testFile() throws IOException {
@@ -71,18 +66,4 @@ public class CommittableSerializerTest {
                                 .wrappedCommittable();
         assertThat(newCommittable).isEqualTo(committable);
     }
-
-    @Test
-    public void testLog() throws IOException {
-        String log = "random_string";
-        String newCommittable =
-                (String)
-                        serializer
-                                .deserialize(
-                                        1,
-                                        serializer.serialize(
-                                                new Committable(Committable.Kind.LOG, log)))
-                                .wrappedCommittable();
-        assertThat(newCommittable).isEqualTo(log);
-    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index 9458e504..201f1692 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -47,8 +47,6 @@ import static org.apache.flink.table.store.connector.FileStoreITCase.buildFileSt
 import static org.apache.flink.table.store.connector.FileStoreITCase.buildStreamEnv;
 import static org.apache.flink.table.store.connector.FileStoreITCase.buildTestSource;
 import static org.apache.flink.table.store.connector.FileStoreITCase.executeAndCollect;
-import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
-import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SOURCE_CONTEXT;
 import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -112,9 +110,8 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
                         hasPk ? new int[] {2} : new int[0]);
 
         KafkaLogStoreFactory factory = discoverKafkaLogFactory();
-        KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
-        KafkaLogSourceProvider sourceProvider =
-                factory.createSourceProvider(context, SOURCE_CONTEXT, null);
+        KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, null);
+        KafkaLogSourceProvider sourceProvider = factory.createSourceProvider(context, null, null);
 
         factory.onCreateTable(context, 3, true);
 
@@ -122,7 +119,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
             // write
             new FlinkSinkBuilder(IDENTIFIER, table)
                     .withInput(buildTestSource(env, isBatch))
-                    .withLogSinkProvider(sinkProvider)
+                    .withLogSinkFunction(sinkProvider.createSink())
                     .build();
             env.execute();
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
deleted file mode 100644
index 561dbce0..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.table.catalog.CatalogLock;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
-import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.schema.UpdateSchema;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.RowKind;
-import org.apache.flink.util.UserCodeClassLoader;
-
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import javax.annotation.Nullable;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.OptionalLong;
-import java.util.concurrent.Callable;
-
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link StoreSink}. */
-@RunWith(Parameterized.class)
-public class StoreSinkTest {
-
-    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-    private final boolean hasPk;
-
-    private final boolean partitioned;
-
-    private final ObjectIdentifier identifier =
-            ObjectIdentifier.of("my_catalog", "my_database", "my_table");
-
-    private final TestLock lock = new TestLock();
-
-    private TestFileStore fileStore;
-    private TestFileStoreTable table;
-
-    public StoreSinkTest(boolean hasPk, boolean partitioned) {
-        this.hasPk = hasPk;
-        this.partitioned = partitioned;
-    }
-
-    @Before
-    public void before() throws Exception {
-        Path path = new Path(tempFolder.newFolder().toURI().toString());
-        TableSchema tableSchema =
-                new SchemaManager(path)
-                        .commitNewVersion(
-                                new UpdateSchema(
-                                        RowType.of(
-                                                new LogicalType[] {
-                                                    new IntType(), new IntType(), new IntType()
-                                                },
-                                                new String[] {"a", "b", "c"}),
-                                        partitioned
-                                                ? Collections.singletonList("a")
-                                                : Collections.emptyList(),
-                                        hasPk ? Arrays.asList("a", "b") : Collections.emptyList(),
-                                        new HashMap<>(),
-                                        ""));
-
-        RowType partitionType = tableSchema.logicalPartitionType();
-        fileStore = new TestFileStore(hasPk, partitionType);
-        table = new TestFileStoreTable(path, fileStore, tableSchema);
-    }
-
-    @Parameterized.Parameters(name = "hasPk-{0}, partitioned-{1}")
-    public static List<Boolean[]> data() {
-        return Arrays.asList(
-                new Boolean[] {true, true},
-                new Boolean[] {true, false},
-                new Boolean[] {false, false},
-                new Boolean[] {false, true});
-    }
-
-    @Test
-    public void testChangelogs() throws Exception {
-        Assume.assumeTrue(hasPk && partitioned);
-        StoreSink<?, ?> sink = newSink(null);
-        writeAndCommit(
-                sink,
-                GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
-                GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 0, 2, 3),
-                GenericRowData.ofKind(RowKind.UPDATE_AFTER, 0, 7, 5),
-                GenericRowData.ofKind(RowKind.DELETE, 1, 0, 1));
-        assertThat(fileStore.committedFiles.get(row(1)).get(1))
-                .isEqualTo(Collections.singletonList("DELETE-key-0-value-1/0/1"));
-        assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Collections.singletonList("DELETE-key-2-value-0/2/3"));
-        assertThat(fileStore.committedFiles.get(row(0)).get(1))
-                .isEqualTo(Arrays.asList("INSERT-key-0-value-0/0/1", "INSERT-key-7-value-0/7/5"));
-    }
-
-    @Test
-    public void testNoKeyChangelogs() throws Exception {
-        Assume.assumeTrue(!hasPk && partitioned);
-        StoreSink<?, ?> sink =
-                new StoreSink<>(identifier, table, false, null, () -> lock, new HashMap<>(), null);
-        writeAndCommit(
-                sink,
-                GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
-                GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 0, 2, 3),
-                GenericRowData.ofKind(RowKind.UPDATE_AFTER, 0, 4, 5),
-                GenericRowData.ofKind(RowKind.DELETE, 1, 0, 1));
-        assertThat(fileStore.committedFiles.get(row(1)).get(0))
-                .isEqualTo(Collections.singletonList("INSERT-key-1/0/1-value--1"));
-        assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Collections.singletonList("INSERT-key-0/4/5-value-1"));
-        assertThat(fileStore.committedFiles.get(row(0)).get(1))
-                .isEqualTo(Arrays.asList("INSERT-key-0/0/1-value-1", "INSERT-key-0/2/3-value--1"));
-    }
-
-    @Test
-    public void testAppend() throws Exception {
-        Assume.assumeTrue(hasPk && partitioned);
-        StoreSink<?, ?> sink = newSink(null);
-        writeAndAssert(sink);
-
-        writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
-        assertThat(fileStore.committedFiles.get(row(1)).get(0))
-                .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
-        assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Arrays.asList("INSERT-key-2-value-0/2/3", "INSERT-key-8-value-0/8/9"));
-    }
-
-    @Test
-    public void testOverwrite() throws Exception {
-        Assume.assumeTrue(hasPk && partitioned);
-        StoreSink<?, ?> sink = newSink(new HashMap<>());
-        writeAndAssert(sink);
-
-        writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
-        assertThat(fileStore.committedFiles.get(row(1)).get(1)).isNull();
-        assertThat(fileStore.committedFiles.get(row(1)).get(0))
-                .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
-        assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Collections.singletonList("INSERT-key-8-value-0/8/9"));
-    }
-
-    @Test
-    public void testOverwritePartition() throws Exception {
-        Assume.assumeTrue(hasPk && partitioned);
-        HashMap<String, String> partition = new HashMap<>();
-        partition.put("part", "0");
-        StoreSink<?, ?> sink = newSink(partition);
-        writeAndAssert(sink);
-
-        writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
-        assertThat(fileStore.committedFiles.get(row(1)).get(1))
-                .isEqualTo(Collections.singletonList("INSERT-key-0-value-1/0/1"));
-        assertThat(fileStore.committedFiles.get(row(1)).get(0))
-                .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
-        assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Collections.singletonList("INSERT-key-8-value-0/8/9"));
-    }
-
-    private void writeAndAssert(StoreSink<?, ?> sink) throws Exception {
-        writeAndCommit(
-                sink,
-                GenericRowData.of(0, 0, 1),
-                GenericRowData.of(0, 2, 3),
-                GenericRowData.of(0, 7, 5),
-                GenericRowData.of(1, 0, 1));
-        assertThat(fileStore.committedFiles.get(row(1)).get(1))
-                .isEqualTo(Collections.singletonList("INSERT-key-0-value-1/0/1"));
-        assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Collections.singletonList("INSERT-key-2-value-0/2/3"));
-        assertThat(fileStore.committedFiles.get(row(0)).get(1))
-                .isEqualTo(Arrays.asList("INSERT-key-0-value-0/0/1", "INSERT-key-7-value-0/7/5"));
-    }
-
-    private void writeAndCommit(StoreSink<?, ?> sink, RowData... rows) throws Exception {
-        commit(sink, write(sink, rows));
-    }
-
-    private List<Committable> write(StoreSink<?, ?> sink, RowData... rows) throws Exception {
-        StatefulPrecommittingSinkWriter<?> writer = sink.createWriter(null);
-        for (RowData row : rows) {
-            writer.write(row, null);
-        }
-
-        List<Committable> committables = ((StoreSinkWriter) writer).prepareCommit();
-        Map<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> writers =
-                new HashMap<>(((StoreSinkWriter) writer).writers());
-        assertThat(writers.size()).isGreaterThan(0);
-
-        writer.close();
-        writers.forEach(
-                (part, map) ->
-                        map.forEach(
-                                (bucket, recordWriter) ->
-                                        assertThat(((TestRecordWriter) recordWriter).closed)
-                                                .isTrue()));
-        return committables;
-    }
-
-    private void commit(StoreSink<?, ?> sink, List<Committable> fileCommittables) throws Exception {
-        StoreGlobalCommitter committer = sink.createGlobalCommitter();
-        ManifestCommittable committable = committer.combine(0, fileCommittables);
-
-        fileStore.expired = false;
-        lock.locked = false;
-        committer.commit(Collections.singletonList(committable));
-        assertThat(fileStore.expired).isTrue();
-        assertThat(lock.locked).isTrue();
-
-        assertThat(
-                        committer
-                                .filterRecoveredCommittables(Collections.singletonList(committable))
-                                .size())
-                .isEqualTo(0);
-
-        lock.closed = false;
-        committer.close();
-        assertThat(lock.closed).isTrue();
-    }
-
-    private StoreSink<?, ?> newSink(@Nullable Map<String, String> overwritePartition) {
-        return new StoreSink<>(
-                identifier, table, false, null, () -> lock, overwritePartition, null);
-    }
-
-    private class TestLock implements CatalogLock {
-
-        private boolean locked = false;
-
-        private boolean closed = false;
-
-        @Override
-        public <T> T runWithLock(String database, String table, Callable<T> callable)
-                throws Exception {
-            assertThat(database).isEqualTo(identifier.getDatabaseName());
-            assertThat(table).isEqualTo(identifier.getObjectName());
-            locked = true;
-            return callable.call();
-        }
-
-        @Override
-        public void close() {
-            closed = true;
-        }
-    }
-
-    private Sink.InitContext initContext() {
-        return new Sink.InitContext() {
-            @Override
-            public UserCodeClassLoader getUserCodeClassLoader() {
-                return null;
-            }
-
-            @Override
-            public MailboxExecutor getMailboxExecutor() {
-                return null;
-            }
-
-            @Override
-            public ProcessingTimeService getProcessingTimeService() {
-                return null;
-            }
-
-            @Override
-            public int getSubtaskId() {
-                return 0;
-            }
-
-            @Override
-            public int getNumberOfParallelSubtasks() {
-                return 1;
-            }
-
-            @Override
-            public SinkWriterMetricGroup metricGroup() {
-                return null;
-            }
-
-            @Override
-            public OptionalLong getRestoredCheckpointId() {
-                return OptionalLong.empty();
-            }
-
-            @Override
-            public SerializationSchema.InitializationContext
-                    asSerializationSchemaInitializationContext() {
-                return null;
-            }
-        };
-    }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
deleted file mode 100644
index aa8be1f0..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.manifest.ManifestCommittable;
-import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
-import org.apache.flink.table.store.file.operation.FileStoreCommit;
-import org.apache.flink.table.store.file.operation.FileStoreExpire;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.operation.Lock;
-import org.apache.flink.table.store.file.stats.StatsTestUtils;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.types.logical.RowType;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
-import static org.apache.flink.table.store.file.stats.StatsTestUtils.newEmptyTableStats;
-
-/** Test {@link FileStore}. */
-public class TestFileStore implements FileStore<KeyValue> {
-
-    public final Set<ManifestCommittable> committed = new HashSet<>();
-
-    public final Map<BinaryRowData, Map<Integer, List<String>>> committedFiles = new HashMap<>();
-
-    public final boolean hasPk;
-    private final RowType partitionType;
-
-    public boolean expired = false;
-
-    public TestFileStore(boolean hasPk, RowType partitionType) {
-        this.hasPk = hasPk;
-        this.partitionType = partitionType;
-    }
-
-    @Override
-    public FileStoreWrite<KeyValue> newWrite() {
-        return new FileStoreWrite<KeyValue>() {
-            @Override
-            public RecordWriter<KeyValue> createWriter(
-                    BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
-                TestRecordWriter writer = new TestRecordWriter(hasPk);
-                writer.records.addAll(
-                        committedFiles
-                                .computeIfAbsent(partition, k -> new HashMap<>())
-                                .computeIfAbsent(bucket, k -> new ArrayList<>()));
-                committedFiles.get(partition).remove(bucket);
-                return writer;
-            }
-
-            @Override
-            public RecordWriter<KeyValue> createEmptyWriter(
-                    BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
-                return new TestRecordWriter(hasPk);
-            }
-
-            @Override
-            public Callable<CompactResult> createCompactWriter(
-                    BinaryRowData partition,
-                    int bucket,
-                    @Nullable List<DataFileMeta> compactFiles) {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    @Override
-    public FileStoreRead<KeyValue> newRead() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public FileStoreCommit newCommit() {
-        return new TestCommit();
-    }
-
-    @Override
-    public FileStoreExpire newExpire() {
-        return new FileStoreExpire() {
-            @Override
-            public FileStoreExpire withLock(Lock lock) {
-                return this;
-            }
-
-            @Override
-            public void expire() {
-                expired = true;
-            }
-        };
-    }
-
-    @Override
-    public RowType partitionType() {
-        return partitionType;
-    }
-
-    @Override
-    public FileStoreScan newScan() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public SnapshotManager snapshotManager() {
-        throw new UnsupportedOperationException();
-    }
-
-    static class TestRecordWriter implements RecordWriter<KeyValue> {
-
-        final List<String> records = new ArrayList<>();
-        final boolean hasPk;
-
-        boolean closed = false;
-
-        TestRecordWriter(boolean hasPk) {
-            this.hasPk = hasPk;
-        }
-
-        private String rowToString(RowData row, boolean key) {
-            StringBuilder builder = new StringBuilder();
-            for (int i = 0; i < row.getArity(); i++) {
-                if (i != 0) {
-                    builder.append("/");
-                }
-                if (key) {
-                    builder.append(row.getInt(i));
-                } else {
-                    if (i < row.getArity() - 1) {
-                        builder.append(row.getInt(i));
-                    } else {
-                        builder.append(hasPk ? row.getInt(i) : row.getLong(i));
-                    }
-                }
-            }
-            return builder.toString();
-        }
-
-        @Override
-        public void write(KeyValue kv) {
-            if (!hasPk) {
-                assert kv.value().getArity() == 1;
-                assert kv.value().getLong(0) >= -1L;
-            }
-            records.add(
-                    kv.valueKind().toString()
-                            + "-key-"
-                            + rowToString(kv.key(), true)
-                            + "-value-"
-                            + rowToString(kv.value(), false));
-        }
-
-        @Override
-        public Increment prepareCommit() {
-            List<DataFileMeta> newFiles =
-                    records.stream()
-                            .map(
-                                    s ->
-                                            new DataFileMeta(
-                                                    s,
-                                                    0,
-                                                    0,
-                                                    null,
-                                                    null,
-                                                    StatsTestUtils.newEmptyTableStats(),
-                                                    newEmptyTableStats(3),
-                                                    0,
-                                                    0,
-                                                    0,
-                                                    0))
-                            .collect(Collectors.toList());
-            return new Increment(newFiles, Collections.emptyList(), Collections.emptyList());
-        }
-
-        @Override
-        public void sync() {}
-
-        @Override
-        public List<DataFileMeta> close() {
-            closed = true;
-            return Collections.emptyList();
-        }
-    }
-
-    class TestCommit implements FileStoreCommit {
-
-        Lock lock;
-
-        @Override
-        public FileStoreCommit withLock(Lock lock) {
-            this.lock = lock;
-            return this;
-        }
-
-        @Override
-        public List<ManifestCommittable> filterCommitted(
-                List<ManifestCommittable> committableList) {
-            return committableList.stream()
-                    .filter(c -> !committed.contains(c))
-                    .collect(Collectors.toList());
-        }
-
-        @Override
-        public void commit(ManifestCommittable committable, Map<String, String> properties) {
-            try {
-                lock.runWithLock(() -> committed.add(committable));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-            committable
-                    .newFiles()
-                    .forEach(
-                            (part, bMap) ->
-                                    bMap.forEach(
-                                            (bucket, files) -> {
-                                                List<String> committed =
-                                                        committedFiles
-                                                                .computeIfAbsent(
-                                                                        part, k -> new HashMap<>())
-                                                                .computeIfAbsent(
-                                                                        bucket,
-                                                                        k -> new ArrayList<>());
-                                                files.stream()
-                                                        .map(DataFileMeta::fileName)
-                                                        .forEach(committed::add);
-                                            }));
-        }
-
-        @Override
-        public void overwrite(
-                Map<String, String> partition,
-                ManifestCommittable committable,
-                Map<String, String> properties) {
-            if (partition.isEmpty()) {
-                committedFiles.clear();
-            } else {
-                BinaryRowData partRow = row(Integer.parseInt(partition.get("part")));
-                committedFiles.remove(partRow);
-            }
-            commit(committable, properties);
-        }
-    }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
deleted file mode 100644
index 52b7febd..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.schema.TableSchema;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.AbstractTableWrite;
-import org.apache.flink.table.store.table.sink.SinkRecord;
-import org.apache.flink.table.store.table.sink.SinkRecordConverter;
-import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableCompact;
-import org.apache.flink.table.store.table.sink.TableWrite;
-import org.apache.flink.table.store.table.source.TableRead;
-import org.apache.flink.table.store.table.source.TableScan;
-import org.apache.flink.types.RowKind;
-
-/** {@link FileStoreTable} for tests. */
-public class TestFileStoreTable implements FileStoreTable {
-
-    private final Path path;
-    private final TestFileStore store;
-    private final TableSchema tableSchema;
-
-    public TestFileStoreTable(Path path, TestFileStore store, TableSchema tableSchema) {
-        this.path = path;
-        this.store = store;
-        this.tableSchema = tableSchema;
-    }
-
-    @Override
-    public Path location() {
-        return path;
-    }
-
-    @Override
-    public TableSchema schema() {
-        return tableSchema;
-    }
-
-    @Override
-    public SnapshotManager snapshotManager() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public TableScan newScan() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public TableRead newRead() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public TableWrite newWrite() {
-        return new AbstractTableWrite<KeyValue>(
-                store.newWrite(), new SinkRecordConverter(2, tableSchema)) {
-            @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
-                    throws Exception {
-                boolean isInsert =
-                        record.row().getRowKind() == RowKind.INSERT
-                                || record.row().getRowKind() == RowKind.UPDATE_AFTER;
-                KeyValue kv = new KeyValue();
-                if (store.hasPk) {
-                    kv.replace(
-                            record.primaryKey(),
-                            isInsert ? RowKind.INSERT : RowKind.DELETE,
-                            record.row());
-                } else {
-                    kv.replace(
-                            record.row(), RowKind.INSERT, GenericRowData.of(isInsert ? 1L : -1L));
-                }
-                writer.write(kv);
-            }
-        };
-    }
-
-    @Override
-    public TableCommit newCommit() {
-        return new TableCommit(store.newCommit(), store.newExpire());
-    }
-
-    @Override
-    public TableCompact newCompact() {
-        throw new UnsupportedOperationException();
-    }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
deleted file mode 100644
index 17cbba1f..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/GlobalCommitterOperatorTest.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.runtime.operators.sink.TestSink.StringCommittableSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.function.SerializableSupplier;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link GlobalCommitterOperator}. */
-public class GlobalCommitterOperatorTest {
-
-    @Test
-    public void closeCommitter() throws Exception {
-        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
-                createTestHarness(globalCommitter);
-        testHarness.initializeEmptyState();
-        testHarness.open();
-        testHarness.close();
-        assertThat(globalCommitter.isClosed()).isTrue();
-    }
-
-    @Test
-    public void restoredFromMergedState() throws Exception {
-
-        final List<String> input1 = Arrays.asList("host", "drop");
-        final OperatorSubtaskState operatorSubtaskState1 =
-                buildSubtaskState(createTestHarness(), input1);
-
-        final List<String> input2 = Arrays.asList("future", "evil", "how");
-        final OperatorSubtaskState operatorSubtaskState2 =
-                buildSubtaskState(createTestHarness(), input2);
-
-        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
-                createTestHarness(globalCommitter);
-
-        final OperatorSubtaskState mergedOperatorSubtaskState =
-                OneInputStreamOperatorTestHarness.repackageState(
-                        operatorSubtaskState1, operatorSubtaskState2);
-
-        testHarness.initializeState(
-                OneInputStreamOperatorTestHarness.repartitionOperatorState(
-                        mergedOperatorSubtaskState, 2, 2, 1, 0));
-        testHarness.open();
-
-        final List<String> expectedOutput = new ArrayList<>();
-        expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input1));
-        expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
-
-        testHarness.snapshot(1L, 1L);
-        testHarness.notifyOfCompletedCheckpoint(1L);
-        testHarness.close();
-
-        assertThat(globalCommitter.getCommittedData())
-                .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
-    }
-
-    @Test
-    public void commitMultipleStagesTogether() throws Exception {
-
-        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
-
-        final List<String> input1 = Arrays.asList("cautious", "nature");
-        final List<String> input2 = Arrays.asList("count", "over");
-        final List<String> input3 = Arrays.asList("lawyer", "grammar");
-
-        final List<String> expectedOutput = new ArrayList<>();
-
-        expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input1));
-        expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input2));
-        expectedOutput.add(DefaultGlobalCommitter.COMBINER.apply(input3));
-
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
-                createTestHarness(globalCommitter);
-        testHarness.initializeEmptyState();
-        testHarness.open();
-
-        testHarness.processElements(committableRecords(input1));
-        testHarness.snapshot(1L, 1L);
-        testHarness.processElements(committableRecords(input2));
-        testHarness.snapshot(2L, 2L);
-        testHarness.processElements(committableRecords(input3));
-        testHarness.snapshot(3L, 3L);
-
-        testHarness.notifyOfCompletedCheckpoint(3L);
-
-        testHarness.close();
-
-        assertThat(globalCommitter.getCommittedData())
-                .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
-    }
-
-    @Test
-    public void filterRecoveredCommittables() throws Exception {
-        final List<String> input = Arrays.asList("silent", "elder", "patience");
-        final String successCommittedCommittable = DefaultGlobalCommitter.COMBINER.apply(input);
-
-        final OperatorSubtaskState operatorSubtaskState =
-                buildSubtaskState(createTestHarness(), input);
-        final DefaultGlobalCommitter globalCommitter =
-                new DefaultGlobalCommitter(successCommittedCommittable);
-
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
-                createTestHarness(globalCommitter);
-
-        // all data from previous checkpoint are expected to be committed,
-        // so we expect no data to be re-committed.
-        testHarness.initializeState(operatorSubtaskState);
-        testHarness.open();
-        testHarness.snapshot(1L, 1L);
-        testHarness.notifyOfCompletedCheckpoint(1L);
-        assertThat(globalCommitter.getCommittedData()).isEmpty();
-        testHarness.close();
-    }
-
-    @Test
-    public void endOfInput() throws Exception {
-        final DefaultGlobalCommitter globalCommitter = new DefaultGlobalCommitter();
-
-        final OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness =
-                createTestHarness(globalCommitter);
-        testHarness.initializeEmptyState();
-        testHarness.open();
-        List<String> input = Arrays.asList("silent", "elder", "patience");
-        testHarness.processElements(committableRecords(input));
-        testHarness.endInput();
-        testHarness.close();
-        assertThat(globalCommitter.getCommittedData()).contains("elder+patience+silent");
-    }
-
-    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> createTestHarness()
-            throws Exception {
-        return createTestHarness(new DefaultGlobalCommitter());
-    }
-
-    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> createTestHarness(
-            GlobalCommitter<String, String> globalCommitter) throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new GlobalCommitterOperator<>(
-                        () -> globalCommitter, () -> StringCommittableSerializer.INSTANCE),
-                CommittableMessageTypeInfo.of(
-                                (SerializableSupplier<SimpleVersionedSerializer<String>>)
-                                        () -> StringCommittableSerializer.INSTANCE)
-                        .createSerializer(new ExecutionConfig()));
-    }
-
-    public static OperatorSubtaskState buildSubtaskState(
-            OneInputStreamOperatorTestHarness<CommittableMessage<String>, ?> testHarness,
-            List<String> input)
-            throws Exception {
-        testHarness.initializeEmptyState();
-        testHarness.open();
-        testHarness.processElements(
-                input.stream()
-                        .map(GlobalCommitterOperatorTest::toCommittableMessage)
-                        .map(StreamRecord::new)
-                        .collect(Collectors.toList()));
-        testHarness.prepareSnapshotPreBarrier(1L);
-        OperatorSubtaskState operatorSubtaskState = testHarness.snapshot(1L, 1L);
-        testHarness.close();
-        return operatorSubtaskState;
-    }
-
-    static List<StreamRecord<CommittableMessage<String>>> committableRecords(
-            Collection<String> elements) {
-        return elements.stream()
-                .map(GlobalCommitterOperatorTest::toCommittableMessage)
-                .map(StreamRecord::new)
-                .collect(Collectors.toList());
-    }
-
-    private static CommittableMessage<String> toCommittableMessage(String input) {
-        return new CommittableWithLineage<>(input, null, -1);
-    }
-
-    /** A {@link GlobalCommitter} that always commits global committables successfully. */
-    private static class DefaultGlobalCommitter implements GlobalCommitter<String, String> {
-
-        private static final Function<List<String>, String> COMBINER =
-                strings -> {
-                    // we sort here because we want to have a deterministic result during the unit
-                    // test
-                    Collections.sort(strings);
-                    return String.join("+", strings);
-                };
-
-        private final Queue<String> committedData;
-
-        private boolean isClosed;
-
-        private final String committedSuccessData;
-
-        DefaultGlobalCommitter() {
-            this("");
-        }
-
-        DefaultGlobalCommitter(String committedSuccessData) {
-            this.committedData = new ConcurrentLinkedQueue<>();
-            this.isClosed = false;
-            this.committedSuccessData = committedSuccessData;
-        }
-
-        @Override
-        public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
-            if (committedSuccessData == null) {
-                return globalCommittables;
-            }
-            return globalCommittables.stream()
-                    .filter(s -> !s.equals(committedSuccessData))
-                    .collect(Collectors.toList());
-        }
-
-        @Override
-        public String combine(long checkpointId, List<String> committables) {
-            return COMBINER.apply(committables);
-        }
-
-        @Override
-        public void commit(List<String> committables) {
-            committedData.addAll(committables);
-        }
-
-        public List<String> getCommittedData() {
-            if (committedData != null) {
-                return new ArrayList<>(committedData);
-            } else {
-                return Collections.emptyList();
-            }
-        }
-
-        @Override
-        public void close() {
-            isClosed = true;
-        }
-
-        public boolean isClosed() {
-            return isClosed;
-        }
-    }
-}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
deleted file mode 100644
index 528f5dc5..00000000
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/global/LocalCommitterOperatorTest.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.connector.sink.global;
-
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
-import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
-import org.apache.flink.streaming.runtime.operators.sink.TestSink.StringCommittableSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
-import org.junit.Test;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.store.connector.sink.global.GlobalCommitterOperatorTest.buildSubtaskState;
-import static org.apache.flink.table.store.connector.sink.global.GlobalCommitterOperatorTest.committableRecords;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertNotNull;
-
-/** Test the {@link LocalCommitterOperator}. */
-public class LocalCommitterOperatorTest {
-
-    @Test
-    public void supportRetry() throws Exception {
-        final List<String> input = Arrays.asList("lazy", "leaf");
-        final RetryOnceCommitter committer = new RetryOnceCommitter();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer);
-
-        testHarness.initializeEmptyState();
-        testHarness.open();
-        testHarness.processElements(committableRecords(input));
-        testHarness.prepareSnapshotPreBarrier(1);
-        testHarness.snapshot(1L, 1L);
-        testHarness.notifyOfCompletedCheckpoint(1L);
-        testHarness.snapshot(2L, 2L);
-        testHarness.notifyOfCompletedCheckpoint(2L);
-
-        testHarness.close();
-
-        assertThat(committer.getCommittedData()).contains("lazy", "leaf");
-    }
-
-    @Test
-    public void closeCommitter() throws Exception {
-        final DefaultCommitter committer = new DefaultCommitter();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer);
-        testHarness.initializeEmptyState();
-        testHarness.open();
-        testHarness.close();
-        assertThat(committer.isClosed()).isTrue();
-    }
-
-    @Test
-    public void restoredFromMergedState() throws Exception {
-        final List<String> input1 = Arrays.asList("today", "whom");
-        final OperatorSubtaskState operatorSubtaskState1 =
-                buildSubtaskState(createTestHarness(), input1);
-
-        final List<String> input2 = Arrays.asList("future", "evil", "how");
-        final OperatorSubtaskState operatorSubtaskState2 =
-                buildSubtaskState(createTestHarness(), input2);
-
-        final DefaultCommitter committer = new DefaultCommitter();
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer);
-
-        final OperatorSubtaskState mergedOperatorSubtaskState =
-                OneInputStreamOperatorTestHarness.repackageState(
-                        operatorSubtaskState1, operatorSubtaskState2);
-
-        testHarness.initializeState(
-                OneInputStreamOperatorTestHarness.repartitionOperatorState(
-                        mergedOperatorSubtaskState, 2, 2, 1, 0));
-        testHarness.open();
-
-        final List<String> expectedOutput = new ArrayList<>();
-        expectedOutput.addAll(input1);
-        expectedOutput.addAll(input2);
-
-        testHarness.prepareSnapshotPreBarrier(1L);
-        testHarness.snapshot(1L, 1L);
-        testHarness.notifyOfCompletedCheckpoint(1);
-
-        testHarness.close();
-
-        assertThat(committer.getCommittedData())
-                .containsExactlyInAnyOrder(expectedOutput.toArray(new String[0]));
-    }
-
-    @Test
-    public void commitMultipleStagesTogether() throws Exception {
-
-        final DefaultCommitter committer = new DefaultCommitter();
-
-        final List<String> input1 = Arrays.asList("cautious", "nature");
-        final List<String> input2 = Arrays.asList("count", "over");
-        final List<String> input3 = Arrays.asList("lawyer", "grammar");
-
-        final List<String> expectedOutput = new ArrayList<>();
-
-        expectedOutput.addAll(input1);
-        expectedOutput.addAll(input2);
-        expectedOutput.addAll(input3);
-
-        final OneInputStreamOperatorTestHarness<
-                        CommittableMessage<String>, CommittableMessage<String>>
-                testHarness = createTestHarness(committer);
-        testHarness.initializeEmptyState();
-        testHarness.open();
-
-        testHarness.processElements(committableRecords(input1));
-        testHarness.prepareSnapshotPreBarrier(1L);
-        testHarness.snapshot(1L, 1L);
-        testHarness.processElements(committableRecords(input2));
-        testHarness.prepareSnapshotPreBarrier(2L);
-        testHarness.snapshot(2L, 2L);
-        testHarness.processElements(committableRecords(input3));
-        testHarness.prepareSnapshotPreBarrier(3L);
-        testHarness.snapshot(3L, 3L);
-
-        testHarness.notifyOfCompletedCheckpoint(1);
-        testHarness.notifyOfCompletedCheckpoint(3);
-
-        testHarness.close();
-
-        assertThat(fromRecords(testHarness.getRecordOutput())).isEqualTo(expectedOutput);
-
-        assertThat(committer.getCommittedData()).isEqualTo(expectedOutput);
-    }
-
-    private static List<String> fromRecords(
-            Collection<StreamRecord<CommittableMessage<String>>> elements) {
-        return elements.stream()
-                .map(StreamRecord::getValue)
-                .filter(message -> message instanceof CommittableWithLineage)
-                .map(message -> ((CommittableWithLineage<String>) message).getCommittable())
-                .collect(Collectors.toList());
-    }
-
-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness() throws Exception {
-        return createTestHarness(new DefaultCommitter());
-    }
-
-    private OneInputStreamOperatorTestHarness<
-                    CommittableMessage<String>, CommittableMessage<String>>
-            createTestHarness(Committer<String> committer) throws Exception {
-        return new OneInputStreamOperatorTestHarness<>(
-                new LocalCommitterOperator<>(
-                        () -> committer, () -> StringCommittableSerializer.INSTANCE));
-    }
-
-    /** Base class for testing {@link Committer}. */
-    private static class DefaultCommitter implements Committer<String>, Serializable {
-
-        @Nullable protected Queue<String> committedData;
-
-        private boolean isClosed;
-
-        @Nullable private final Supplier<Queue<String>> queueSupplier;
-
-        public DefaultCommitter() {
-            this.committedData = new ConcurrentLinkedQueue<>();
-            this.isClosed = false;
-            this.queueSupplier = null;
-        }
-
-        public List<String> getCommittedData() {
-            if (committedData != null) {
-                return new ArrayList<>(committedData);
-            } else {
-                return Collections.emptyList();
-            }
-        }
-
-        @Override
-        public void close() {
-            isClosed = true;
-        }
-
-        public boolean isClosed() {
-            return isClosed;
-        }
-
-        @Override
-        public void commit(Collection<CommitRequest<String>> requests) {
-            if (committedData == null) {
-                assertNotNull(queueSupplier);
-                committedData = queueSupplier.get();
-            }
-            committedData.addAll(
-                    requests.stream()
-                            .map(CommitRequest::getCommittable)
-                            .collect(Collectors.toList()));
-        }
-    }
-
-    /** A {@link Committer} that always re-commits the committables data it received. */
-    private static class RetryOnceCommitter extends DefaultCommitter implements Committer<String> {
-
-        private final Set<String> seen = new LinkedHashSet<>();
-
-        @Override
-        public void commit(Collection<CommitRequest<String>> requests) {
-            requests.forEach(
-                    c -> {
-                        if (seen.remove(c.getCommittable())) {
-                            checkNotNull(committedData);
-                            committedData.add(c.getCommittable());
-                        } else {
-                            seen.add(c.getCommittable());
-                            c.retryLater();
-                        }
-                    });
-        }
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java
deleted file mode 100644
index a5983a93..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.log;
-
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.common.operators.ProcessingTimeService;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
-import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
-import org.apache.flink.util.UserCodeClassLoader;
-
-import java.util.Optional;
-import java.util.OptionalLong;
-import java.util.function.Consumer;
-
-/** A {@link InitContext} with {@link InitContext#metadataConsumer()}. */
-public class LogInitContext implements InitContext {
-
-    private final InitContext context;
-    private final Consumer<?> metaConsumer;
-
-    public LogInitContext(InitContext context, Consumer<?> metaConsumer) {
-        this.context = context;
-        this.metaConsumer = metaConsumer;
-    }
-
-    @Override
-    public UserCodeClassLoader getUserCodeClassLoader() {
-        return context.getUserCodeClassLoader();
-    }
-
-    @Override
-    public MailboxExecutor getMailboxExecutor() {
-        return context.getMailboxExecutor();
-    }
-
-    @Override
-    public ProcessingTimeService getProcessingTimeService() {
-        return context.getProcessingTimeService();
-    }
-
-    @Override
-    public int getSubtaskId() {
-        return context.getSubtaskId();
-    }
-
-    @Override
-    public int getNumberOfParallelSubtasks() {
-        return context.getNumberOfParallelSubtasks();
-    }
-
-    @Override
-    public SinkWriterMetricGroup metricGroup() {
-        return context.metricGroup();
-    }
-
-    @Override
-    public OptionalLong getRestoredCheckpointId() {
-        return context.getRestoredCheckpointId();
-    }
-
-    @Override
-    public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
-        return context.asSerializationSchemaInitializationContext();
-    }
-
-    @Override
-    public Optional<Consumer<?>> metadataConsumer() {
-        return Optional.of(metaConsumer);
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
index effa38e8..319324f1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
@@ -18,35 +18,13 @@
 
 package org.apache.flink.table.store.log;
 
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
 
 import java.io.Serializable;
-import java.util.function.Consumer;
 
 /** A {@link Serializable} sink provider for log store. */
 public interface LogSinkProvider extends Serializable {
 
-    /** Creates a {@link Sink} instance. */
-    Sink<SinkRecord> createSink();
-
-    /**
-     * Create a metadata consumer for {@link Sink.InitContext#metadataConsumer()} from {@link
-     * WriteCallback}.
-     */
-    Consumer<?> createMetadataConsumer(WriteCallback callback);
-
-    /**
-     * A callback interface that the user can implement to know the offset of the bucket when the
-     * request is complete.
-     */
-    interface WriteCallback {
-
-        /**
-         * A callback method the user can implement to provide asynchronous handling of request
-         * completion. This method will be called when the record sent to the server has been
-         * acknowledged.
-         */
-        void onCompletion(int bucket, long offset);
-    }
+    /** Creates a {@link LogSinkFunction} instance. */
+    LogSinkFunction createSink();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
index 22c4e6c7..7032ea06 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -63,19 +63,15 @@ public interface LogStoreTableFactory extends DynamicTableFactory {
      * context information.
      */
     LogSourceProvider createSourceProvider(
-            Context context, SourceContext sourceContext, @Nullable int[][] projectFields);
+            Context context,
+            DynamicTableSource.Context sourceContext,
+            @Nullable int[][] projectFields);
 
     /**
      * Creates a {@link LogSinkProvider} instance from a {@link CatalogTable} and additional context
      * information.
      */
-    LogSinkProvider createSinkProvider(Context context, SinkContext sinkContext);
-
-    /** Context for create runtime source. */
-    interface SourceContext extends DynamicTableSource.Context {}
-
-    /** Context for create runtime sink. */
-    interface SinkContext extends DynamicTableSink.Context {}
+    LogSinkProvider createSinkProvider(Context context, DynamicTableSink.Context sinkContext);
 
     // --------------------------------------------------------------------------------------------
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
index d6c849ea..972d2dc8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogWriteCallback.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.store.log;
 
-import org.apache.flink.table.store.log.LogSinkProvider.WriteCallback;
+import org.apache.flink.table.store.table.sink.LogSinkFunction.WriteCallback;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
index 104ae0bb..e94c38ff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTableFactory.java
@@ -29,9 +29,17 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+
 /** Factory to create {@link FileStoreTable}. */
 public class FileStoreTableFactory {
 
+    public static FileStoreTable create(Path path) {
+        Configuration conf = new Configuration();
+        conf.set(PATH, path.toString());
+        return create(conf);
+    }
+
     public static FileStoreTable create(Configuration conf) {
         return create(conf, UUID.randomUUID().toString());
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
similarity index 67%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
index effa38e8..82d28146 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
@@ -16,25 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.log;
+package org.apache.flink.table.store.table.sink;
 
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 
-import java.io.Serializable;
-import java.util.function.Consumer;
+/** Log {@link SinkFunction} with {@link WriteCallback}. */
+public interface LogSinkFunction extends SinkFunction<SinkRecord> {
 
-/** A {@link Serializable} sink provider for log store. */
-public interface LogSinkProvider extends Serializable {
-
-    /** Creates a {@link Sink} instance. */
-    Sink<SinkRecord> createSink();
-
-    /**
-     * Create a metadata consumer for {@link Sink.InitContext#metadataConsumer()} from {@link
-     * WriteCallback}.
-     */
-    Consumer<?> createMetadataConsumer(WriteCallback callback);
+    void setWriteCallback(WriteCallback writeCallback);
 
     /**
      * A callback interface that the user can implement to know the offset of the bucket when the
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
index ccc42576..bf0aa4c5 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
 import org.apache.flink.table.store.table.sink.SinkRecord;
@@ -30,7 +31,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import javax.annotation.Nullable;
 
 /** A {@link KafkaRecordSerializationSchema} for the table in log store. */
-public class KafkaLogSerializationSchema implements KafkaRecordSerializationSchema<SinkRecord> {
+public class KafkaLogSerializationSchema implements KafkaSerializationSchema<SinkRecord> {
 
     private static final long serialVersionUID = 1L;
 
@@ -55,9 +56,7 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
     }
 
     @Override
-    public void open(
-            SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
-            throws Exception {
+    public void open(SerializationSchema.InitializationContext context) throws Exception {
         if (primaryKeySerializer != null) {
             primaryKeySerializer.open(context);
         }
@@ -65,8 +64,7 @@ public class KafkaLogSerializationSchema implements KafkaRecordSerializationSche
     }
 
     @Override
-    public ProducerRecord<byte[], byte[]> serialize(
-            SinkRecord element, KafkaSinkContext context, Long timestamp) {
+    public ProducerRecord<byte[], byte[]> serialize(SinkRecord element, @Nullable Long timestamp) {
         RowKind kind = element.row().getRowKind();
 
         byte[] primaryKeyBytes = null;
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
index 1f1b79b8..7365e44d 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -20,22 +20,18 @@ package org.apache.flink.table.store.kafka;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.connector.base.DeliveryGuarantee;
-import org.apache.flink.connector.kafka.sink.KafkaSink;
-import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
 import org.apache.flink.table.store.log.LogOptions.LogConsistency;
 import org.apache.flink.table.store.log.LogSinkProvider;
-import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.RecordMetadata;
 
 import javax.annotation.Nullable;
 
 import java.util.Properties;
-import java.util.function.Consumer;
 
 /** A Kafka {@link LogSinkProvider}. */
 public class KafkaLogSinkProvider implements LogSinkProvider {
@@ -70,32 +66,24 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
     }
 
     @Override
-    public KafkaSink<SinkRecord> createSink() {
-        KafkaSinkBuilder<SinkRecord> builder = KafkaSink.builder();
+    public LogSinkFunction createSink() {
+        Semantic semantic;
         switch (consistency) {
             case TRANSACTIONAL:
-                builder.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
-                        .setTransactionalIdPrefix("log-store-" + topic);
+                properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "log-store-" + topic);
+                semantic = Semantic.EXACTLY_ONCE;
                 break;
             case EVENTUAL:
                 if (primaryKeySerializer == null) {
                     throw new IllegalArgumentException(
                             "Can not use EVENTUAL consistency mode for non-pk table.");
                 }
-                builder.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE);
+                semantic = Semantic.AT_LEAST_ONCE;
                 break;
+            default:
+                throw new IllegalArgumentException("Unsupported: " + consistency);
         }
-
-        return builder.setBootstrapServers(
-                        properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString())
-                .setKafkaProducerConfig(properties)
-                .setRecordSerializer(createSerializationSchema())
-                .build();
-    }
-
-    @Override
-    public Consumer<RecordMetadata> createMetadataConsumer(WriteCallback callback) {
-        return meta -> callback.onCompletion(meta.partition(), meta.offset());
+        return new KafkaSinkFunction(topic, createSerializationSchema(), properties, semantic);
     }
 
     @VisibleForTesting
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
index 9828daf8..b247f575 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -25,6 +25,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
@@ -44,6 +47,7 @@ import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -182,13 +186,13 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
     @Override
     public KafkaLogSourceProvider createSourceProvider(
             DynamicTableFactory.Context context,
-            SourceContext sourceContext,
+            DynamicTableSource.Context sourceContext,
             @Nullable int[][] projectFields) {
         FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
         ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
         DataType physicalType = schema.toPhysicalRowDataType();
         DeserializationSchema<RowData> primaryKeyDeserializer = null;
-        int[] primaryKey = schema.getPrimaryKeyIndexes();
+        int[] primaryKey = getPrimaryKeyIndexes(schema);
         if (primaryKey.length > 0) {
             DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
             primaryKeyDeserializer =
@@ -213,12 +217,12 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
 
     @Override
     public KafkaLogSinkProvider createSinkProvider(
-            DynamicTableFactory.Context context, SinkContext sinkContext) {
+            DynamicTableFactory.Context context, DynamicTableSink.Context sinkContext) {
         FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
         ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
         DataType physicalType = schema.toPhysicalRowDataType();
         SerializationSchema<RowData> primaryKeySerializer = null;
-        int[] primaryKey = schema.getPrimaryKeyIndexes();
+        int[] primaryKey = getPrimaryKeyIndexes(schema);
         if (primaryKey.length > 0) {
             DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
             primaryKeySerializer =
@@ -237,6 +241,14 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
                 helper.getOptions().get(CHANGELOG_MODE));
     }
 
+    /** Resolves the positions of the primary key columns; empty when no key is declared. */
+    private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
+        List<String> names = schema.getColumnNames();
+        Optional<List<String>> key = schema.getPrimaryKey().map(UniqueConstraint::getColumns);
+        return key.map(pk -> pk.stream().mapToInt(names::indexOf).toArray())
+                .orElseGet(() -> new int[0]);
+    }
+
     public static Properties toKafkaProperties(ReadableConfig options) {
         Properties properties = new Properties();
         Map<String, String> optionMap = ((Configuration) options).toMap();
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java
new file mode 100644
index 00000000..587a1813
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.flink.table.store.table.sink.LogSinkFunction;
+import org.apache.flink.table.store.table.sink.LogSinkFunction.WriteCallback;
+import org.apache.flink.table.store.table.sink.SinkRecord;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A {@link FlinkKafkaProducer} which implements {@link LogSinkFunction} to register {@link
+ * WriteCallback}.
+ */
+public class KafkaSinkFunction extends FlinkKafkaProducer<SinkRecord> implements LogSinkFunction {
+
+    /** Optional listener for acknowledged writes; {@code null} until one is registered. */
+    private WriteCallback writeCallback;
+
+    /**
+     * Creates a {@link KafkaSinkFunction} for a given topic. The sink produces its input to the
+     * topic. It accepts a {@link KafkaSerializationSchema} for serializing records to a {@link
+     * ProducerRecord}, including partitioning information.
+     *
+     * @param defaultTopic The default topic to write data to
+     * @param serializationSchema A serializable serialization schema for turning user objects into
+     *     a kafka-consumable byte[] supporting key/value messages
+     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is
+     *     the only required argument.
+     * @param semantic Defines semantic that will be used by this producer (see {@link
+     *     KafkaSinkFunction.Semantic}).
+     */
+    public KafkaSinkFunction(
+            String defaultTopic,
+            KafkaSerializationSchema<SinkRecord> serializationSchema,
+            Properties producerConfig,
+            KafkaSinkFunction.Semantic semantic) {
+        super(defaultTopic, serializationSchema, producerConfig, semantic);
+    }
+
+    /**
+     * Registers the callback that receives the partition and offset of every record whose write
+     * completed without an error.
+     */
+    @Override
+    public void setWriteCallback(WriteCallback writeCallback) {
+        this.writeCallback = writeCallback;
+    }
+
+    /**
+     * Opens the producer, then wraps the inherited {@code callback} (required to be non-null after
+     * {@code super.open}) so the registered {@link WriteCallback} runs before the original one.
+     */
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        super.open(configuration);
+        Callback baseCallback = requireNonNull(callback);
+        callback =
+                (metadata, exception) -> {
+                    // A failed send has no usable partition/offset (metadata is null or holds
+                    // -1, depending on the Kafka client version): report successful writes
+                    // only, and always hand over to the original callback afterwards.
+                    if (writeCallback != null && exception == null && metadata != null) {
+                        writeCallback.onCompletion(metadata.partition(), metadata.offset());
+                    }
+                    baseCallback.onCompletion(metadata, exception);
+                };
+    }
+}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
index bb8a788f..30c66cfa 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -34,7 +34,6 @@ import org.junit.jupiter.api.Assertions;
 
 import java.util.Comparator;
 import java.util.List;
-import java.util.UUID;
 
 import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;
 import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
@@ -146,17 +145,16 @@ public class KafkaLogITCase extends KafkaTableTestBase {
             // transactional need to commit
             enableCheckpoint();
 
-            // 1.1 sink
-            String uuid = UUID.randomUUID().toString();
+            // 1 sink
             env.fromElements(
                             testRecord(true, 2, 1, 2, RowKind.DELETE),
                             testRecord(true, 1, 3, 4, RowKind.INSERT),
                             testRecord(true, 0, 5, 6, RowKind.INSERT),
                             testRecord(true, 0, 7, 8, RowKind.INSERT))
-                    .sinkTo(new TestOffsetsLogSink<>(sinkProvider, uuid));
+                    .addSink(sinkProvider.createSink());
             env.execute();
 
-            // 1.2 read
+            // 2 read
             List<RowData> records =
                     collect(
                             factory.createSourceProvider(context, SOURCE_CONTEXT, null)
@@ -174,7 +172,7 @@ public class KafkaLogITCase extends KafkaTableTestBase {
             assertRow(records.get(2), RowKind.INSERT, 5, 6);
             assertRow(records.get(3), RowKind.INSERT, 7, 8);
 
-            // 1.3 read with projection
+            // 3 read with projection
             records =
                     collect(
                             factory.createSourceProvider(
@@ -192,24 +190,6 @@ public class KafkaLogITCase extends KafkaTableTestBase {
             assertValue(records.get(1), RowKind.INSERT, 4);
             assertValue(records.get(2), RowKind.INSERT, 6);
             assertValue(records.get(3), RowKind.INSERT, 8);
-
-            // 2.1 sink
-            env.fromElements(
-                            testRecord(true, 0, 9, 10, RowKind.INSERT),
-                            testRecord(true, 1, 11, 12, RowKind.INSERT),
-                            testRecord(true, 2, 13, 14, RowKind.INSERT))
-                    .sinkTo(new TestOffsetsLogSink<>(sinkProvider, UUID.randomUUID().toString()));
-            env.execute();
-
-            // 2.2 read from offsets
-            records =
-                    collect(
-                            factory.createSourceProvider(context, SOURCE_CONTEXT, null)
-                                    .createSource(TestOffsetsLogSink.drainOffsets(uuid)),
-                            3);
-            assertRow(records.get(0), RowKind.INSERT, 9, 10);
-            assertRow(records.get(1), RowKind.INSERT, 11, 12);
-            assertRow(records.get(2), RowKind.INSERT, 13, 14);
         } finally {
             factory.onDropTable(context, true);
         }
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
index 37e7aeea..35ff9960 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
@@ -84,13 +84,13 @@ public class KafkaLogSerializationTest {
             throws Exception {
         KafkaLogSerializationSchema serializer =
                 createTestSerializationSchema(testContext("", mode, keyed));
-        serializer.open(null, null);
+        serializer.open(null);
         KafkaRecordDeserializationSchema<RowData> deserializer =
                 createTestDeserializationSchema(testContext("", mode, keyed));
         deserializer.open(null);
 
         SinkRecord input = testRecord(keyed, bucket, key, value, rowKind);
-        ProducerRecord<byte[], byte[]> record = serializer.serialize(input, null, null);
+        ProducerRecord<byte[], byte[]> record = serializer.serialize(input, null);
 
         assertThat(record.partition().intValue()).isEqualTo(bucket);
 
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index 4e920f04..ac225429 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -65,8 +65,8 @@ import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
 /** Utils for the test of {@link KafkaLogStoreFactory}. */
 public class KafkaLogTestUtils {
 
-    public static final LogStoreTableFactory.SourceContext SOURCE_CONTEXT =
-            new LogStoreTableFactory.SourceContext() {
+    public static final DynamicTableSource.Context SOURCE_CONTEXT =
+            new DynamicTableSource.Context() {
                 @Override
                 public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
                     return createTypeInformation(
@@ -87,8 +87,8 @@ public class KafkaLogTestUtils {
                 }
             };
 
-    public static final LogStoreTableFactory.SinkContext SINK_CONTEXT =
-            new LogStoreTableFactory.SinkContext() {
+    public static final DynamicTableSink.Context SINK_CONTEXT =
+            new DynamicTableSink.Context() {
 
                 @Override
                 public boolean isBounded() {
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
deleted file mode 100644
index fa4dbdd1..00000000
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.kafka;
-
-import org.apache.flink.api.connector.sink2.Committer;
-import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.table.store.log.LogInitContext;
-import org.apache.flink.table.store.log.LogSinkProvider;
-import org.apache.flink.table.store.table.sink.SinkRecord;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-
-/** Test kafka {@link Sink}. */
-public class TestOffsetsLogSink<
-                WriterT extends
-                        StatefulSinkWriter<SinkRecord, WriterStateT>
-                                & PrecommittingSinkWriter<SinkRecord, CommT>,
-                CommT,
-                WriterStateT>
-        implements StatefulSink<SinkRecord, WriterStateT>,
-                TwoPhaseCommittingSink<SinkRecord, CommT> {
-
-    private static final Map<String, Map<Integer, Long>> GLOBAL_OFFSETS = new ConcurrentHashMap<>();
-
-    private final LogSinkProvider sinkProvider;
-    private final String uuid;
-    private final Sink<SinkRecord> sink;
-
-    public TestOffsetsLogSink(LogSinkProvider sinkProvider, String uuid) {
-        this.sinkProvider = sinkProvider;
-        this.uuid = uuid;
-        this.sink = sinkProvider.createSink();
-    }
-
-    public static Map<Integer, Long> drainOffsets(String uuid) {
-        return GLOBAL_OFFSETS.remove(uuid);
-    }
-
-    @Override
-    public WriterT createWriter(InitContext initContext) throws IOException {
-        return (WriterT) sink.createWriter(wrapContext(initContext));
-    }
-
-    private InitContext wrapContext(InitContext initContext) {
-        Consumer<?> consumer =
-                sinkProvider.createMetadataConsumer(
-                        (bucket, offset) -> {
-                            Map<Integer, Long> offsets =
-                                    GLOBAL_OFFSETS.computeIfAbsent(
-                                            uuid, k -> new ConcurrentHashMap<>());
-                            long nextOffset = offset + 1;
-                            offsets.compute(
-                                    bucket,
-                                    (k, v) -> v == null ? nextOffset : Math.max(v, nextOffset));
-                        });
-        return new LogInitContext(initContext, consumer);
-    }
-
-    @Override
-    public StatefulSinkWriter<SinkRecord, WriterStateT> restoreWriter(
-            InitContext initContext, Collection<WriterStateT> collection) throws IOException {
-        return ((StatefulSink<SinkRecord, WriterStateT>) sink)
-                .restoreWriter(wrapContext(initContext), collection);
-    }
-
-    @Override
-    public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
-        return ((StatefulSink<SinkRecord, WriterStateT>) sink).getWriterStateSerializer();
-    }
-
-    @Override
-    public Committer<CommT> createCommitter() throws IOException {
-        return ((TwoPhaseCommittingSink<SinkRecord, CommT>) sink).createCommitter();
-    }
-
-    @Override
-    public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
-        return ((TwoPhaseCommittingSink<SinkRecord, CommT>) sink).getCommittableSerializer();
-    }
-}