You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/01/20 03:23:21 UTC

[flink-table-store] branch master updated: [FLINK-25680] Introduce Table Store Flink Sink

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new beaf4ad  [FLINK-25680] Introduce Table Store Flink Sink
beaf4ad is described below

commit beaf4adb2b3aae44df3334c4a562a927afefd160
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jan 20 11:23:18 2022 +0800

    [FLINK-25680] Introduce Table Store Flink Sink
    
    This closes #10
---
 flink-table-store-connector/pom.xml                | 105 +++++++++
 .../store/connector/sink/LocalCommittable.java     |  71 +++++++
 .../connector/sink/LocalCommittableSerializer.java |  89 ++++++++
 .../table/store/connector/sink/SinkRecord.java     |  71 +++++++
 .../store/connector/sink/SinkRecordConverter.java  |  56 +++++
 .../store/connector/sink/StoreGlobalCommitter.java | 102 +++++++++
 .../table/store/connector/sink/StoreSink.java      | 140 ++++++++++++
 .../store/connector/sink/StoreSinkWriter.java      | 165 +++++++++++++++
 .../store/connector/utils/ProjectionUtils.java     |  63 ++++++
 .../sink/LocalCommittableSerializerTest.java       |  49 +++++
 .../table/store/connector/sink/StoreSinkTest.java  | 235 +++++++++++++++++++++
 .../table/store/connector/sink/TestFileStore.java  | 202 ++++++++++++++++++
 .../manifest/ManifestCommittableSerializer.java    |   5 +-
 .../table/store/file/mergetree/Increment.java      |  20 ++
 .../ManifestCommittableSerializerTest.java         |  20 +-
 pom.xml                                            |   1 +
 16 files changed, 1381 insertions(+), 13 deletions(-)

diff --git a/flink-table-store-connector/pom.xml b/flink-table-store-connector/pom.xml
new file mode 100644
index 0000000..896a793
--- /dev/null
+++ b/flink-table-store-connector/pom.xml
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-connector</artifactId>
+    <name>Flink Table Store : Connector</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- flink dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java
new file mode 100644
index 0000000..c6bbeca
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittable.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.mergetree.Increment;
+
+import java.util.Objects;
+
+/** Local committable for sink. */
+public class LocalCommittable {
+
+    private final BinaryRowData partition;
+
+    private final int bucket;
+
+    private final Increment increment;
+
+    public LocalCommittable(BinaryRowData partition, int bucket, Increment increment) {
+        this.partition = partition;
+        this.bucket = bucket;
+        this.increment = increment;
+    }
+
+    public BinaryRowData partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public Increment increment() {
+        return increment;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LocalCommittable that = (LocalCommittable) o;
+        return bucket == that.bucket
+                && Objects.equals(partition, that.partition)
+                && Objects.equals(increment, that.increment);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(partition, bucket, increment);
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java
new file mode 100644
index 0000000..a8c698d
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMetaSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link SimpleVersionedSerializer} for {@link LocalCommittable}. */
+public class LocalCommittableSerializer implements SimpleVersionedSerializer<LocalCommittable> {
+
+    private final BinaryRowDataSerializer partSerializer;
+    private final SstFileMetaSerializer sstSerializer;
+
+    public LocalCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
+        this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
+        this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(LocalCommittable obj) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+        partSerializer.serialize(obj.partition(), view);
+        view.writeInt(obj.bucket());
+        serializeFiles(view, obj.increment().newFiles());
+        serializeFiles(view, obj.increment().compactBefore());
+        serializeFiles(view, obj.increment().compactAfter());
+        return out.toByteArray();
+    }
+
+    private void serializeFiles(DataOutputViewStreamWrapper view, List<SstFileMeta> files)
+            throws IOException {
+        view.writeInt(files.size());
+        for (SstFileMeta file : files) {
+            sstSerializer.serialize(file, view);
+        }
+    }
+
+    private List<SstFileMeta> deserializeFiles(DataInputDeserializer view) throws IOException {
+        int fileNumber = view.readInt();
+        List<SstFileMeta> files = new ArrayList<>(fileNumber);
+        for (int i = 0; i < fileNumber; i++) {
+            files.add(sstSerializer.deserialize(view));
+        }
+        return files;
+    }
+
+    @Override
+    public LocalCommittable deserialize(int version, byte[] serialized) throws IOException {
+        DataInputDeserializer view = new DataInputDeserializer(serialized);
+        return new LocalCommittable(
+                partSerializer.deserialize(view),
+                view.readInt(),
+                new Increment(
+                        deserializeFiles(view), deserializeFiles(view), deserializeFiles(view)));
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecord.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecord.java
new file mode 100644
index 0000000..dae6e2f
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecord.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.types.RowKind;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** A sink records contains key, value and partition, bucket, row kind information. */
+public class SinkRecord {
+
+    private final BinaryRowData partition;
+
+    private final int bucket;
+
+    private final RowKind rowKind;
+
+    private final BinaryRowData key;
+
+    private final RowData row;
+
+    public SinkRecord(
+            BinaryRowData partition, int bucket, RowKind rowKind, BinaryRowData key, RowData row) {
+        checkArgument(partition.getRowKind() == RowKind.INSERT);
+        checkArgument(key.getRowKind() == RowKind.INSERT);
+        checkArgument(row.getRowKind() == RowKind.INSERT);
+        this.partition = partition;
+        this.bucket = bucket;
+        this.rowKind = rowKind;
+        this.key = key;
+        this.row = row;
+    }
+
+    public BinaryRowData partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public BinaryRowData key() {
+        return key;
+    }
+
+    public RowData row() {
+        return row;
+    }
+
+    public RowKind rowKind() {
+        return rowKind;
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecordConverter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecordConverter.java
new file mode 100644
index 0000000..b5302d6
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/SinkRecordConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.connector.utils.ProjectionUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+/** Converter for converting {@link RowData} to {@link SinkRecord}. */
+public class SinkRecordConverter {
+
+    private final int numBucket;
+
+    private final RowDataSerializer rowSerializer;
+
+    private final Projection<RowData, BinaryRowData> partProjection;
+
+    private final Projection<RowData, BinaryRowData> keyProjection;
+
+    public SinkRecordConverter(int numBucket, RowType inputType, int[] partitions, int[] keys) {
+        this.numBucket = numBucket;
+        this.rowSerializer = new RowDataSerializer(inputType);
+        this.partProjection = ProjectionUtils.newProjection(inputType, partitions);
+        this.keyProjection = ProjectionUtils.newProjection(inputType, keys);
+    }
+
+    public SinkRecord convert(RowData row) {
+        RowKind rowKind = row.getRowKind();
+        row.setRowKind(RowKind.INSERT);
+        BinaryRowData partition = partProjection.apply(row);
+        BinaryRowData key = keyProjection.apply(row);
+        int hash = key.getArity() == 0 ? rowSerializer.toBinaryRow(row).hashCode() : key.hashCode();
+        int bucket = Math.abs(hash % numBucket);
+        return new SinkRecord(partition, bucket, rowKind, key, row);
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
new file mode 100644
index 0000000..e5dde92
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreGlobalCommitter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreExpire;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** {@link GlobalCommitter} for dynamic store. */
+public class StoreGlobalCommitter
+        implements GlobalCommitter<LocalCommittable, ManifestCommittable> {
+
+    private final FileStoreCommit fileStoreCommit;
+
+    private final FileStoreExpire fileStoreExpire;
+
+    @Nullable private final CatalogLock lock;
+
+    @Nullable private final Map<String, String> overwritePartition;
+
+    public StoreGlobalCommitter(
+            FileStoreCommit fileStoreCommit,
+            FileStoreExpire fileStoreExpire,
+            @Nullable CatalogLock lock,
+            @Nullable Map<String, String> overwritePartition) {
+        this.fileStoreCommit = fileStoreCommit;
+        this.fileStoreExpire = fileStoreExpire;
+        this.lock = lock;
+        this.overwritePartition = overwritePartition;
+    }
+
+    @Override
+    public List<ManifestCommittable> filterRecoveredCommittables(
+            List<ManifestCommittable> globalCommittables) {
+        return fileStoreCommit.filterCommitted(globalCommittables);
+    }
+
+    @Override
+    public ManifestCommittable combine(List<LocalCommittable> committables) {
+        ManifestCommittable globalCommittable = new ManifestCommittable();
+        committables.forEach(
+                committable ->
+                        globalCommittable.add(
+                                committable.partition(),
+                                committable.bucket(),
+                                committable.increment()));
+        return globalCommittable;
+    }
+
+    @Override
+    public List<ManifestCommittable> commit(List<ManifestCommittable> globalCommittables) {
+        Map<String, String> properties = new HashMap<>();
+        if (overwritePartition == null) {
+            for (ManifestCommittable committable : globalCommittables) {
+                fileStoreCommit.commit(committable, properties);
+            }
+        } else {
+            checkArgument(
+                    globalCommittables.size() == 1, "overwrite is only supported in batch mode.");
+            fileStoreCommit.overwrite(overwritePartition, globalCommittables.get(0), properties);
+        }
+        fileStoreExpire.expire();
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void endOfInput() {}
+
+    @Override
+    public void close() throws Exception {
+        if (lock != null) {
+            lock.close();
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
new file mode 100644
index 0000000..123f073
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.store.connector.utils.ProjectionUtils.project;
+
+/** {@link Sink} of dynamic store. */
+public class StoreSink implements Sink<RowData, LocalCommittable, Void, ManifestCommittable> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final ObjectIdentifier tableIdentifier;
+
+    private final FileStore fileStore;
+
+    private final RowType rowType;
+
+    private final int[] partitions;
+
+    private final int[] keys;
+
+    private final int numBucket;
+
+    @Nullable private final CatalogLock.Factory lockFactory;
+
+    @Nullable private final Map<String, String> overwritePartition;
+
+    public StoreSink(
+            ObjectIdentifier tableIdentifier,
+            FileStore fileStore,
+            RowType rowType,
+            int[] partitions,
+            int[] keys,
+            int numBucket,
+            @Nullable CatalogLock.Factory lockFactory,
+            @Nullable Map<String, String> overwritePartition) {
+        this.tableIdentifier = tableIdentifier;
+        this.fileStore = fileStore;
+        this.rowType = rowType;
+        this.partitions = partitions;
+        this.keys = keys;
+        this.numBucket = numBucket;
+        this.lockFactory = lockFactory;
+        this.overwritePartition = overwritePartition;
+    }
+
+    @Override
+    public StoreSinkWriter createWriter(InitContext initContext, List<Void> list) {
+        SinkRecordConverter recordConverter =
+                new SinkRecordConverter(numBucket, rowType, partitions, keys);
+        return new StoreSinkWriter(
+                fileStore.newWrite(), recordConverter, overwritePartition != null);
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Committer<LocalCommittable>> createCommitter() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalCommitter<LocalCommittable, ManifestCommittable>>
+            createGlobalCommitter() {
+        FileStoreCommit commit = fileStore.newCommit();
+        CatalogLock lock;
+        if (lockFactory == null) {
+            lock = null;
+        } else {
+            lock = lockFactory.create();
+            commit.withLock(
+                    new Lock() {
+                        @Override
+                        public <T> T runWithLock(Callable<T> callable) throws Exception {
+                            return lock.runWithLock(
+                                    tableIdentifier.getDatabaseName(),
+                                    tableIdentifier.getObjectName(),
+                                    callable);
+                        }
+                    });
+        }
+        return Optional.of(
+                new StoreGlobalCommitter(commit, fileStore.newExpire(), lock, overwritePartition));
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<LocalCommittable>> getCommittableSerializer() {
+        return Optional.of(
+                new LocalCommittableSerializer(
+                        project(rowType, partitions), project(rowType, keys), rowType));
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<ManifestCommittable>>
+            getGlobalCommittableSerializer() {
+        return Optional.of(
+                new ManifestCommittableSerializer(
+                        project(rowType, partitions), project(rowType, keys), rowType));
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
new file mode 100644
index 0000000..453b245
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** A {@link SinkWriter} for dynamic store. */
+public class StoreSinkWriter implements SinkWriter<RowData, LocalCommittable, Void> {
+
+    private final FileStoreWrite fileStoreWrite;
+
+    private final SinkRecordConverter recordConverter;
+
+    private final boolean overwrite;
+
+    private final ExecutorService compactExecutor;
+
+    private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
+
+    public StoreSinkWriter(
+            FileStoreWrite fileStoreWrite, SinkRecordConverter recordConverter, boolean overwrite) {
+        this.fileStoreWrite = fileStoreWrite;
+        this.recordConverter = recordConverter;
+        this.overwrite = overwrite;
+        this.compactExecutor = Executors.newSingleThreadScheduledExecutor();
+        this.writers = new HashMap<>();
+    }
+
+    private RecordWriter getWriter(BinaryRowData partition, int bucket) {
+        Map<Integer, RecordWriter> buckets = writers.get(partition);
+        if (buckets == null) {
+            buckets = new HashMap<>();
+            writers.put(partition.copy(), buckets);
+        }
+        return buckets.computeIfAbsent(
+                bucket,
+                k ->
+                        overwrite
+                                ? fileStoreWrite.createEmptyWriter(
+                                        partition, bucket, compactExecutor)
+                                : fileStoreWrite.createWriter(partition, bucket, compactExecutor));
+    }
+
+    @Override
+    public void write(RowData rowData, Context context) throws IOException {
+        RowKind rowKind = rowData.getRowKind();
+        SinkRecord record = recordConverter.convert(rowData);
+        RecordWriter writer = getWriter(record.partition(), record.bucket());
+        try {
+            writeToFileStore(writer, record);
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+        rowData.setRowKind(rowKind);
+    }
+
+    private void writeToFileStore(RecordWriter writer, SinkRecord record) throws Exception {
+        switch (record.rowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                if (record.key().getArity() == 0) {
+                    writer.write(ValueKind.ADD, record.row(), GenericRowData.of(1));
+                } else {
+                    writer.write(ValueKind.ADD, record.key(), record.row());
+                }
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                if (record.key().getArity() == 0) {
+                    writer.write(ValueKind.ADD, record.row(), GenericRowData.of(-1));
+                } else {
+                    writer.write(ValueKind.DELETE, record.key(), record.row());
+                }
+                break;
+        }
+    }
+
+    @Override
+    public List<LocalCommittable> prepareCommit(boolean flush) throws IOException {
+        try {
+            return prepareCommit();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    private List<LocalCommittable> prepareCommit() throws Exception {
+        List<LocalCommittable> committables = new ArrayList<>();
+        for (BinaryRowData partition : writers.keySet()) {
+            Map<Integer, RecordWriter> buckets = writers.get(partition);
+            for (Integer bucket : buckets.keySet()) {
+                RecordWriter writer = buckets.get(bucket);
+                LocalCommittable committable =
+                        new LocalCommittable(partition, bucket, writer.prepareCommit());
+                committables.add(committable);
+
+                // clear if no update
+                // we need a mechanism to clear writers, otherwise there will be more and more
+                // such as yesterday's partition that no longer needs to be written.
+                if (committable.increment().newFiles().isEmpty()) {
+                    closeWriter(writer);
+                    buckets.remove(bucket);
+                }
+            }
+
+            if (buckets.isEmpty()) {
+                writers.remove(partition);
+            }
+        }
+        return committables;
+    }
+
+    private void closeWriter(RecordWriter writer) throws Exception {
+        writer.sync();
+        writer.close();
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.compactExecutor.shutdownNow();
+        for (Map<Integer, RecordWriter> bucketWriters : writers.values()) {
+            for (RecordWriter writer : bucketWriters.values()) {
+                closeWriter(writer);
+            }
+        }
+        writers.clear();
+    }
+
+    @VisibleForTesting
+    Map<BinaryRowData, Map<Integer, RecordWriter>> writers() {
+        return writers;
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/utils/ProjectionUtils.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/utils/ProjectionUtils.java
new file mode 100644
index 0000000..5c135b8
--- /dev/null
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/utils/ProjectionUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.utils;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.runtime.generated.Projection;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Utils for {@link Projection}. */
+public class ProjectionUtils {
+
+    public static final Projection<RowData, BinaryRowData> EMPTY_PROJECTION =
+            input -> BinaryRowDataUtil.EMPTY_ROW;
+
+    public static RowType project(RowType inputType, int[] mapping) {
+        List<RowType.RowField> fields = inputType.getFields();
+        return new RowType(
+                Arrays.stream(mapping).mapToObj(fields::get).collect(Collectors.toList()));
+    }
+
+    public static Projection<RowData, BinaryRowData> newProjection(
+            RowType inputType, int[] mapping) {
+        if (mapping.length == 0) {
+            return EMPTY_PROJECTION;
+        }
+
+        @SuppressWarnings("unchecked")
+        Projection<RowData, BinaryRowData> projection =
+                ProjectionCodeGenerator.generateProjection(
+                                CodeGeneratorContext.apply(new TableConfig()),
+                                "Projection",
+                                inputType,
+                                project(inputType, mapping),
+                                mapping)
+                        .newInstance(Thread.currentThread().getContextClassLoader());
+        return projection;
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java
new file mode 100644
index 0000000..6fd2ac3
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LocalCommittableSerializerTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest.randomIncrement;
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link LocalCommittableSerializer}. */
+public class LocalCommittableSerializerTest {
+
+    @Test
+    public void test() throws IOException {
+        LocalCommittableSerializer serializer =
+                new LocalCommittableSerializer(
+                        RowType.of(new IntType()),
+                        RowType.of(new IntType()),
+                        RowType.of(new IntType()));
+        Increment increment = randomIncrement();
+        LocalCommittable committable = new LocalCommittable(row(0), 1, increment);
+        LocalCommittable newCommittable =
+                serializer.deserialize(1, serializer.serialize(committable));
+        assertThat(newCommittable).isEqualTo(committable);
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
new file mode 100644
index 0000000..e69ec17
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.catalog.CatalogLock;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link StoreSink}. */
+public class StoreSinkTest {
+
+    private final ObjectIdentifier identifier =
+            ObjectIdentifier.of("my_catalog", "my_database", "my_table");
+
+    private final TestFileStore fileStore = new TestFileStore();
+
+    private final TestLock lock = new TestLock();
+
+    private final RowType rowType = RowType.of(new IntType(), new IntType(), new IntType());
+
+    @Test
+    public void testChangelogs() throws Exception {
+        StoreSink sink = newSink(null);
+        writeAndCommit(
+                sink,
+                GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
+                GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 0, 2, 3),
+                GenericRowData.ofKind(RowKind.UPDATE_AFTER, 0, 7, 5),
+                GenericRowData.ofKind(RowKind.DELETE, 1, 0, 1));
+        assertThat(fileStore.committedFiles.get(row(1)).get(1))
+                .isEqualTo(Collections.singletonList("DELETE-key-0-value-1/0/1"));
+        assertThat(fileStore.committedFiles.get(row(0)).get(0))
+                .isEqualTo(Collections.singletonList("DELETE-key-2-value-0/2/3"));
+        assertThat(fileStore.committedFiles.get(row(0)).get(1))
+                .isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
+    }
+
+    @Test
+    public void testNoKeyChangelogs() throws Exception {
+        StoreSink sink =
+                new StoreSink(
+                        identifier,
+                        fileStore,
+                        rowType,
+                        new int[] {0},
+                        new int[] {},
+                        2,
+                        () -> lock,
+                        new HashMap<>());
+        writeAndCommit(
+                sink,
+                GenericRowData.ofKind(RowKind.INSERT, 0, 0, 1),
+                GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 0, 2, 3),
+                GenericRowData.ofKind(RowKind.UPDATE_AFTER, 0, 4, 5),
+                GenericRowData.ofKind(RowKind.DELETE, 1, 0, 1));
+        assertThat(fileStore.committedFiles.get(row(1)).get(0))
+                .isEqualTo(Collections.singletonList("ADD-key-1/0/1-value--1"));
+        assertThat(fileStore.committedFiles.get(row(0)).get(0))
+                .isEqualTo(Collections.singletonList("ADD-key-0/4/5-value-1"));
+        assertThat(fileStore.committedFiles.get(row(0)).get(1))
+                .isEqualTo(Arrays.asList("ADD-key-0/0/1-value-1", "ADD-key-0/2/3-value--1"));
+    }
+
+    @Test
+    public void testAppend() throws Exception {
+        StoreSink sink = newSink(null);
+        writeAndAssert(sink);
+
+        writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
+        assertThat(fileStore.committedFiles.get(row(1)).get(0))
+                .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+        assertThat(fileStore.committedFiles.get(row(0)).get(0))
+                .isEqualTo(Arrays.asList("ADD-key-2-value-0/2/3", "ADD-key-8-value-0/8/9"));
+    }
+
+    @Test
+    public void testOverwrite() throws Exception {
+        StoreSink sink = newSink(new HashMap<>());
+        writeAndAssert(sink);
+
+        writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
+        assertThat(fileStore.committedFiles.get(row(1)).get(1)).isNull();
+        assertThat(fileStore.committedFiles.get(row(1)).get(0))
+                .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+        assertThat(fileStore.committedFiles.get(row(0)).get(0))
+                .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
+    }
+
+    @Test
+    public void testOverwritePartition() throws Exception {
+        HashMap<String, String> partition = new HashMap<>();
+        partition.put("part", "0");
+        StoreSink sink = newSink(partition);
+        writeAndAssert(sink);
+
+        writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
+        assertThat(fileStore.committedFiles.get(row(1)).get(1))
+                .isEqualTo(Collections.singletonList("ADD-key-0-value-1/0/1"));
+        assertThat(fileStore.committedFiles.get(row(1)).get(0))
+                .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+        assertThat(fileStore.committedFiles.get(row(0)).get(0))
+                .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
+    }
+
+    private void writeAndAssert(StoreSink sink) throws Exception {
+        writeAndCommit(
+                sink,
+                GenericRowData.of(0, 0, 1),
+                GenericRowData.of(0, 2, 3),
+                GenericRowData.of(0, 7, 5),
+                GenericRowData.of(1, 0, 1));
+        assertThat(fileStore.committedFiles.get(row(1)).get(1))
+                .isEqualTo(Collections.singletonList("ADD-key-0-value-1/0/1"));
+        assertThat(fileStore.committedFiles.get(row(0)).get(0))
+                .isEqualTo(Collections.singletonList("ADD-key-2-value-0/2/3"));
+        assertThat(fileStore.committedFiles.get(row(0)).get(1))
+                .isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
+    }
+
+    private void writeAndCommit(StoreSink sink, RowData... rows) throws Exception {
+        commit(sink, write(sink, rows));
+    }
+
+    private List<LocalCommittable> write(StoreSink sink, RowData... rows) throws Exception {
+        StoreSinkWriter writer = sink.createWriter(null, null);
+        for (RowData row : rows) {
+            writer.write(row, null);
+        }
+
+        List<LocalCommittable> localCommittables = writer.prepareCommit(true);
+        Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>(writer.writers());
+        assertThat(writers.size()).isGreaterThan(0);
+
+        writer.close();
+        writers.forEach(
+                (part, map) ->
+                        map.forEach(
+                                (bucket, recordWriter) -> {
+                                    TestRecordWriter testWriter = (TestRecordWriter) recordWriter;
+                                    assertThat(testWriter.synced).isTrue();
+                                    assertThat(testWriter.closed).isTrue();
+                                }));
+        return localCommittables;
+    }
+
+    private void commit(StoreSink sink, List<LocalCommittable> localCommittables) throws Exception {
+        StoreGlobalCommitter committer = (StoreGlobalCommitter) sink.createGlobalCommitter().get();
+        ManifestCommittable committable = committer.combine(localCommittables);
+
+        fileStore.expired = false;
+        lock.locked = false;
+        committer.commit(Collections.singletonList(committable));
+        assertThat(fileStore.expired).isTrue();
+        assertThat(lock.locked).isTrue();
+
+        assertThat(
+                        committer
+                                .filterRecoveredCommittables(Collections.singletonList(committable))
+                                .size())
+                .isEqualTo(0);
+
+        lock.closed = false;
+        committer.close();
+        assertThat(lock.closed).isTrue();
+    }
+
+    private StoreSink newSink(Map<String, String> overwritePartition) {
+        return new StoreSink(
+                identifier,
+                fileStore,
+                rowType,
+                new int[] {0},
+                new int[] {1},
+                2,
+                () -> lock,
+                overwritePartition);
+    }
+
+    private class TestLock implements CatalogLock {
+
+        private boolean locked = false;
+
+        private boolean closed = false;
+
+        @Override
+        public <T> T runWithLock(String database, String table, Callable<T> callable)
+                throws Exception {
+            assertThat(database).isEqualTo(identifier.getDatabaseName());
+            assertThat(table).isEqualTo(identifier.getObjectName());
+            locked = true;
+            return callable.call();
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+        }
+    }
+}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
new file mode 100644
index 0000000..febe35d
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.operation.FileStoreCommit;
+import org.apache.flink.table.store.file.operation.FileStoreExpire;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.file.utils.RecordWriter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+
+/** Test {@link FileStore}. */
+public class TestFileStore implements FileStore {
+
+    public final Set<ManifestCommittable> committed = new HashSet<>();
+
+    public final Map<BinaryRowData, Map<Integer, List<String>>> committedFiles = new HashMap<>();
+
+    public boolean expired = false;
+
+    @Override
+    public FileStoreWrite newWrite() {
+        return new FileStoreWrite() {
+            @Override
+            public RecordWriter createWriter(
+                    BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+                TestRecordWriter writer = new TestRecordWriter();
+                writer.records.addAll(
+                        committedFiles
+                                .computeIfAbsent(partition, k -> new HashMap<>())
+                                .computeIfAbsent(bucket, k -> new ArrayList<>()));
+                committedFiles.get(partition).remove(bucket);
+                return writer;
+            }
+
+            @Override
+            public RecordWriter createEmptyWriter(
+                    BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+                return new TestRecordWriter();
+            }
+        };
+    }
+
+    @Override
+    public FileStoreCommit newCommit() {
+        return new TestCommit();
+    }
+
+    @Override
+    public FileStoreExpire newExpire() {
+        return () -> expired = true;
+    }
+
+    @Override
+    public FileStoreScan newScan() {
+        throw new UnsupportedOperationException();
+    }
+
+    static class TestRecordWriter implements RecordWriter {
+
+        final List<String> records = new ArrayList<>();
+
+        boolean synced = false;
+
+        boolean closed = false;
+
+        private String rowToString(RowData row) {
+            StringBuilder builder = new StringBuilder();
+            for (int i = 0; i < row.getArity(); i++) {
+                if (i != 0) {
+                    builder.append("/");
+                }
+                builder.append(row.getInt(i));
+            }
+            return builder.toString();
+        }
+
+        @Override
+        public void write(ValueKind valueKind, RowData key, RowData value) {
+            records.add(
+                    valueKind.toString()
+                            + "-key-"
+                            + rowToString(key)
+                            + "-value-"
+                            + rowToString(value));
+        }
+
+        @Override
+        public Increment prepareCommit() {
+            List<SstFileMeta> newFiles =
+                    records.stream()
+                            .map(s -> new SstFileMeta(s, 0, 0, null, null, null, 0, 0, 0))
+                            .collect(Collectors.toList());
+            return new Increment(newFiles, Collections.emptyList(), Collections.emptyList());
+        }
+
+        @Override
+        public void sync() {
+            synced = true;
+        }
+
+        @Override
+        public List<SstFileMeta> close() {
+            closed = true;
+            return Collections.emptyList();
+        }
+    }
+
+    class TestCommit implements FileStoreCommit {
+
+        Lock lock;
+
+        @Override
+        public FileStoreCommit withLock(Lock lock) {
+            this.lock = lock;
+            return this;
+        }
+
+        @Override
+        public List<ManifestCommittable> filterCommitted(
+                List<ManifestCommittable> committableList) {
+            return committableList.stream()
+                    .filter(c -> !committed.contains(c))
+                    .collect(Collectors.toList());
+        }
+
+        @Override
+        public void commit(ManifestCommittable committable, Map<String, String> properties) {
+            try {
+                lock.runWithLock(() -> committed.add(committable));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            committable
+                    .newFiles()
+                    .forEach(
+                            (part, bMap) ->
+                                    bMap.forEach(
+                                            (bucket, files) -> {
+                                                List<String> committed =
+                                                        committedFiles
+                                                                .computeIfAbsent(
+                                                                        part, k -> new HashMap<>())
+                                                                .computeIfAbsent(
+                                                                        bucket,
+                                                                        k -> new ArrayList<>());
+                                                files.stream()
+                                                        .map(SstFileMeta::fileName)
+                                                        .forEach(committed::add);
+                                            }));
+        }
+
+        @Override
+        public void overwrite(
+                Map<String, String> partition,
+                ManifestCommittable committable,
+                Map<String, String> properties) {
+            if (partition.isEmpty()) {
+                committedFiles.clear();
+            } else {
+                BinaryRowData partRow = row(Integer.parseInt(partition.get("part")));
+                committedFiles.remove(partRow);
+            }
+            commit(committable, properties);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index c999fc0..65e18b4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -41,10 +41,9 @@ public class ManifestCommittableSerializer
     private final BinaryRowDataSerializer partSerializer;
     private final SstFileMetaSerializer sstSerializer;
 
-    public ManifestCommittableSerializer(
-            RowType partitionType, SstFileMetaSerializer sstSerializer) {
+    public ManifestCommittableSerializer(RowType partitionType, RowType keyType, RowType rowType) {
         this.partSerializer = new BinaryRowDataSerializer(partitionType.getFieldCount());
-        this.sstSerializer = sstSerializer;
+        this.sstSerializer = new SstFileMetaSerializer(keyType, rowType);
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
index c8afffe..0e29652 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Incremental files for merge tree. It consists of two parts:
@@ -60,4 +61,23 @@ public class Increment {
     public List<SstFileMeta> compactAfter() {
         return compactAfter;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Increment increment = (Increment) o;
+        return Objects.equals(newFiles, increment.newFiles)
+                && Objects.equals(compactBefore, increment.compactBefore)
+                && Objects.equals(compactAfter, increment.compactAfter);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(newFiles, compactBefore, compactAfter);
+    }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 2fe3241..4ebbf85 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.manifest;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
-import org.apache.flink.table.store.file.mergetree.sst.SstFileMetaSerializer;
 import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -38,14 +37,15 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test for {@link ManifestCommittableSerializer}. */
 public class ManifestCommittableSerializerTest {
 
-    private final AtomicInteger id = new AtomicInteger();
+    private static final AtomicInteger ID = new AtomicInteger();
 
     @Test
     public void testCommittableSerDe() throws IOException {
-        SstFileMetaSerializer sstSerializer =
-                new SstFileMetaSerializer(RowType.of(new IntType()), RowType.of(new IntType()));
         ManifestCommittableSerializer serializer =
-                new ManifestCommittableSerializer(RowType.of(new IntType()), sstSerializer);
+                new ManifestCommittableSerializer(
+                        RowType.of(new IntType()),
+                        RowType.of(new IntType()),
+                        RowType.of(new IntType()));
         ManifestCommittable committable = new ManifestCommittable();
         addAndAssert(committable, row(0), 0);
         addAndAssert(committable, row(0), 1);
@@ -57,7 +57,7 @@ public class ManifestCommittableSerializerTest {
 
     private void addAndAssert(
             ManifestCommittable committable, BinaryRowData partition, int bucket) {
-        Increment increment = newIncrement();
+        Increment increment = randomIncrement();
         committable.add(partition, bucket, increment);
         assertThat(committable.newFiles().get(partition).get(bucket))
                 .isEqualTo(increment.newFiles());
@@ -67,11 +67,11 @@ public class ManifestCommittableSerializerTest {
                 .isEqualTo(increment.compactAfter());
     }
 
-    private Increment newIncrement() {
+    public static Increment randomIncrement() {
         return new Increment(
-                Arrays.asList(newFile(id.incrementAndGet(), 0), newFile(id.incrementAndGet(), 0)),
-                Arrays.asList(newFile(id.incrementAndGet(), 0), newFile(id.incrementAndGet(), 0)),
-                Arrays.asList(newFile(id.incrementAndGet(), 0), newFile(id.incrementAndGet(), 0)));
+                Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
+                Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)),
+                Arrays.asList(newFile(ID.incrementAndGet(), 0), newFile(ID.incrementAndGet(), 0)));
     }
 
     public static SstFileMeta newFile(int name, int level) {
diff --git a/pom.xml b/pom.xml
index 5443b52..5046318 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,7 @@ under the License.
 
     <modules>
         <module>flink-table-store-core</module>
+        <module>flink-table-store-connector</module>
     </modules>
 
     <properties>