Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/15 10:44:08 UTC

[flink-table-store] branch master updated: [FLINK-27957] Extract AppendOnlyFileStore out of KeyValueFileStore

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a3e52f85 [FLINK-27957] Extract AppendOnlyFileStore out of KeyValueFileStore
a3e52f85 is described below

commit a3e52f85900758f6c6f4a751c1ac14d012e833a8
Author: tsreaper <ts...@gmail.com>
AuthorDate: Wed Jun 15 18:44:03 2022 +0800

    [FLINK-27957] Extract AppendOnlyFileStore out of KeyValueFileStore
    
    This closes #148
---
 .../store/connector/sink/StoreSinkCompactor.java   |   6 +-
 .../store/connector/sink/StoreSinkWriter.java      |   6 +-
 .../table/store/connector/sink/StoreSinkTest.java  |  13 +-
 .../table/store/connector/sink/TestFileStore.java  |  44 ++-
 .../store/connector/sink/TestFileStoreTable.java   |  19 +-
 .../source/FileStoreSourceSplitReaderTest.java     |  17 +-
 .../source/TestChangelogDataReadWrite.java         |  28 +-
 .../flink/table/store/file/AbstractFileStore.java  | 122 +++++++++
 .../table/store/file/AppendOnlyFileStore.java      |  72 +++++
 .../apache/flink/table/store/file/FileStore.java   |  24 +-
 .../flink/table/store/file/FileStoreImpl.java      | 295 ---------------------
 .../apache/flink/table/store/file/KeyValue.java    |   4 +
 .../flink/table/store/file/KeyValueFileStore.java  |  96 +++++++
 .../table/store/file/data/AppendOnlyReader.java    |  78 ++++++
 .../file/{writer => data}/AppendOnlyWriter.java    |  27 +-
 .../store/file/mergetree/MergeTreeReader.java      |   4 +-
 .../store/file/mergetree/MergeTreeWriter.java      |   9 +-
 .../file/mergetree/compact/ConcatRecordReader.java |  21 +-
 ...oreScanImpl.java => AbstractFileStoreScan.java} |  38 +--
 .../file/operation/AbstractFileStoreWrite.java     |  66 +++++
 .../file/operation/AppendOnlyFileStoreRead.java    |  88 ++++++
 .../file/operation/AppendOnlyFileStoreScan.java    |  64 +++++
 .../file/operation/AppendOnlyFileStoreWrite.java   |  89 +++++++
 .../table/store/file/operation/FileStoreRead.java  |  35 +--
 .../table/store/file/operation/FileStoreScan.java  |   4 -
 .../table/store/file/operation/FileStoreWrite.java |  15 +-
 ...oreReadImpl.java => KeyValueFileStoreRead.java} |  73 ++---
 .../file/operation/KeyValueFileStoreScan.java      |  63 +++++
 ...eWriteImpl.java => KeyValueFileStoreWrite.java} |  88 +-----
 .../table/store/file/writer/CompactWriter.java     |   7 +-
 .../table/store/file/writer/RecordWriter.java      |   8 +-
 .../store/table/AppendOnlyFileStoreTable.java      |  49 ++--
 .../table/ChangelogValueCountFileStoreTable.java   |  27 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |  27 +-
 .../{TableWrite.java => AbstractTableWrite.java}   |  45 ++--
 .../flink/table/store/table/sink/TableWrite.java   | 124 +--------
 .../{TableRead.java => KeyValueTableRead.java}     |  26 +-
 .../flink/table/store/table/source/TableRead.java  |  47 +---
 .../flink/table/store/table/source/TableScan.java  |   2 +-
 .../flink/table/store/file/TestFileStore.java      |  22 +-
 .../{writer => data}/AppendOnlyWriterTest.java     |  23 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |   5 +-
 ...eadTest.java => KeyValueFileStoreReadTest.java} |   6 +-
 ...canTest.java => KeyValueFileStoreScanTest.java} |  27 +-
 .../store/file/operation/TestCommitThread.java     |   6 +-
 .../store/table/AppendOnlyFileStoreTableTest.java  |  19 +-
 .../ChangelogValueCountFileStoreTableTest.java     |  23 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  22 +-
 .../table/store/format/FileFormatSuffixTest.java   |   6 +-
 49 files changed, 1100 insertions(+), 929 deletions(-)
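
The shape of the refactoring, for readers skimming the diff: the old monolithic FileStoreImpl is split into an AbstractFileStore<T> base class plus two concrete stores, and FileStore, FileStoreRead, FileStoreWrite and RecordWriter become generic over the record type T (RowData for append-only tables, KeyValue for changelog tables). A minimal construction sketch based on the constructors added below; schemaManager, schemaId, options, user and the various RowType variables are placeholders for illustration, not values from this commit:

    // Append-only store: records are plain rows.
    FileStore<RowData> appendOnlyStore =
            new AppendOnlyFileStore(
                    schemaManager, schemaId, options, user, partitionType, rowType);

    // Key-value store: records are KeyValue pairs combined by a MergeFunction.
    FileStore<KeyValue> keyValueStore =
            new KeyValueFileStore(
                    schemaManager, schemaId, options, user,
                    partitionType, keyType, valueType, new DeduplicateMergeFunction());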

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
index d5fcf374..a0f7e1a3 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
@@ -53,14 +53,14 @@ public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void>
     private final int subTaskId;
     private final int numOfParallelInstances;
 
-    private final FileStore fileStore;
+    private final FileStore<?> fileStore;
     private final Map<String, String> partitionSpec;
     private final ExecutorService compactExecutor;
 
     public StoreSinkCompactor(
             int subTaskId,
             int numOfParallelInstances,
-            FileStore fileStore,
+            FileStore<?> fileStore,
             Map<String, String> partitionSpec) {
         this.subTaskId = subTaskId;
         this.numOfParallelInstances = numOfParallelInstances;
@@ -120,7 +120,7 @@ public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void>
                                 bucket,
                                 subTaskId);
                     }
-                    RecordWriter writer =
+                    RecordWriter<?> writer =
                             fileStore
                                     .newWrite()
                                     .createCompactWriter(
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 8229ea55..c3c6a7db 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.log.LogWriteCallback;
+import org.apache.flink.table.store.table.sink.AbstractTableWrite;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
@@ -143,8 +144,9 @@ public class StoreSinkWriter<WriterStateT>
         }
     }
 
+    @SuppressWarnings("unchecked")
     @VisibleForTesting
-    Map<BinaryRowData, Map<Integer, RecordWriter>> writers() {
-        return write.writers();
+    Map<BinaryRowData, Map<Integer, RecordWriter<?>>> writers() {
+        return ((AbstractTableWrite) write).writers();
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index 2a6f03ea..edd19a5a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -31,12 +31,12 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
 import org.apache.flink.table.store.connector.sink.TestFileStore.TestRecordWriter;
+import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
 import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -105,15 +105,8 @@ public class StoreSinkTest {
                                         new HashMap<>(),
                                         ""));
 
-        RowType keyType = hasPk ? schema.logicalTrimmedPrimaryKeysType() : schema.logicalRowType();
-        RowType valueType =
-                hasPk
-                        ? schema.logicalRowType()
-                        : new RowType(
-                                Collections.singletonList(
-                                        new RowType.RowField("COUNT", new BigIntType(false))));
         RowType partitionType = schema.logicalPartitionType();
-        fileStore = new TestFileStore(hasPk, keyType, valueType, partitionType);
+        fileStore = new TestFileStore(hasPk, partitionType);
         table = new TestFileStoreTable(fileStore, schema);
     }
 
@@ -241,7 +234,7 @@ public class StoreSinkTest {
         }
 
         List<Committable> committables = ((StoreSinkWriter) writer).prepareCommit();
-        Map<BinaryRowData, Map<Integer, RecordWriter>> writers =
+        Map<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> writers =
                 new HashMap<>(((StoreSinkWriter) writer).writers());
         assertThat(writers.size()).isGreaterThan(0);
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 639d5294..109c692c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.store.connector.sink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.mergetree.Increment;
@@ -51,31 +51,27 @@ import static org.apache.flink.table.store.file.mergetree.compact.CompactManager
 import static org.apache.flink.table.store.file.stats.StatsTestUtils.newEmptyTableStats;
 
 /** Test {@link FileStore}. */
-public class TestFileStore implements FileStore {
+public class TestFileStore implements FileStore<KeyValue> {
 
     public final Set<ManifestCommittable> committed = new HashSet<>();
 
     public final Map<BinaryRowData, Map<Integer, List<String>>> committedFiles = new HashMap<>();
 
     public final boolean hasPk;
-    private final RowType keyType;
-    private final RowType valueType;
     private final RowType partitionType;
 
     public boolean expired = false;
 
-    public TestFileStore(boolean hasPk, RowType keyType, RowType valueType, RowType partitionType) {
+    public TestFileStore(boolean hasPk, RowType partitionType) {
         this.hasPk = hasPk;
-        this.keyType = keyType;
-        this.valueType = valueType;
         this.partitionType = partitionType;
     }
 
     @Override
-    public FileStoreWrite newWrite() {
-        return new FileStoreWrite() {
+    public FileStoreWrite<KeyValue> newWrite() {
+        return new FileStoreWrite<KeyValue>() {
             @Override
-            public RecordWriter createWriter(
+            public RecordWriter<KeyValue> createWriter(
                     BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
                 TestRecordWriter writer = new TestRecordWriter(hasPk);
                 writer.records.addAll(
@@ -87,7 +83,7 @@ public class TestFileStore implements FileStore {
             }
 
             @Override
-            public RecordWriter createEmptyWriter(
+            public RecordWriter<KeyValue> createEmptyWriter(
                     BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
                 return new TestRecordWriter(hasPk);
             }
@@ -104,7 +100,7 @@ public class TestFileStore implements FileStore {
     }
 
     @Override
-    public FileStoreRead newRead() {
+    public FileStoreRead<KeyValue> newRead() {
         throw new UnsupportedOperationException();
     }
 
@@ -128,16 +124,6 @@ public class TestFileStore implements FileStore {
         };
     }
 
-    @Override
-    public RowType keyType() {
-        return keyType;
-    }
-
-    @Override
-    public RowType valueType() {
-        return valueType;
-    }
-
     @Override
     public RowType partitionType() {
         return partitionType;
@@ -153,7 +139,7 @@ public class TestFileStore implements FileStore {
         throw new UnsupportedOperationException();
     }
 
-    static class TestRecordWriter implements RecordWriter {
+    static class TestRecordWriter implements RecordWriter<KeyValue> {
 
         final List<String> records = new ArrayList<>();
         final boolean hasPk;
@@ -186,17 +172,17 @@ public class TestFileStore implements FileStore {
         }
 
         @Override
-        public void write(ValueKind valueKind, RowData key, RowData value) {
+        public void write(KeyValue kv) {
             if (!hasPk) {
-                assert value.getArity() == 1;
-                assert value.getLong(0) >= -1L;
+                assert kv.value().getArity() == 1;
+                assert kv.value().getLong(0) >= -1L;
             }
             records.add(
-                    valueKind.toString()
+                    kv.valueKind().toString()
                             + "-key-"
-                            + rowToString(key, true)
+                            + rowToString(kv.key(), true)
                             + "-value-"
-                            + rowToString(value, false));
+                            + rowToString(kv.value(), false));
         }
 
         @Override
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
index 6a870f1f..49a387ac 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
@@ -19,12 +19,13 @@
 package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.store.file.FileStore;
+import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.sink.AbstractTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableCommit;
@@ -71,22 +72,24 @@ public class TestFileStoreTable implements FileStoreTable {
 
     @Override
     public TableWrite newWrite() {
-        return new TableWrite(store.newWrite(), new SinkRecordConverter(2, schema)) {
+        return new AbstractTableWrite<KeyValue>(
+                store.newWrite(), new SinkRecordConverter(2, schema)) {
             @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+            protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
                     throws Exception {
                 boolean isInsert =
                         record.row().getRowKind() == RowKind.INSERT
                                 || record.row().getRowKind() == RowKind.UPDATE_AFTER;
+                KeyValue kv = new KeyValue();
                 if (store.hasPk) {
-                    writer.write(
-                            isInsert ? ValueKind.ADD : ValueKind.DELETE,
+                    kv.replace(
                             record.primaryKey(),
+                            isInsert ? ValueKind.ADD : ValueKind.DELETE,
                             record.row());
                 } else {
-                    writer.write(
-                            ValueKind.ADD, record.row(), GenericRowData.of(isInsert ? 1L : -1L));
+                    kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(isInsert ? 1L : -1L));
                 }
+                writer.write(kv);
             }
         };
     }
@@ -97,7 +100,7 @@ public class TestFileStoreTable implements FileStoreTable {
     }
 
     @Override
-    public FileStore store() {
+    public TestFileStore store() {
         return store;
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index cfe688bc..9e3e17ae 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.file.src.util.RecordAndPosition;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.writer.RecordWriter;
@@ -135,11 +136,21 @@ public class FileStoreSourceSplitReaderTest {
                 new FileStoreSourceSplitReader(rw.createReadWithKey().withIncremental(true));
 
         List<Tuple2<Long, Long>> input = kvs();
-        RecordWriter writer = rw.createMergeTreeWriter(row(1), 0);
+        RecordWriter<KeyValue> writer = rw.createMergeTreeWriter(row(1), 0);
         for (Tuple2<Long, Long> tuple2 : input) {
-            writer.write(ValueKind.ADD, GenericRowData.of(tuple2.f0), GenericRowData.of(tuple2.f1));
+            writer.write(
+                    new KeyValue()
+                            .replace(
+                                    GenericRowData.of(tuple2.f0),
+                                    ValueKind.ADD,
+                                    GenericRowData.of(tuple2.f1)));
         }
-        writer.write(ValueKind.DELETE, GenericRowData.of(222L), GenericRowData.of(333L));
+        writer.write(
+                new KeyValue()
+                        .replace(
+                                GenericRowData.of(222L),
+                                ValueKind.DELETE,
+                                GenericRowData.of(333L)));
         List<DataFileMeta> files = writer.prepareCommit().newFiles();
         writer.close();
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 4aa5f332..d4d26f50 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -27,19 +27,18 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
-import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
-import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
 import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
@@ -100,18 +99,17 @@ public class TestChangelogDataReadWrite {
     private TableRead createRead(
             Function<RecordReader.RecordIterator<KeyValue>, RecordReader.RecordIterator<RowData>>
                     rowDataIteratorCreator) {
-        FileStoreRead read =
-                new FileStoreReadImpl(
+        KeyValueFileStoreRead read =
+                new KeyValueFileStoreRead(
                         new SchemaManager(tablePath),
                         0,
-                        WriteMode.CHANGE_LOG,
                         KEY_TYPE,
                         VALUE_TYPE,
                         COMPARATOR,
                         new DeduplicateMergeFunction(),
                         avro,
                         pathFactory);
-        return new TableRead(read) {
+        return new KeyValueTableRead(read) {
             @Override
             public TableRead withProjection(int[][] projection) {
                 throw new UnsupportedOperationException();
@@ -135,19 +133,23 @@ public class TestChangelogDataReadWrite {
             BinaryRowData partition, int bucket, List<Tuple2<Long, Long>> kvs) throws Exception {
         Preconditions.checkNotNull(
                 service, "ExecutorService must be provided if writeFiles is needed");
-        RecordWriter writer = createMergeTreeWriter(partition, bucket);
+        RecordWriter<KeyValue> writer = createMergeTreeWriter(partition, bucket);
         for (Tuple2<Long, Long> tuple2 : kvs) {
-            writer.write(ValueKind.ADD, GenericRowData.of(tuple2.f0), GenericRowData.of(tuple2.f1));
+            writer.write(
+                    new KeyValue()
+                            .replace(
+                                    GenericRowData.of(tuple2.f0),
+                                    ValueKind.ADD,
+                                    GenericRowData.of(tuple2.f1)));
         }
         List<DataFileMeta> files = writer.prepareCommit().newFiles();
         writer.close();
         return new ArrayList<>(files);
     }
 
-    public RecordWriter createMergeTreeWriter(BinaryRowData partition, int bucket) {
+    public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRowData partition, int bucket) {
         MergeTreeOptions options = new MergeTreeOptions(new Configuration());
-        return new FileStoreWriteImpl(
-                        WriteMode.CHANGE_LOG,
+        return new KeyValueFileStoreWrite(
                         new SchemaManager(tablePath),
                         0,
                         KEY_TYPE,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
new file mode 100644
index 00000000..4e3338f5
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AbstractFileStore.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
+import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Base {@link FileStore} implementation.
+ *
+ * @param <T> type of record to read and write.
+ */
+public abstract class AbstractFileStore<T> implements FileStore<T> {
+
+    protected final SchemaManager schemaManager;
+    protected final long schemaId;
+    protected final FileStoreOptions options;
+    protected final String user;
+    protected final RowType partitionType;
+
+    public AbstractFileStore(
+            SchemaManager schemaManager,
+            long schemaId,
+            FileStoreOptions options,
+            String user,
+            RowType partitionType) {
+        this.schemaManager = schemaManager;
+        this.schemaId = schemaId;
+        this.options = options;
+        this.user = user;
+        this.partitionType = partitionType;
+    }
+
+    public FileStorePathFactory pathFactory() {
+        return new FileStorePathFactory(
+                options.path(),
+                partitionType,
+                options.partitionDefaultName(),
+                options.fileFormat().getFormatIdentifier());
+    }
+
+    @Override
+    public SnapshotManager snapshotManager() {
+        return new SnapshotManager(options.path());
+    }
+
+    @VisibleForTesting
+    public ManifestFile.Factory manifestFileFactory() {
+        return new ManifestFile.Factory(
+                schemaManager,
+                schemaId,
+                partitionType,
+                options.manifestFormat(),
+                pathFactory(),
+                options.manifestTargetSize().getBytes());
+    }
+
+    @VisibleForTesting
+    public ManifestList.Factory manifestListFactory() {
+        return new ManifestList.Factory(partitionType, options.manifestFormat(), pathFactory());
+    }
+
+    @Override
+    public RowType partitionType() {
+        return partitionType;
+    }
+
+    public FileStoreOptions options() {
+        return options;
+    }
+
+    @Override
+    public FileStoreCommitImpl newCommit() {
+        return new FileStoreCommitImpl(
+                schemaId,
+                user,
+                partitionType,
+                pathFactory(),
+                snapshotManager(),
+                manifestFileFactory(),
+                manifestListFactory(),
+                newScan(),
+                options.bucket(),
+                options.manifestTargetSize(),
+                options.manifestMergeMinCount());
+    }
+
+    @Override
+    public FileStoreExpireImpl newExpire() {
+        return new FileStoreExpireImpl(
+                options.snapshotNumRetainMin(),
+                options.snapshotNumRetainMax(),
+                options.snapshotTimeRetain().toMillis(),
+                pathFactory(),
+                snapshotManager(),
+                manifestFileFactory(),
+                manifestListFactory());
+    }
+}
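
AbstractFileStore centralizes the wiring every store shares (path factory, snapshot manager, manifest factories, commit and expire), so a concrete store only has to supply newScan, newRead and newWrite. A hypothetical skeleton of such a subclass, purely for illustration (MyRowStore is not part of this commit; imports from org.apache.flink.table.store.file.* omitted for brevity):

    /** Illustration only: a store whose records are plain RowData. */
    public class MyRowStore extends AbstractFileStore<RowData> {

        public MyRowStore(
                SchemaManager schemaManager,
                long schemaId,
                FileStoreOptions options,
                String user,
                RowType partitionType) {
            super(schemaManager, schemaId, options, user, partitionType);
        }

        @Override
        public FileStoreScan newScan() {
            throw new UnsupportedOperationException("sketch only");
        }

        @Override
        public FileStoreRead<RowData> newRead() {
            throw new UnsupportedOperationException("sketch only");
        }

        @Override
        public FileStoreWrite<RowData> newWrite() {
            throw new UnsupportedOperationException("sketch only");
        }
    }

The two real subclasses, AppendOnlyFileStore and KeyValueFileStore, follow in this diff.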
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
new file mode 100644
index 00000000..d4c0ac43
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
+import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
+import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreWrite;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link FileStore} for reading and writing {@link RowData}. */
+public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
+
+    private final RowType rowType;
+
+    public AppendOnlyFileStore(
+            SchemaManager schemaManager,
+            long schemaId,
+            FileStoreOptions options,
+            String user,
+            RowType partitionType,
+            RowType rowType) {
+        super(schemaManager, schemaId, options, user, partitionType);
+        this.rowType = rowType;
+    }
+
+    @Override
+    public AppendOnlyFileStoreScan newScan() {
+        return new AppendOnlyFileStoreScan(
+                partitionType,
+                rowType,
+                snapshotManager(),
+                manifestFileFactory(),
+                manifestListFactory(),
+                options.bucket());
+    }
+
+    @Override
+    public AppendOnlyFileStoreRead newRead() {
+        return new AppendOnlyFileStoreRead(
+                schemaManager, schemaId, rowType, options.fileFormat(), pathFactory());
+    }
+
+    @Override
+    public AppendOnlyFileStoreWrite newWrite() {
+        return new AppendOnlyFileStoreWrite(
+                schemaId,
+                rowType,
+                options.fileFormat(),
+                pathFactory(),
+                snapshotManager(),
+                newScan(),
+                options.mergeTreeOptions().targetFileSize);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
index c6c205ff..9df275aa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStore.java
@@ -28,24 +28,24 @@ import org.apache.flink.table.types.logical.RowType;
 
 import java.io.Serializable;
 
-/** File store interface. */
-public interface FileStore extends Serializable {
-
-    FileStoreWrite newWrite();
-
-    FileStoreRead newRead();
+/**
+ * File store interface.
+ *
+ * @param <T> type of record to read and write.
+ */
+public interface FileStore<T> extends Serializable {
 
-    FileStoreCommit newCommit();
+    SnapshotManager snapshotManager();
 
-    FileStoreExpire newExpire();
+    RowType partitionType();
 
     FileStoreScan newScan();
 
-    SnapshotManager snapshotManager();
+    FileStoreRead<T> newRead();
 
-    RowType keyType();
+    FileStoreWrite<T> newWrite();
 
-    RowType valueType();
+    FileStoreCommit newCommit();
 
-    RowType partitionType();
+    FileStoreExpire newExpire();
 }
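
With the type parameter in place, the record type follows the store through its operations: a KeyValueFileStore hands out FileStoreWrite<KeyValue>, an AppendOnlyFileStore hands out FileStoreWrite<RowData>. A short sketch of the resulting write path; partition, bucket, compactExecutor, key and value are placeholder variables:

    // Changelog side: the writer consumes KeyValue objects.
    RecordWriter<KeyValue> kvWriter =
            keyValueStore.newWrite().createWriter(partition, bucket, compactExecutor);
    kvWriter.write(new KeyValue().replace(key, ValueKind.ADD, value));

    // Append-only side: the writer consumes plain RowData and accepts only INSERT rows.
    RecordWriter<RowData> rowWriter =
            appendOnlyStore.newWrite().createWriter(partition, bucket, compactExecutor);
    rowWriter.write(GenericRowData.of(1L, StringData.fromString("a")));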
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
deleted file mode 100644
index f391f3b4..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreImpl.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.manifest.ManifestFile;
-import org.apache.flink.table.store.file.manifest.ManifestList;
-import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
-import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
-import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
-import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
-import org.apache.flink.table.store.file.operation.FileStoreReadImpl;
-import org.apache.flink.table.store.file.operation.FileStoreScanImpl;
-import org.apache.flink.table.store.file.operation.FileStoreWriteImpl;
-import org.apache.flink.table.store.file.schema.SchemaManager;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
-import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-
-import javax.annotation.Nullable;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-/** File store implementation. */
-public class FileStoreImpl implements FileStore {
-
-    private final SchemaManager schemaManager;
-    private final long schemaId;
-    private final WriteMode writeMode;
-    private final FileStoreOptions options;
-    private final String user;
-    private final RowType partitionType;
-    private final RowType keyType;
-    private final RowType valueType;
-    private final Supplier<Comparator<RowData>> keyComparatorSupplier;
-    @Nullable private final MergeFunction mergeFunction;
-
-    public FileStoreImpl(
-            SchemaManager schemaManager,
-            long schemaId,
-            FileStoreOptions options,
-            WriteMode writeMode,
-            String user,
-            RowType partitionType,
-            RowType keyType,
-            RowType valueType,
-            @Nullable MergeFunction mergeFunction) {
-        this.schemaManager = schemaManager;
-        this.schemaId = schemaId;
-        this.options = options;
-        this.writeMode = writeMode;
-        this.user = user;
-        this.partitionType = partitionType;
-        this.keyType = keyType;
-        this.valueType = valueType;
-        this.mergeFunction = mergeFunction;
-        this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
-    }
-
-    public FileStorePathFactory pathFactory() {
-        return new FileStorePathFactory(
-                options.path(),
-                partitionType,
-                options.partitionDefaultName(),
-                options.fileFormat().getFormatIdentifier());
-    }
-
-    public FileStoreOptions options() {
-        return options;
-    }
-
-    @VisibleForTesting
-    public ManifestFile.Factory manifestFileFactory() {
-        return new ManifestFile.Factory(
-                schemaManager,
-                schemaId,
-                partitionType,
-                options.manifestFormat(),
-                pathFactory(),
-                options.manifestTargetSize().getBytes());
-    }
-
-    @VisibleForTesting
-    public ManifestList.Factory manifestListFactory() {
-        return new ManifestList.Factory(partitionType, options.manifestFormat(), pathFactory());
-    }
-
-    @Override
-    public FileStoreWriteImpl newWrite() {
-        return new FileStoreWriteImpl(
-                writeMode,
-                schemaManager,
-                schemaId,
-                keyType,
-                valueType,
-                keyComparatorSupplier,
-                mergeFunction,
-                options.fileFormat(),
-                pathFactory(),
-                snapshotManager(),
-                newScan(),
-                options.mergeTreeOptions());
-    }
-
-    @Override
-    public FileStoreReadImpl newRead() {
-        return new FileStoreReadImpl(
-                schemaManager,
-                schemaId,
-                writeMode,
-                keyType,
-                valueType,
-                keyComparatorSupplier.get(),
-                mergeFunction,
-                options.fileFormat(),
-                pathFactory());
-    }
-
-    @Override
-    public FileStoreCommitImpl newCommit() {
-        return new FileStoreCommitImpl(
-                schemaId,
-                user,
-                partitionType,
-                pathFactory(),
-                snapshotManager(),
-                manifestFileFactory(),
-                manifestListFactory(),
-                newScan(),
-                options.bucket(),
-                options.manifestTargetSize(),
-                options.manifestMergeMinCount());
-    }
-
-    @Override
-    public FileStoreExpireImpl newExpire() {
-        return new FileStoreExpireImpl(
-                options.snapshotNumRetainMin(),
-                options.snapshotNumRetainMax(),
-                options.snapshotTimeRetain().toMillis(),
-                pathFactory(),
-                snapshotManager(),
-                manifestFileFactory(),
-                manifestListFactory());
-    }
-
-    @Override
-    public FileStoreScanImpl newScan() {
-        return new FileStoreScanImpl(
-                partitionType,
-                keyType,
-                valueType,
-                snapshotManager(),
-                manifestFileFactory(),
-                manifestListFactory(),
-                options.bucket());
-    }
-
-    @Override
-    public SnapshotManager snapshotManager() {
-        return new SnapshotManager(options.path());
-    }
-
-    @Override
-    public RowType keyType() {
-        return keyType;
-    }
-
-    @Override
-    public RowType valueType() {
-        return valueType;
-    }
-
-    @Override
-    public RowType partitionType() {
-        return partitionType;
-    }
-
-    public static FileStoreImpl createWithAppendOnly(
-            SchemaManager schemaManager,
-            long schemaId,
-            FileStoreOptions options,
-            String user,
-            RowType partitionType,
-            RowType rowType) {
-        return new FileStoreImpl(
-                schemaManager,
-                schemaId,
-                options,
-                WriteMode.APPEND_ONLY,
-                user,
-                partitionType,
-                RowType.of(),
-                rowType,
-                null);
-    }
-
-    public static FileStoreImpl createWithPrimaryKey(
-            SchemaManager schemaManager,
-            long schemaId,
-            FileStoreOptions options,
-            String user,
-            RowType partitionType,
-            RowType primaryKeyType,
-            RowType rowType,
-            FileStoreOptions.MergeEngine mergeEngine) {
-        // add _KEY_ prefix to avoid conflict with value
-        RowType keyType =
-                new RowType(
-                        primaryKeyType.getFields().stream()
-                                .map(
-                                        f ->
-                                                new RowType.RowField(
-                                                        "_KEY_" + f.getName(),
-                                                        f.getType(),
-                                                        f.getDescription().orElse(null)))
-                                .collect(Collectors.toList()));
-
-        MergeFunction mergeFunction;
-        switch (mergeEngine) {
-            case DEDUPLICATE:
-                mergeFunction = new DeduplicateMergeFunction();
-                break;
-            case PARTIAL_UPDATE:
-                List<LogicalType> fieldTypes = rowType.getChildren();
-                RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
-                for (int i = 0; i < fieldTypes.size(); i++) {
-                    fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
-                }
-                mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
-                break;
-            default:
-                throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
-        }
-
-        return new FileStoreImpl(
-                schemaManager,
-                schemaId,
-                options,
-                WriteMode.CHANGE_LOG,
-                user,
-                partitionType,
-                keyType,
-                rowType,
-                mergeFunction);
-    }
-
-    public static FileStoreImpl createWithValueCount(
-            SchemaManager schemaManager,
-            long schemaId,
-            FileStoreOptions options,
-            String user,
-            RowType partitionType,
-            RowType rowType) {
-        RowType countType =
-                RowType.of(
-                        new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
-        MergeFunction mergeFunction = new ValueCountMergeFunction();
-        return new FileStoreImpl(
-                schemaManager,
-                schemaId,
-                options,
-                WriteMode.CHANGE_LOG,
-                user,
-                partitionType,
-                rowType,
-                countType,
-                mergeFunction);
-    }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index a4703760..a5baa708 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -46,6 +46,10 @@ public class KeyValue {
         return this;
     }
 
+    public KeyValue replace(RowData key, ValueKind valueKind, RowData value) {
+        return replace(key, -1, valueKind, value);
+    }
+
     public KeyValue replace(RowData key, long sequenceNumber, ValueKind valueKind, RowData value) {
         this.key = key;
         this.sequenceNumber = sequenceNumber;
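
The new three-argument replace overload defaults the sequence number to -1, which is what the updated tests use when building records by hand. A usage sketch with arbitrary GenericRowData values; writer stands for any RecordWriter<KeyValue>:

    KeyValue kv = new KeyValue().replace(
            GenericRowData.of(1L),      // key
            ValueKind.ADD,              // value kind
            GenericRowData.of(100L));   // value
    // equivalent to new KeyValue().replace(key, -1, ValueKind.ADD, value)
    writer.write(kv);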
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
new file mode 100644
index 00000000..287f25d4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreWrite;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.KeyComparatorSupplier;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+/** {@link FileStore} for querying and updating {@link KeyValue}s. */
+public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
+
+    private final RowType keyType;
+    private final RowType valueType;
+    private final Supplier<Comparator<RowData>> keyComparatorSupplier;
+    private final MergeFunction mergeFunction;
+
+    public KeyValueFileStore(
+            SchemaManager schemaManager,
+            long schemaId,
+            FileStoreOptions options,
+            String user,
+            RowType partitionType,
+            RowType keyType,
+            RowType valueType,
+            MergeFunction mergeFunction) {
+        super(schemaManager, schemaId, options, user, partitionType);
+        this.keyType = keyType;
+        this.valueType = valueType;
+        this.mergeFunction = mergeFunction;
+        this.keyComparatorSupplier = new KeyComparatorSupplier(keyType);
+    }
+
+    @Override
+    public KeyValueFileStoreScan newScan() {
+        return new KeyValueFileStoreScan(
+                partitionType,
+                keyType,
+                snapshotManager(),
+                manifestFileFactory(),
+                manifestListFactory(),
+                options.bucket());
+    }
+
+    @Override
+    public KeyValueFileStoreRead newRead() {
+        return new KeyValueFileStoreRead(
+                schemaManager,
+                schemaId,
+                keyType,
+                valueType,
+                keyComparatorSupplier.get(),
+                mergeFunction,
+                options.fileFormat(),
+                pathFactory());
+    }
+
+    @Override
+    public KeyValueFileStoreWrite newWrite() {
+        return new KeyValueFileStoreWrite(
+                schemaManager,
+                schemaId,
+                keyType,
+                valueType,
+                keyComparatorSupplier,
+                mergeFunction,
+                options.fileFormat(),
+                pathFactory(),
+                snapshotManager(),
+                newScan(),
+                options.mergeTreeOptions());
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
new file mode 100644
index 00000000..86ffaeed
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordAndPosition;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.utils.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** Reads {@link RowData} from data files. */
+public class AppendOnlyReader implements RecordReader<RowData> {
+
+    private final BulkFormat.Reader<RowData> reader;
+
+    public AppendOnlyReader(Path path, BulkFormat<RowData, FileSourceSplit> readerFactory)
+            throws IOException {
+        long fileSize = FileUtils.getFileSize(path);
+        FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize, 0, fileSize);
+        this.reader = readerFactory.createReader(FileUtils.DEFAULT_READER_CONFIG, split);
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<RowData> readBatch() throws IOException {
+        BulkFormat.RecordIterator<RowData> iterator = reader.readBatch();
+        return iterator == null ? null : new AppendOnlyRecordIterator(iterator);
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    private static class AppendOnlyRecordIterator implements RecordReader.RecordIterator<RowData> {
+
+        private final BulkFormat.RecordIterator<RowData> iterator;
+
+        private AppendOnlyRecordIterator(BulkFormat.RecordIterator<RowData> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public RowData next() throws IOException {
+            RecordAndPosition<RowData> result = iterator.next();
+
+            // TODO schema evolution
+            return result == null ? null : result.getRecord();
+        }
+
+        @Override
+        public void releaseBatch() {
+            iterator.releaseBatch();
+        }
+    }
+}
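
AppendOnlyReader wraps a Flink BulkFormat reader: it builds a single FileSourceSplit covering the whole file and surfaces batches through the table store's RecordReader interface. A hypothetical read loop; path and readerFactory (a BulkFormat<RowData, FileSourceSplit> obtained from the table's FileFormat) are assumed inputs:

    RecordReader<RowData> reader = new AppendOnlyReader(path, readerFactory);
    try {
        RecordReader.RecordIterator<RowData> batch;
        while ((batch = reader.readBatch()) != null) {
            RowData row;
            while ((row = batch.next()) != null) {
                // consume the row
            }
            batch.releaseBatch();
        }
    } finally {
        reader.close();
    }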
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
similarity index 86%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index a3842c77..3c3f9e7e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -17,19 +17,23 @@
  * under the License.
  */
 
-package org.apache.flink.table.store.file.writer;
+package org.apache.flink.table.store.file.data;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.stats.BinaryTableStats;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.store.file.writer.BaseFileWriter;
+import org.apache.flink.table.store.file.writer.FileWriter;
+import org.apache.flink.table.store.file.writer.Metric;
+import org.apache.flink.table.store.file.writer.MetricFileWriter;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.file.writer.RollingFileWriter;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -42,7 +46,7 @@ import java.util.function.Supplier;
  * A {@link RecordWriter} implementation that only accepts records which are always insert
  * operations and don't have any unique keys or sort keys.
  */
-public class AppendOnlyWriter implements RecordWriter {
+public class AppendOnlyWriter implements RecordWriter<RowData> {
     private final long schemaId;
     private final long targetFileSize;
     private final DataFilePathFactory pathFactory;
@@ -78,13 +82,12 @@ public class AppendOnlyWriter implements RecordWriter {
     }
 
     @Override
-    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+    public void write(RowData rowData) throws Exception {
         Preconditions.checkArgument(
-                valueKind == ValueKind.ADD,
-                "Append-only writer cannot accept ValueKind: %s",
-                valueKind);
-
-        writer.write(value);
+                rowData.getRowKind() == RowKind.INSERT,
+                "Append-only writer can only accept insert row kind, but current row kind is: %s",
+                rowData.getRowKind());
+        writer.write(rowData);
     }
 
     @Override
@@ -99,6 +102,8 @@ public class AppendOnlyWriter implements RecordWriter {
             writer = createRollingRowWriter();
         }
 
+        System.out.println(newFiles);
+
         return Increment.forAppend(newFiles);
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
index 66fe012a..fbf17b83 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -52,7 +52,7 @@ public class MergeTreeReader implements RecordReader<KeyValue> {
             throws IOException {
         this.dropDelete = dropDelete;
 
-        List<ReaderSupplier> readers = new ArrayList<>();
+        List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
         for (List<SortedRun> section : sections) {
             readers.add(
                     () ->
@@ -117,7 +117,7 @@ public class MergeTreeReader implements RecordReader<KeyValue> {
 
     public static RecordReader<KeyValue> readerForRun(SortedRun run, DataFileReader dataFileReader)
             throws IOException {
-        List<ReaderSupplier> readers = new ArrayList<>();
+        List<ReaderSupplier<KeyValue>> readers = new ArrayList<>();
         for (DataFileMeta file : run.files()) {
             readers.add(() -> dataFileReader.read(file.fileName()));
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 0c62407d..677b02ff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.mergetree;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
@@ -40,7 +39,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /** A {@link RecordWriter} to write records and generate {@link Increment}. */
-public class MergeTreeWriter implements RecordWriter {
+public class MergeTreeWriter implements RecordWriter<KeyValue> {
 
     private final MemTable memTable;
 
@@ -100,12 +99,12 @@ public class MergeTreeWriter implements RecordWriter {
     }
 
     @Override
-    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+    public void write(KeyValue kv) throws Exception {
         long sequenceNumber = newSequenceNumber();
-        boolean success = memTable.put(sequenceNumber, valueKind, key, value);
+        boolean success = memTable.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
         if (!success) {
             flush();
-            success = memTable.put(sequenceNumber, valueKind, key, value);
+            success = memTable.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
             if (!success) {
                 throw new RuntimeException("Mem table is too small to hold a single element.");
             }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
index b9b86053..3cafd7a5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.store.file.mergetree.compact;
 
-import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.util.Preconditions;
 
@@ -34,29 +33,29 @@ import java.util.Queue;
  * input list is already sorted by key and sequence number, and the key intervals do not overlap
  * each other.
  */
-public class ConcatRecordReader implements RecordReader<KeyValue> {
+public class ConcatRecordReader<T> implements RecordReader<T> {
 
-    private final Queue<ReaderSupplier> queue;
+    private final Queue<ReaderSupplier<T>> queue;
 
-    private RecordReader<KeyValue> current;
+    private RecordReader<T> current;
 
-    protected ConcatRecordReader(List<ReaderSupplier> readerFactories) {
+    protected ConcatRecordReader(List<ReaderSupplier<T>> readerFactories) {
         readerFactories.forEach(
                 supplier ->
                         Preconditions.checkNotNull(supplier, "Reader factory must not be null."));
         this.queue = new LinkedList<>(readerFactories);
     }
 
-    public static RecordReader<KeyValue> create(List<ReaderSupplier> readers) throws IOException {
-        return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader(readers);
+    public static <R> RecordReader<R> create(List<ReaderSupplier<R>> readers) throws IOException {
+        return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader<>(readers);
     }
 
     @Nullable
     @Override
-    public RecordIterator<KeyValue> readBatch() throws IOException {
+    public RecordIterator<T> readBatch() throws IOException {
         while (true) {
             if (current != null) {
-                RecordIterator<KeyValue> iterator = current.readBatch();
+                RecordIterator<T> iterator = current.readBatch();
                 if (iterator != null) {
                     return iterator;
                 }
@@ -79,7 +78,7 @@ public class ConcatRecordReader implements RecordReader<KeyValue> {
 
     /** Supplier to get {@link RecordReader}. */
     @FunctionalInterface
-    public interface ReaderSupplier {
-        RecordReader<KeyValue> get() throws IOException;
+    public interface ReaderSupplier<T> {
+        RecordReader<T> get() throws IOException;
     }
 }
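
ConcatRecordReader is now generic, so the append-only path (RowData) and the merge-tree path (KeyValue) reuse it unchanged; the inputs must still be ordered with non-overlapping key ranges where ordering matters. A small sketch, with the supplier list assumed to be prepared by the caller:

    // Each supplier opens one underlying reader lazily, strictly in list order.
    static RecordReader<RowData> concat(List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers)
            throws IOException {
        return ConcatRecordReader.create(suppliers);
    }

Note that create() short-circuits a single supplier and returns that reader directly instead of wrapping it.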
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
similarity index 89%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 873bcf2a..02d28007 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -46,11 +46,9 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
-/** Default implementation of {@link FileStoreScan}. */
+/** Base implementation of {@link FileStoreScan}. */
-public class FileStoreScanImpl implements FileStoreScan {
+public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     private final FieldStatsArraySerializer partitionStatsConverter;
-    private final FieldStatsArraySerializer keyStatsConverter;
-    private final FieldStatsArraySerializer valueStatsConverter;
     private final RowDataToObjectArrayConverter partitionConverter;
     private final SnapshotManager snapshotManager;
     private final ManifestFile.Factory manifestFileFactory;
@@ -58,25 +56,19 @@ public class FileStoreScanImpl implements FileStoreScan {
     private final int numOfBuckets;
 
     private Predicate partitionFilter;
-    private Predicate keyFilter;
-    private Predicate valueFilter;
 
     private Long specifiedSnapshotId = null;
     private Integer specifiedBucket = null;
     private List<ManifestFileMeta> specifiedManifests = null;
     private boolean isIncremental = false;
 
-    public FileStoreScanImpl(
+    public AbstractFileStoreScan(
             RowType partitionType,
-            RowType keyType,
-            RowType valueType,
             SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets) {
         this.partitionStatsConverter = new FieldStatsArraySerializer(partitionType);
-        this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
-        this.valueStatsConverter = new FieldStatsArraySerializer(valueType);
         this.partitionConverter = new RowDataToObjectArrayConverter(partitionType);
         this.snapshotManager = snapshotManager;
         this.manifestFileFactory = manifestFileFactory;
@@ -117,18 +109,6 @@ public class FileStoreScanImpl implements FileStoreScan {
         }
     }
 
-    @Override
-    public FileStoreScan withKeyFilter(Predicate predicate) {
-        this.keyFilter = predicate;
-        return this;
-    }
-
-    @Override
-    public FileStoreScan withValueFilter(Predicate predicate) {
-        this.valueFilter = predicate;
-        return this;
-    }
-
     @Override
     public FileStoreScan withBucket(int bucket) {
         this.specifiedBucket = bucket;
@@ -250,6 +230,10 @@ public class FileStoreScanImpl implements FileStoreScan {
     }
 
     private boolean filterManifestEntry(ManifestEntry entry) {
+        return filterByPartitionAndBucket(entry) && filterByStats(entry);
+    }
+
+    private boolean filterByPartitionAndBucket(ManifestEntry entry) {
         if (specifiedBucket != null) {
             Preconditions.checkState(
                     specifiedBucket < entry.totalBuckets(),
@@ -257,17 +241,11 @@ public class FileStoreScanImpl implements FileStoreScan {
         }
         return (partitionFilter == null
                         || partitionFilter.test(partitionConverter.convert(entry.partition())))
-                && (keyFilter == null
-                        || keyFilter.test(
-                                entry.file().rowCount(),
-                                entry.file().keyStats().fields(keyStatsConverter)))
-                && (valueFilter == null
-                        || valueFilter.test(
-                                entry.file().rowCount(),
-                                entry.file().valueStats().fields(valueStatsConverter)))
                 && (specifiedBucket == null || entry.bucket() == specifiedBucket);
     }
 
+    protected abstract boolean filterByStats(ManifestEntry entry);
+
     private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
         return manifestFileFactory.create().read(manifest.fileName());
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
new file mode 100644
index 00000000..c3e09745
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Base {@link FileStoreWrite} implementation.
+ *
+ * @param <T> type of record to write.
+ */
+public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
+
+    private final SnapshotManager snapshotManager;
+    private final FileStoreScan scan;
+
+    protected AbstractFileStoreWrite(SnapshotManager snapshotManager, FileStoreScan scan) {
+        this.snapshotManager = snapshotManager;
+        this.scan = scan;
+    }
+
+    protected List<DataFileMeta> scanExistingFileMetas(BinaryRowData partition, int bucket) {
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        List<DataFileMeta> existingFileMetas = Lists.newArrayList();
+        if (latestSnapshotId != null) {
+            // Concat all the DataFileMeta of existing files into existingFileMetas.
+            scan.withSnapshot(latestSnapshotId)
+                    .withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket)
+                    .plan().files().stream()
+                    .map(ManifestEntry::file)
+                    .forEach(existingFileMetas::add);
+        }
+        return existingFileMetas;
+    }
+
+    protected long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
+        return fileMetas.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
new file mode 100644
index 00000000..d8a77d63
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.AppendOnlyReader;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** {@link FileStoreRead} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> {
+
+    private final SchemaManager schemaManager;
+    private final long schemaId;
+    private final RowType rowType;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+
+    private int[][] projection;
+
+    public AppendOnlyFileStoreRead(
+            SchemaManager schemaManager,
+            long schemaId,
+            RowType rowType,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory) {
+        this.schemaManager = schemaManager;
+        this.schemaId = schemaId;
+        this.rowType = rowType;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+
+        this.projection = Projection.range(0, rowType.getFieldCount()).toNestedIndexes();
+    }
+
+    public FileStoreRead<RowData> withProjection(int[][] projectedFields) {
+        projection = projectedFields;
+        return this;
+    }
+
+    @Override
+    public RecordReader<RowData> createReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
+        BulkFormat<RowData, FileSourceSplit> readerFactory =
+                fileFormat.createReaderFactory(rowType, projection);
+        DataFilePathFactory dataFilePathFactory =
+                pathFactory.createDataFilePathFactory(partition, bucket);
+        List<ConcatRecordReader.ReaderSupplier<RowData>> suppliers = new ArrayList<>();
+        for (DataFileMeta file : files) {
+            suppliers.add(
+                    () ->
+                            new AppendOnlyReader(
+                                    dataFilePathFactory.toPath(file.fileName()), readerFactory));
+        }
+
+        return ConcatRecordReader.create(suppliers);
+    }
+}
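
A hedged sketch of how this read path is expected to be driven; the partition, bucket and file list would come from a scan plan and are assumed here, as is the projection:

    static RecordReader<RowData> readSplit(
            AppendOnlyFileStoreRead read,
            BinaryRowData partition,
            int bucket,
            List<DataFileMeta> files)
            throws IOException {
        // Keep nested columns 2 and 0 of the row type, then concat-read the bucket's files.
        read.withProjection(new int[][] {{2}, {0}});
        return read.createReader(partition, bucket, files);
    }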
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
new file mode 100644
index 00000000..521a91e4
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link FileStoreScan} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
+
+    private final FieldStatsArraySerializer rowStatsConverter;
+
+    private Predicate filter;
+
+    public AppendOnlyFileStoreScan(
+            RowType partitionType,
+            RowType rowType,
+            SnapshotManager snapshotManager,
+            ManifestFile.Factory manifestFileFactory,
+            ManifestList.Factory manifestListFactory,
+            int numOfBuckets) {
+        super(
+                partitionType,
+                snapshotManager,
+                manifestFileFactory,
+                manifestListFactory,
+                numOfBuckets);
+        this.rowStatsConverter = new FieldStatsArraySerializer(rowType);
+    }
+
+    public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
+        this.filter = predicate;
+        return this;
+    }
+
+    @Override
+    protected boolean filterByStats(ManifestEntry entry) {
+        return filter == null
+                || filter.test(
+                        entry.file().rowCount(),
+                        entry.file().valueStats().fields(rowStatsConverter));
+    }
+}
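
The scan prunes whole files against their value statistics before any data is read; a sketch of the intended call chain, with the predicate and snapshot id assumed:

    static List<ManifestEntry> planAppendOnlyFiles(
            AppendOnlyFileStoreScan scan, Predicate rowFilter, long snapshotId) {
        // Only files whose min/max value stats may satisfy rowFilter survive the plan.
        return scan.withFilter(rowFilter).withSnapshot(snapshotId).plan().files();
    }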
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
new file mode 100644
index 00000000..76a780f9
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.AppendOnlyWriter;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
+
+    private final long schemaId;
+    private final RowType rowType;
+    private final FileFormat fileFormat;
+    private final FileStorePathFactory pathFactory;
+    private final long targetFileSize;
+
+    public AppendOnlyFileStoreWrite(
+            long schemaId,
+            RowType rowType,
+            FileFormat fileFormat,
+            FileStorePathFactory pathFactory,
+            SnapshotManager snapshotManager,
+            FileStoreScan scan,
+            long targetFileSize) {
+        super(snapshotManager, scan);
+        this.schemaId = schemaId;
+        this.rowType = rowType;
+        this.fileFormat = fileFormat;
+        this.pathFactory = pathFactory;
+        this.targetFileSize = targetFileSize;
+    }
+
+    @Override
+    public RecordWriter<RowData> createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        return createWriter(
+                partition, bucket, getMaxSequenceNumber(scanExistingFileMetas(partition, bucket)));
+    }
+
+    @Override
+    public RecordWriter<RowData> createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
+        return createWriter(partition, bucket, -1L);
+    }
+
+    @Override
+    public RecordWriter<RowData> createCompactWriter(
+            BinaryRowData partition,
+            int bucket,
+            ExecutorService compactExecutor,
+            List<DataFileMeta> restoredFiles) {
+        throw new UnsupportedOperationException(
+                "Currently append only write mode does not support compaction.");
+    }
+
+    private RecordWriter<RowData> createWriter(
+            BinaryRowData partition, int bucket, long maxSeqNum) {
+        DataFilePathFactory factory = pathFactory.createDataFilePathFactory(partition, bucket);
+        return new AppendOnlyWriter(
+                schemaId, fileFormat, targetFileSize, rowType, maxSeqNum, factory);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
index b224529b..fbe02fea 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreRead.java
@@ -19,37 +19,20 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.utils.RecordReader;
 
 import java.io.IOException;
 import java.util.List;
 
-/** Read operation which provides {@link RecordReader} creation. */
-public interface FileStoreRead {
-
-    /** With drop delete records. */
-    FileStoreRead withDropDelete(boolean dropDelete);
-
-    /** With key nested projection. */
-    FileStoreRead withKeyProjection(int[][] projectedFields);
-
-    /** With value nested projection. */
-    FileStoreRead withValueProjection(int[][] projectedFields);
+/**
+ * Read operation which provides {@link RecordReader} creation.
+ *
+ * @param <T> type of record to read.
+ */
+public interface FileStoreRead<T> {
 
-    /**
-     * Create a {@link RecordReader} from partition and bucket and files.
-     *
-     * <p>The resulting reader has the following characteristics:
-     *
-     * <ul>
-     *   <li>If {@link FileStoreRead#withKeyProjection} is called, key-values produced by this
-     *       reader may be unordered and may contain duplicated keys.
-     *   <li>If {@link FileStoreRead#withKeyProjection} is not called, key-values produced by this
-     *       reader is guaranteed to be ordered by keys and does not contain duplicated keys.
-     * </ul>
-     */
-    RecordReader<KeyValue> createReader(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException;
+    /** Create a {@link RecordReader} from partition, bucket and files. */
+    RecordReader<T> createReader(BinaryRowData partition, int bucket, List<DataFileMeta> files)
+            throws IOException;
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 7a5eaa91..3d21746b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -41,10 +41,6 @@ public interface FileStoreScan {
 
     FileStoreScan withPartitionFilter(List<BinaryRowData> partitions);
 
-    FileStoreScan withKeyFilter(Predicate predicate);
-
-    FileStoreScan withValueFilter(Predicate predicate);
-
     FileStoreScan withBucket(int bucket);
 
     FileStoreScan withSnapshot(long snapshotId);
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index d9e020b1..930bce95 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -25,18 +25,23 @@ import org.apache.flink.table.store.file.writer.RecordWriter;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
-/** Write operation which provides {@link RecordWriter} creation. */
-public interface FileStoreWrite {
+/**
+ * Write operation which provides {@link RecordWriter} creation.
+ *
+ * @param <T> type of record to write.
+ */
+public interface FileStoreWrite<T> {
 
     /** Create a {@link RecordWriter} from partition and bucket. */
-    RecordWriter createWriter(BinaryRowData partition, int bucket, ExecutorService compactExecutor);
+    RecordWriter<T> createWriter(
+            BinaryRowData partition, int bucket, ExecutorService compactExecutor);
 
     /** Create an empty {@link RecordWriter} from partition and bucket. */
-    RecordWriter createEmptyWriter(
+    RecordWriter<T> createEmptyWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor);
 
     /** Create a compact {@link RecordWriter} from partition, bucket and restore files. */
-    RecordWriter createCompactWriter(
+    RecordWriter<T> createCompactWriter(
             BinaryRowData partition,
             int bucket,
             ExecutorService compactExecutor,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
similarity index 66%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
index 5d6ef5ca..12cc5ed9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreReadImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.operation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.format.FileFormat;
@@ -33,102 +32,78 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 
-/** Default implementation of {@link FileStoreRead}. */
-public class FileStoreReadImpl implements FileStoreRead {
+/**
+ * {@link FileStoreRead} implementation for {@link
+ * org.apache.flink.table.store.file.KeyValueFileStore}.
+ */
+public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
 
     private final DataFileReader.Factory dataFileReaderFactory;
-    private final WriteMode writeMode;
     private final Comparator<RowData> keyComparator;
-    @Nullable private final MergeFunction mergeFunction;
+    private final MergeFunction mergeFunction;
 
     private boolean keyProjected;
     private boolean dropDelete = true;
 
-    public FileStoreReadImpl(
+    public KeyValueFileStoreRead(
             SchemaManager schemaManager,
             long schemaId,
-            WriteMode writeMode,
             RowType keyType,
             RowType valueType,
             Comparator<RowData> keyComparator,
-            @Nullable MergeFunction mergeFunction,
+            MergeFunction mergeFunction,
             FileFormat fileFormat,
             FileStorePathFactory pathFactory) {
         this.dataFileReaderFactory =
                 new DataFileReader.Factory(
                         schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory);
-        this.writeMode = writeMode;
         this.keyComparator = keyComparator;
         this.mergeFunction = mergeFunction;
 
         this.keyProjected = false;
     }
 
-    @Override
-    public FileStoreRead withDropDelete(boolean dropDelete) {
-        Preconditions.checkArgument(
-                writeMode != WriteMode.APPEND_ONLY || !dropDelete,
-                "Cannot drop delete message for append-only table.");
+    public KeyValueFileStoreRead withDropDelete(boolean dropDelete) {
         this.dropDelete = dropDelete;
         return this;
     }
 
-    @Override
-    public FileStoreRead withKeyProjection(int[][] projectedFields) {
+    public KeyValueFileStoreRead withKeyProjection(int[][] projectedFields) {
         dataFileReaderFactory.withKeyProjection(projectedFields);
         keyProjected = true;
         return this;
     }
 
-    @Override
-    public FileStoreRead withValueProjection(int[][] projectedFields) {
+    public KeyValueFileStoreRead withValueProjection(int[][] projectedFields) {
         dataFileReaderFactory.withValueProjection(projectedFields);
         return this;
     }
 
+    /**
+     * The resulting reader has the following characteristics:
+     *
+     * <ul>
+     *   <li>If {@link KeyValueFileStoreRead#withKeyProjection} is called, key-values produced by
+     *       this reader may be unordered and may contain duplicated keys.
+     *   <li>If {@link KeyValueFileStoreRead#withKeyProjection} is not called, key-values produced
+     *       by this reader are guaranteed to be ordered by keys and do not contain duplicated
+     *       keys.
+     * </ul>
+     */
     @Override
     public RecordReader<KeyValue> createReader(
             BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
-        switch (writeMode) {
-            case APPEND_ONLY:
-                return createAppendOnlyReader(partition, bucket, files);
-
-            case CHANGE_LOG:
-                return createMergeTreeReader(partition, bucket, files);
-
-            default:
-                throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
-        }
-    }
-
-    private RecordReader<KeyValue> createAppendOnlyReader(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
-        DataFileReader dataFileReader = dataFileReaderFactory.create(partition, bucket);
-        List<ConcatRecordReader.ReaderSupplier> suppliers = new ArrayList<>();
-        for (DataFileMeta file : files) {
-            suppliers.add(() -> dataFileReader.read(file.fileName()));
-        }
-
-        return ConcatRecordReader.create(suppliers);
-    }
-
-    private RecordReader<KeyValue> createMergeTreeReader(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
         DataFileReader dataFileReader = dataFileReaderFactory.create(partition, bucket);
         if (keyProjected) {
             // key projection has been applied, so data file readers will not return key-values in
-            // order,
-            // we have to return the raw file contents without merging
-            List<ConcatRecordReader.ReaderSupplier> suppliers = new ArrayList<>();
+            // order, we have to return the raw file contents without merging
+            List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
             for (DataFileMeta file : files) {
                 suppliers.add(() -> dataFileReader.read(file.fileName()));
             }
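
A sketch of the two behaviours described in the javadoc above; the store wiring and split inputs are assumed:

    // Without key projection: sections are merged, ordered by key, DELETEs dropped.
    static RecordReader<KeyValue> mergedRead(
            KeyValueFileStoreRead read, BinaryRowData partition, int bucket, List<DataFileMeta> files)
            throws IOException {
        return read.withDropDelete(true).createReader(partition, bucket, files);
    }

    // With key projection: raw file contents, possibly unordered and with duplicate keys.
    static RecordReader<KeyValue> projectedRead(
            KeyValueFileStoreRead read, BinaryRowData partition, int bucket, List<DataFileMeta> files)
            throws IOException {
        return read.withKeyProjection(new int[][] {{0}}).createReader(partition, bucket, files);
    }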
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
new file mode 100644
index 00000000..bdbf23fd
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link FileStoreScan} for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
+public class KeyValueFileStoreScan extends AbstractFileStoreScan {
+
+    private final FieldStatsArraySerializer keyStatsConverter;
+
+    private Predicate keyFilter;
+
+    public KeyValueFileStoreScan(
+            RowType partitionType,
+            RowType keyType,
+            SnapshotManager snapshotManager,
+            ManifestFile.Factory manifestFileFactory,
+            ManifestList.Factory manifestListFactory,
+            int numOfBuckets) {
+        super(
+                partitionType,
+                snapshotManager,
+                manifestFileFactory,
+                manifestListFactory,
+                numOfBuckets);
+        this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
+    }
+
+    public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
+        this.keyFilter = predicate;
+        return this;
+    }
+
+    @Override
+    protected boolean filterByStats(ManifestEntry entry) {
+        return keyFilter == null
+                || keyFilter.test(
+                        entry.file().rowCount(), entry.file().keyStats().fields(keyStatsConverter));
+    }
+}
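
KeyValueFileStoreScan applies the same kind of pruning, but against key statistics; a sketch with the key predicate and snapshot id assumed:

    static List<ManifestEntry> planKeyValueFiles(
            KeyValueFileStoreScan scan, Predicate keyFilter, long snapshotId) {
        // Files whose key min/max stats cannot match keyFilter are skipped.
        return scan.withKeyFilter(keyFilter).withSnapshot(snapshotId).plan().files();
    }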
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
similarity index 68%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index b28653f1..86d68ef5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -20,13 +20,11 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.mergetree.Levels;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
@@ -41,13 +39,10 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
 import org.apache.flink.table.store.file.writer.CompactWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
-
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -55,25 +50,16 @@ import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
-/** Default implementation of {@link FileStoreWrite}. */
-public class FileStoreWriteImpl implements FileStoreWrite {
+/** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
+public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
 
-    private final WriteMode writeMode;
-    private final SchemaManager schemaManager;
-    private final long schemaId;
-    private final RowType valueType;
     private final DataFileReader.Factory dataFileReaderFactory;
     private final DataFileWriter.Factory dataFileWriterFactory;
-    private final FileFormat fileFormat;
     private final Supplier<Comparator<RowData>> keyComparatorSupplier;
     private final MergeFunction mergeFunction;
-    private final FileStorePathFactory pathFactory;
-    private final SnapshotManager snapshotManager;
-    private final FileStoreScan scan;
     private final MergeTreeOptions options;
 
-    public FileStoreWriteImpl(
-            WriteMode writeMode,
+    public KeyValueFileStoreWrite(
             SchemaManager schemaManager,
             long schemaId,
             RowType keyType,
@@ -85,9 +71,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             MergeTreeOptions options) {
-        this.schemaManager = schemaManager;
-        this.schemaId = schemaId;
-        this.valueType = valueType;
+        super(snapshotManager, scan);
         this.dataFileReaderFactory =
                 new DataFileReader.Factory(
                         schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory);
@@ -99,69 +83,26 @@ public class FileStoreWriteImpl implements FileStoreWrite {
                         fileFormat,
                         pathFactory,
                         options.targetFileSize);
-        this.writeMode = writeMode;
-        this.fileFormat = fileFormat;
         this.keyComparatorSupplier = keyComparatorSupplier;
         this.mergeFunction = mergeFunction;
-        this.pathFactory = pathFactory;
-        this.snapshotManager = snapshotManager;
-        this.scan = scan;
         this.options = options;
     }
 
     @Override
-    public RecordWriter createWriter(
+    public RecordWriter<KeyValue> createWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
-        Long latestSnapshotId = snapshotManager.latestSnapshotId();
-        List<DataFileMeta> existingFileMetas = Lists.newArrayList();
-        if (latestSnapshotId != null) {
-            // Concat all the DataFileMeta of existing files into existingFileMetas.
-            scan.withSnapshot(latestSnapshotId)
-                    .withPartitionFilter(Collections.singletonList(partition)).withBucket(bucket)
-                    .plan().files().stream()
-                    .map(ManifestEntry::file)
-                    .forEach(existingFileMetas::add);
-        }
-
-        switch (writeMode) {
-            case APPEND_ONLY:
-                DataFilePathFactory factory =
-                        pathFactory.createDataFilePathFactory(partition, bucket);
-                long maxSeqNum =
-                        existingFileMetas.stream()
-                                .map(DataFileMeta::maxSequenceNumber)
-                                .max(Long::compare)
-                                .orElse(-1L);
-
-                return new AppendOnlyWriter(
-                        schemaId,
-                        fileFormat,
-                        options.targetFileSize,
-                        valueType,
-                        maxSeqNum,
-                        factory);
-
-            case CHANGE_LOG:
-                if (latestSnapshotId == null) {
-                    return createEmptyWriter(partition, bucket, compactExecutor);
-                } else {
-                    return createMergeTreeWriter(
-                            partition, bucket, existingFileMetas, compactExecutor);
-                }
-
-            default:
-                throw new UnsupportedOperationException("Unknown write mode: " + writeMode);
-        }
+        return createMergeTreeWriter(
+                partition, bucket, scanExistingFileMetas(partition, bucket), compactExecutor);
     }
 
     @Override
-    public RecordWriter createEmptyWriter(
+    public RecordWriter<KeyValue> createEmptyWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
         return createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
     }
 
     @Override
-    public RecordWriter createCompactWriter(
+    public RecordWriter<KeyValue> createCompactWriter(
             BinaryRowData partition,
             int bucket,
             ExecutorService compactExecutor,
@@ -177,16 +118,11 @@ public class FileStoreWriteImpl implements FileStoreWrite {
                         compactExecutor));
     }
 
-    private RecordWriter createMergeTreeWriter(
+    private RecordWriter<KeyValue> createMergeTreeWriter(
             BinaryRowData partition,
             int bucket,
             List<DataFileMeta> restoreFiles,
             ExecutorService compactExecutor) {
-        long maxSequenceNumber =
-                restoreFiles.stream()
-                        .map(DataFileMeta::maxSequenceNumber)
-                        .max(Long::compare)
-                        .orElse(-1L);
         DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
         Comparator<RowData> keyComparator = keyComparatorSupplier.get();
         return new MergeTreeWriter(
@@ -204,7 +140,7 @@ public class FileStoreWriteImpl implements FileStoreWrite {
                                 options.numSortedRunCompactionTrigger),
                         compactExecutor),
                 new Levels(keyComparator, restoreFiles, options.numLevels),
-                maxSequenceNumber,
+                getMaxSequenceNumber(restoreFiles),
                 keyComparator,
                 mergeFunction.copy(),
                 dataFileWriter,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
index de625beb..884af4e8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
@@ -18,8 +18,7 @@
 
 package org.apache.flink.table.store.file.writer;
 
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
@@ -35,7 +34,7 @@ import java.util.concurrent.ExecutionException;
- * A {@link RecordWriter} implementation that only perform compaction on existing records and does
+ * A {@link RecordWriter} implementation that only performs compaction on existing records and does
  * not generate new records.
  */
-public class CompactWriter implements RecordWriter {
+public class CompactWriter implements RecordWriter<KeyValue> {
 
     private final CompactUnit unit;
     private final CompactManager compactManager;
@@ -73,7 +72,7 @@ public class CompactWriter implements RecordWriter {
     }
 
     @Override
-    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+    public void write(KeyValue kv) throws Exception {
         // nothing to write
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
index 4f29e278..5b0ce6aa 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.file.writer;
 
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.Increment;
 
@@ -29,11 +27,13 @@ import java.util.List;
  * The {@code RecordWriter} is responsible for writing data and handling in-progress files used to
- * write yet un-staged data. The incremental files ready to commit is returned to the system by the
- * {@link #prepareCommit()}.
+ * write yet un-staged data. The incremental files ready to commit are returned to the system by
+ * {@link #prepareCommit()}.
+ *
+ * @param <T> type of record to write.
  */
-public interface RecordWriter {
+public interface RecordWriter<T> {
 
-    /** Add a key-value element to the writer. */
+    /** Add a record to the writer. */
-    void write(ValueKind valueKind, RowData key, RowData value) throws Exception;
+    void write(T record) throws Exception;
 
     /**
      * Prepare for a commit.
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index ff0c8539..7f0de998 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -19,65 +19,67 @@
 package org.apache.flink.table.store.table;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.AppendOnlyFileStore;
 import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreRead;
+import org.apache.flink.table.store.file.operation.AppendOnlyFileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.AbstractTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
-import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
+import java.io.IOException;
+import java.util.List;
+
 /** {@link FileStoreTable} for {@link WriteMode#APPEND_ONLY} write mode. */
 public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
-    private final FileStoreImpl store;
+    private final AppendOnlyFileStore store;
 
     AppendOnlyFileStoreTable(String name, SchemaManager schemaManager, Schema schema, String user) {
         super(name, schema);
         this.store =
-                new FileStoreImpl(
+                new AppendOnlyFileStore(
                         schemaManager,
                         schema.id(),
                         new FileStoreOptions(schema.options()),
-                        WriteMode.APPEND_ONLY,
                         user,
                         schema.logicalPartitionType(),
-                        RowType.of(),
-                        schema.logicalRowType(),
-                        null);
+                        schema.logicalRowType());
     }
 
     @Override
     public TableScan newScan() {
-        return new TableScan(store.newScan(), schema, store.pathFactory()) {
+        AppendOnlyFileStoreScan scan = store.newScan();
+        return new TableScan(scan, schema, store.pathFactory()) {
             @Override
             protected void withNonPartitionFilter(Predicate predicate) {
-                scan.withValueFilter(predicate);
+                scan.withFilter(predicate);
             }
         };
     }
 
     @Override
     public TableRead newRead() {
-        return new TableRead(store.newRead()) {
+        AppendOnlyFileStoreRead read = store.newRead();
+        return new TableRead() {
             @Override
             public TableRead withProjection(int[][] projection) {
-                read.withValueProjection(projection);
+                read.withProjection(projection);
                 return this;
             }
 
@@ -87,9 +89,10 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
             }
 
             @Override
-            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
-                    RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
-                return new ValueContentRowDataRecordIterator(kvRecordIterator);
+            public RecordReader<RowData> createReader(
+                    BinaryRowData partition, int bucket, List<DataFileMeta> files)
+                    throws IOException {
+                return read.createReader(partition, bucket, files);
             }
         };
     }
@@ -98,21 +101,21 @@ public class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), schema);
-        return new TableWrite(store.newWrite(), recordConverter) {
+        return new AbstractTableWrite<RowData>(store.newWrite(), recordConverter) {
             @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+            protected void writeSinkRecord(SinkRecord record, RecordWriter<RowData> writer)
                     throws Exception {
                 Preconditions.checkState(
                         record.row().getRowKind() == RowKind.INSERT,
                         "Append only writer can not accept row with RowKind %s",
                         record.row().getRowKind());
-                writer.write(ValueKind.ADD, BinaryRowDataUtil.EMPTY_ROW, record.row());
+                writer.write(record.row());
             }
         };
     }
 
     @Override
-    public FileStoreImpl store() {
+    public AppendOnlyFileStore store() {
         return store;
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 79785d6e..c9f85f80 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -20,21 +20,24 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueFileStore;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.AbstractTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
@@ -47,7 +50,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
-    private final FileStoreImpl store;
+    private final KeyValueFileStore store;
 
     ChangelogValueCountFileStoreTable(
             String name, SchemaManager schemaManager, Schema schema, String user) {
@@ -57,11 +60,10 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
                         new LogicalType[] {new BigIntType(false)}, new String[] {"_VALUE_COUNT"});
         MergeFunction mergeFunction = new ValueCountMergeFunction();
         this.store =
-                new FileStoreImpl(
+                new KeyValueFileStore(
                         schemaManager,
                         schema.id(),
                         new FileStoreOptions(schema.options()),
-                        WriteMode.CHANGE_LOG,
                         user,
                         schema.logicalPartitionType(),
                         schema.logicalRowType(),
@@ -71,7 +73,8 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public TableScan newScan() {
-        return new TableScan(store.newScan(), schema, store.pathFactory()) {
+        KeyValueFileStoreScan scan = store.newScan();
+        return new TableScan(scan, schema, store.pathFactory()) {
             @Override
             protected void withNonPartitionFilter(Predicate predicate) {
                 scan.withKeyFilter(predicate);
@@ -81,7 +84,7 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public TableRead newRead() {
-        return new TableRead(store.newRead()) {
+        return new KeyValueTableRead(store.newRead()) {
             private int[][] projection = null;
             private boolean isIncremental = false;
 
@@ -114,29 +117,31 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), schema);
-        return new TableWrite(store.newWrite(), recordConverter) {
+        return new AbstractTableWrite<KeyValue>(store.newWrite(), recordConverter) {
             @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+            protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
                     throws Exception {
+                KeyValue kv = new KeyValue();
                 switch (record.row().getRowKind()) {
                     case INSERT:
                     case UPDATE_AFTER:
-                        writer.write(ValueKind.ADD, record.row(), GenericRowData.of(1L));
+                        kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(1L));
                         break;
                     case UPDATE_BEFORE:
                     case DELETE:
-                        writer.write(ValueKind.ADD, record.row(), GenericRowData.of(-1L));
+                        kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(-1L));
                         break;
                     default:
                         throw new UnsupportedOperationException(
                                 "Unknown row kind " + record.row().getRowKind());
                 }
+                writer.write(kv);
             }
         };
     }
 
     @Override
-    public FileStoreImpl store() {
+    public KeyValueFileStore store() {
         return store;
     }
 }
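
For context on the GenericRowData.of(1L) / GenericRowData.of(-1L) values above: under the value-count model every changelog row is written as a (row, count) KeyValue and reads net the counts per key. A small worked example, assuming ValueCountMergeFunction sums the _VALUE_COUNT column (its body is not shown in this patch, so this is an assumption):

    // Illustrative only, not part of this commit.
    // +I row(1, 10, 100)  ->  key = row(1, 10, 100), _VALUE_COUNT = +1
    // +I row(1, 10, 100)  ->  key = row(1, 10, 100), _VALUE_COUNT = +1
    // -D row(1, 10, 100)  ->  key = row(1, 10, 100), _VALUE_COUNT = -1
    // merged              ->  key = row(1, 10, 100), _VALUE_COUNT = +1
    // so a batch read emits row(1, 10, 100) exactly once.
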
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 6d611638..7109a91a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -20,23 +20,26 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.FileStoreImpl;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.KeyValueFileStore;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.store.table.sink.AbstractTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.KeyValueTableRead;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
@@ -53,7 +56,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
-    private final FileStoreImpl store;
+    private final KeyValueFileStore store;
 
     ChangelogWithKeyFileStoreTable(
             String name, SchemaManager schemaManager, Schema schema, String user) {
@@ -93,11 +96,10 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
         }
 
         this.store =
-                new FileStoreImpl(
+                new KeyValueFileStore(
                         schemaManager,
                         schema.id(),
                         new FileStoreOptions(conf),
-                        WriteMode.CHANGE_LOG,
                         user,
                         schema.logicalPartitionType(),
                         keyType,
@@ -107,7 +109,8 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public TableScan newScan() {
-        return new TableScan(store.newScan(), schema, store.pathFactory()) {
+        KeyValueFileStoreScan scan = store.newScan();
+        return new TableScan(scan, schema, store.pathFactory()) {
             @Override
             protected void withNonPartitionFilter(Predicate predicate) {
                 // currently we can only perform filter push down on keys
@@ -138,7 +141,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
 
     @Override
     public TableRead newRead() {
-        return new TableRead(store.newRead()) {
+        return new KeyValueTableRead(store.newRead()) {
             @Override
             public TableRead withProjection(int[][] projection) {
                 read.withValueProjection(projection);
@@ -163,29 +166,31 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), schema);
-        return new TableWrite(store.newWrite(), recordConverter) {
+        return new AbstractTableWrite<KeyValue>(store.newWrite(), recordConverter) {
             @Override
-            protected void writeSinkRecord(SinkRecord record, RecordWriter writer)
+            protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
                     throws Exception {
+                KeyValue kv = new KeyValue();
                 switch (record.row().getRowKind()) {
                     case INSERT:
                     case UPDATE_AFTER:
-                        writer.write(ValueKind.ADD, record.primaryKey(), record.row());
+                        kv.replace(record.primaryKey(), ValueKind.ADD, record.row());
                         break;
                     case UPDATE_BEFORE:
                     case DELETE:
-                        writer.write(ValueKind.DELETE, record.primaryKey(), record.row());
+                        kv.replace(record.primaryKey(), ValueKind.DELETE, record.row());
                         break;
                     default:
                         throw new UnsupportedOperationException(
                                 "Unknown row kind " + record.row().getRowKind());
                 }
+                writer.write(kv);
             }
         };
     }
 
     @Override
-    public FileStoreImpl store() {
+    public KeyValueFileStore store() {
         return store;
     }
 }
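
The primary-key variant above maps RowKind onto ValueKind rather than onto counts: INSERT/UPDATE_AFTER become ADD, UPDATE_BEFORE/DELETE become DELETE, with record.primaryKey() as the key. A worked example of the intended upsert behaviour, assuming DeduplicateMergeFunction keeps the latest entry per key (suggested by its name, not shown in this patch):

    // Illustrative only, not part of this commit.
    // +I row(2, 20, 200)  ->  kv.replace(primaryKey(2, 20), ADD,    row(2, 20, 200))
    // +U row(2, 20, 201)  ->  kv.replace(primaryKey(2, 20), ADD,    row(2, 20, 201))
    // -D row(2, 20, 201)  ->  kv.replace(primaryKey(2, 20), DELETE, row(2, 20, 201))
    // after merging, key (2, 20) no longer appears in a batch read.
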
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
similarity index 75%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
index 2b70a329..1e45320e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
@@ -33,18 +33,22 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-/** An abstraction layer above {@link FileStoreWrite} to provide {@link RowData} writing. */
-public abstract class TableWrite {
+/**
+ * Base {@link TableWrite} implementation.
+ *
+ * @param <T> type of record to write into {@link org.apache.flink.table.store.file.FileStore}.
+ */
+public abstract class AbstractTableWrite<T> implements TableWrite {
 
-    private final FileStoreWrite write;
+    private final FileStoreWrite<T> write;
     private final SinkRecordConverter recordConverter;
 
-    private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
+    private final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
     private final ExecutorService compactExecutor;
 
     private boolean overwrite = false;
 
-    protected TableWrite(FileStoreWrite write, SinkRecordConverter recordConverter) {
+    protected AbstractTableWrite(FileStoreWrite<T> write, SinkRecordConverter recordConverter) {
         this.write = write;
         this.recordConverter = recordConverter;
 
@@ -54,36 +58,40 @@ public abstract class TableWrite {
                         new ExecutorThreadFactory("compaction-thread"));
     }
 
+    @Override
     public TableWrite withOverwrite(boolean overwrite) {
         this.overwrite = overwrite;
         return this;
     }
 
+    @Override
     public SinkRecordConverter recordConverter() {
         return recordConverter;
     }
 
+    @Override
     public SinkRecord write(RowData rowData) throws Exception {
         SinkRecord record = recordConverter.convert(rowData);
-        RecordWriter writer = getWriter(record.partition(), record.bucket());
+        RecordWriter<T> writer = getWriter(record.partition(), record.bucket());
         writeSinkRecord(record, writer);
         return record;
     }
 
+    @Override
     public List<FileCommittable> prepareCommit() throws Exception {
         List<FileCommittable> result = new ArrayList<>();
 
-        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> partIter =
+        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>>> partIter =
                 writers.entrySet().iterator();
         while (partIter.hasNext()) {
-            Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> partEntry = partIter.next();
+            Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>> partEntry = partIter.next();
             BinaryRowData partition = partEntry.getKey();
-            Iterator<Map.Entry<Integer, RecordWriter>> bucketIter =
+            Iterator<Map.Entry<Integer, RecordWriter<T>>> bucketIter =
                     partEntry.getValue().entrySet().iterator();
             while (bucketIter.hasNext()) {
-                Map.Entry<Integer, RecordWriter> entry = bucketIter.next();
+                Map.Entry<Integer, RecordWriter<T>> entry = bucketIter.next();
                 int bucket = entry.getKey();
-                RecordWriter writer = entry.getValue();
+                RecordWriter<T> writer = entry.getValue();
                 FileCommittable committable =
                         new FileCommittable(partition, bucket, writer.prepareCommit());
                 result.add(committable);
@@ -105,15 +113,16 @@ public abstract class TableWrite {
         return result;
     }
 
-    private void closeWriter(RecordWriter writer) throws Exception {
+    private void closeWriter(RecordWriter<T> writer) throws Exception {
         writer.sync();
         writer.close();
     }
 
+    @Override
     public void close() throws Exception {
         compactExecutor.shutdownNow();
-        for (Map<Integer, RecordWriter> bucketWriters : writers.values()) {
-            for (RecordWriter writer : bucketWriters.values()) {
+        for (Map<Integer, RecordWriter<T>> bucketWriters : writers.values()) {
+            for (RecordWriter<T> writer : bucketWriters.values()) {
                 closeWriter(writer);
             }
         }
@@ -121,15 +130,15 @@ public abstract class TableWrite {
     }
 
     @VisibleForTesting
-    public Map<BinaryRowData, Map<Integer, RecordWriter>> writers() {
+    public Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers() {
         return writers;
     }
 
-    protected abstract void writeSinkRecord(SinkRecord record, RecordWriter writer)
+    protected abstract void writeSinkRecord(SinkRecord record, RecordWriter<T> writer)
             throws Exception;
 
-    private RecordWriter getWriter(BinaryRowData partition, int bucket) {
-        Map<Integer, RecordWriter> buckets = writers.get(partition);
+    private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
+        Map<Integer, RecordWriter<T>> buckets = writers.get(partition);
         if (buckets == null) {
             buckets = new HashMap<>();
             writers.put(partition.copy(), buckets);
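
The practical effect of the new AbstractTableWrite<T> is that each table binds T to the record type its FileStoreWrite expects, which the three table classes earlier in this patch do as follows (summary sketch, not part of the patch):

    // Type parameter bindings introduced by this commit:
    // AppendOnlyFileStoreTable          -> AbstractTableWrite<RowData>   (raw rows, INSERT only)
    // ChangelogValueCountFileStoreTable -> AbstractTableWrite<KeyValue>  (row as key, count as value)
    // ChangelogWithKeyFileStoreTable    -> AbstractTableWrite<KeyValue>  (primary key as key, row as value)
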
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 2b70a329..5384f892 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -18,127 +18,23 @@
 
 package org.apache.flink.table.store.table.sink;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
-/** An abstraction layer above {@link FileStoreWrite} to provide {@link RowData} writing. */
-public abstract class TableWrite {
-
-    private final FileStoreWrite write;
-    private final SinkRecordConverter recordConverter;
-
-    private final Map<BinaryRowData, Map<Integer, RecordWriter>> writers;
-    private final ExecutorService compactExecutor;
-
-    private boolean overwrite = false;
-
-    protected TableWrite(FileStoreWrite write, SinkRecordConverter recordConverter) {
-        this.write = write;
-        this.recordConverter = recordConverter;
-
-        this.writers = new HashMap<>();
-        this.compactExecutor =
-                Executors.newSingleThreadScheduledExecutor(
-                        new ExecutorThreadFactory("compaction-thread"));
-    }
-
-    public TableWrite withOverwrite(boolean overwrite) {
-        this.overwrite = overwrite;
-        return this;
-    }
-
-    public SinkRecordConverter recordConverter() {
-        return recordConverter;
-    }
-
-    public SinkRecord write(RowData rowData) throws Exception {
-        SinkRecord record = recordConverter.convert(rowData);
-        RecordWriter writer = getWriter(record.partition(), record.bucket());
-        writeSinkRecord(record, writer);
-        return record;
-    }
-
-    public List<FileCommittable> prepareCommit() throws Exception {
-        List<FileCommittable> result = new ArrayList<>();
-
-        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter>>> partIter =
-                writers.entrySet().iterator();
-        while (partIter.hasNext()) {
-            Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> partEntry = partIter.next();
-            BinaryRowData partition = partEntry.getKey();
-            Iterator<Map.Entry<Integer, RecordWriter>> bucketIter =
-                    partEntry.getValue().entrySet().iterator();
-            while (bucketIter.hasNext()) {
-                Map.Entry<Integer, RecordWriter> entry = bucketIter.next();
-                int bucket = entry.getKey();
-                RecordWriter writer = entry.getValue();
-                FileCommittable committable =
-                        new FileCommittable(partition, bucket, writer.prepareCommit());
-                result.add(committable);
-
-                // clear if no update
-                // we need a mechanism to clear writers, otherwise there will be more and more
-                // such as yesterday's partition that no longer needs to be written.
-                if (committable.increment().newFiles().isEmpty()) {
-                    closeWriter(writer);
-                    bucketIter.remove();
-                }
-            }
-
-            if (partEntry.getValue().isEmpty()) {
-                partIter.remove();
-            }
-        }
-
-        return result;
-    }
+/**
+ * An abstraction layer above {@link org.apache.flink.table.store.file.operation.FileStoreWrite} to
+ * provide {@link RowData} writing.
+ */
+public interface TableWrite {
 
-    private void closeWriter(RecordWriter writer) throws Exception {
-        writer.sync();
-        writer.close();
-    }
+    TableWrite withOverwrite(boolean overwrite);
 
-    public void close() throws Exception {
-        compactExecutor.shutdownNow();
-        for (Map<Integer, RecordWriter> bucketWriters : writers.values()) {
-            for (RecordWriter writer : bucketWriters.values()) {
-                closeWriter(writer);
-            }
-        }
-        writers.clear();
-    }
+    SinkRecordConverter recordConverter();
 
-    @VisibleForTesting
-    public Map<BinaryRowData, Map<Integer, RecordWriter>> writers() {
-        return writers;
-    }
+    SinkRecord write(RowData rowData) throws Exception;
 
-    protected abstract void writeSinkRecord(SinkRecord record, RecordWriter writer)
-            throws Exception;
+    List<FileCommittable> prepareCommit() throws Exception;
 
-    private RecordWriter getWriter(BinaryRowData partition, int bucket) {
-        Map<Integer, RecordWriter> buckets = writers.get(partition);
-        if (buckets == null) {
-            buckets = new HashMap<>();
-            writers.put(partition.copy(), buckets);
-        }
-        return buckets.computeIfAbsent(
-                bucket,
-                k ->
-                        overwrite
-                                ? write.createEmptyWriter(partition.copy(), bucket, compactExecutor)
-                                : write.createWriter(partition.copy(), bucket, compactExecutor));
-    }
+    void close() throws Exception;
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
similarity index 75%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
index 53ff22e6..f5336065 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/KeyValueTableRead.java
@@ -22,36 +22,26 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.operation.FileStoreRead;
+import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
 import org.apache.flink.table.store.file.utils.RecordReader;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
 
-/** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
-public abstract class TableRead {
+/**
+ * An abstraction layer above {@link KeyValueFileStoreRead} to provide reading of {@link RowData}.
+ */
+public abstract class KeyValueTableRead implements TableRead {
 
-    protected final FileStoreRead read;
+    protected final KeyValueFileStoreRead read;
 
-    protected TableRead(FileStoreRead read) {
+    protected KeyValueTableRead(KeyValueFileStoreRead read) {
         this.read = read;
     }
 
-    // TODO support filter push down
-
-    public TableRead withProjection(int[] projection) {
-        int[][] nestedProjection =
-                Arrays.stream(projection).mapToObj(i -> new int[] {i}).toArray(int[][]::new);
-        return withProjection(nestedProjection);
-    }
-
-    public abstract TableRead withProjection(int[][] projection);
-
-    public abstract TableRead withIncremental(boolean isIncremental);
-
+    @Override
     public RecordReader<RowData> createReader(
             BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
         return new RowDataRecordReader(read.createReader(partition, bucket, files));
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
index 53ff22e6..013c1b49 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableRead.java
@@ -20,64 +20,29 @@ package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.utils.RecordReader;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
 /** An abstraction layer above {@link FileStoreRead} to provide reading of {@link RowData}. */
-public abstract class TableRead {
-
-    protected final FileStoreRead read;
-
-    protected TableRead(FileStoreRead read) {
-        this.read = read;
-    }
+public interface TableRead {
 
     // TODO support filter push down
 
-    public TableRead withProjection(int[] projection) {
+    default TableRead withProjection(int[] projection) {
         int[][] nestedProjection =
                 Arrays.stream(projection).mapToObj(i -> new int[] {i}).toArray(int[][]::new);
         return withProjection(nestedProjection);
     }
 
-    public abstract TableRead withProjection(int[][] projection);
-
-    public abstract TableRead withIncremental(boolean isIncremental);
-
-    public RecordReader<RowData> createReader(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException {
-        return new RowDataRecordReader(read.createReader(partition, bucket, files));
-    }
-
-    protected abstract RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
-            RecordReader.RecordIterator<KeyValue> kvRecordIterator);
-
-    private class RowDataRecordReader implements RecordReader<RowData> {
+    TableRead withProjection(int[][] projection);
 
-        private final RecordReader<KeyValue> wrapped;
+    TableRead withIncremental(boolean isIncremental);
 
-        private RowDataRecordReader(RecordReader<KeyValue> wrapped) {
-            this.wrapped = wrapped;
-        }
-
-        @Nullable
-        @Override
-        public RecordIterator<RowData> readBatch() throws IOException {
-            RecordIterator<KeyValue> batch = wrapped.readBatch();
-            return batch == null ? null : rowDataRecordIteratorFromKv(batch);
-        }
-
-        @Override
-        public void close() throws IOException {
-            wrapped.close();
-        }
-    }
+    RecordReader<RowData> createReader(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) throws IOException;
 }
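
The default withProjection(int[]) above simply nests each index, so callers can pass a flat projection wherever the nested form is required. For example (illustrative, mirrors the default method body):

    // new int[] {2, 0} becomes new int[][] {{2}, {0}}:
    int[][] nested =
            Arrays.stream(new int[] {2, 0})
                    .mapToObj(i -> new int[] {i})
                    .toArray(int[][]::new);
    // read.withProjection(new int[] {2, 0}) is equivalent to read.withProjection(nested)
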
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index ada81909..1769e6e0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 /** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
 public abstract class TableScan {
 
-    protected final FileStoreScan scan;
+    private final FileStoreScan scan;
     private final Schema schema;
     private final FileStorePathFactory pathFactory;
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 7b872218..655b38db 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -69,7 +69,7 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** {@link FileStore} for tests. */
-public class TestFileStore extends FileStoreImpl {
+public class TestFileStore extends KeyValueFileStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestFileStore.class);
 
@@ -115,7 +115,6 @@ public class TestFileStore extends FileStoreImpl {
                 new SchemaManager(options.path()),
                 0L,
                 options,
-                WriteMode.CHANGE_LOG,
                 UUID.randomUUID().toString(),
                 partitionType,
                 keyType,
@@ -182,12 +181,17 @@ public class TestFileStore extends FileStoreImpl {
             List<KeyValue> kvs,
             Function<KeyValue, BinaryRowData> partitionCalculator,
             Function<KeyValue, Integer> bucketCalculator,
-            QuadFunction<FileStoreWrite, BinaryRowData, Integer, ExecutorService, RecordWriter>
+            QuadFunction<
+                            FileStoreWrite<KeyValue>,
+                            BinaryRowData,
+                            Integer,
+                            ExecutorService,
+                            RecordWriter<KeyValue>>
                     createWriterFunction,
             BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
             throws Exception {
-        FileStoreWrite write = newWrite();
-        Map<BinaryRowData, Map<Integer, RecordWriter>> writers = new HashMap<>();
+        FileStoreWrite<KeyValue> write = newWrite();
+        Map<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> writers = new HashMap<>();
         for (KeyValue kv : kvs) {
             BinaryRowData partition = partitionCalculator.apply(kv);
             int bucket = bucketCalculator.apply(kv);
@@ -203,15 +207,15 @@ public class TestFileStore extends FileStoreImpl {
                                     return w;
                                 }
                             })
-                    .write(kv.valueKind(), kv.key(), kv.value());
+                    .write(kv);
         }
 
         FileStoreCommit commit = newCommit();
         ManifestCommittable committable =
                 new ManifestCommittable(String.valueOf(new Random().nextLong()));
-        for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter>> entryWithPartition :
+        for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> entryWithPartition :
                 writers.entrySet()) {
-            for (Map.Entry<Integer, RecordWriter> entryWithBucket :
+            for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
                     entryWithPartition.getValue().entrySet()) {
                 Increment increment = entryWithBucket.getValue().prepareCommit();
                 committable.addFileCommittable(
@@ -271,7 +275,7 @@ public class TestFileStore extends FileStoreImpl {
         }
 
         List<KeyValue> kvs = new ArrayList<>();
-        FileStoreRead read = newRead();
+        FileStoreRead<KeyValue> read = newRead();
         for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entryWithPartition :
                 filesPerPartitionAndBucket.entrySet()) {
             for (Map.Entry<Integer, List<DataFileMeta>> entryWithBucket :
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
similarity index 90%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index f5927de3..774676ad 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.flink.table.store.file.writer;
+package org.apache.flink.table.store.file.data;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -26,13 +26,11 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.ValueKind;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
+import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -71,7 +69,7 @@ public class AppendOnlyWriterTest {
 
     @Test
     public void testEmptyCommits() throws Exception {
-        RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0);
+        RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0);
 
         for (int i = 0; i < 3; i++) {
             writer.sync();
@@ -85,8 +83,8 @@ public class AppendOnlyWriterTest {
 
     @Test
     public void testSingleWrite() throws Exception {
-        RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0);
-        writer.write(ValueKind.ADD, EMPTY_ROW, row(1, "AAA", PART));
+        RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0);
+        writer.write(row(1, "AAA", PART));
 
         List<DataFileMeta> result = writer.close();
 
@@ -115,7 +113,7 @@ public class AppendOnlyWriterTest {
 
     @Test
     public void testMultipleCommits() throws Exception {
-        RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0);
+        RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0);
 
         // Commit 5 continues txn.
         for (int txn = 0; txn < 5; txn += 1) {
@@ -124,7 +122,7 @@ public class AppendOnlyWriterTest {
             int start = txn * 100;
             int end = txn * 100 + 100;
             for (int i = start; i < end; i++) {
-                writer.write(ValueKind.ADD, EMPTY_ROW, row(i, String.format("%03d", i), PART));
+                writer.write(row(i, String.format("%03d", i), PART));
             }
 
             writer.sync();
@@ -161,10 +159,10 @@ public class AppendOnlyWriterTest {
     public void testRollingWrite() throws Exception {
         // Set a very small target file size, so that we will roll over to a new file even if
         // writing one record.
-        RecordWriter writer = createWriter(10L, SCHEMA, 0);
+        RecordWriter<RowData> writer = createWriter(10L, SCHEMA, 0);
 
         for (int i = 0; i < 10; i++) {
-            writer.write(ValueKind.ADD, EMPTY_ROW, row(i, String.format("%03d", i), PART));
+            writer.write(row(i, String.format("%03d", i), PART));
         }
 
         writer.sync();
@@ -220,7 +218,8 @@ public class AppendOnlyWriterTest {
                 FileStoreOptions.FILE_FORMAT.defaultValue());
     }
 
-    private RecordWriter createWriter(long targetFileSize, RowType writeSchema, long maxSeqNum) {
+    private RecordWriter<RowData> createWriter(
+            long targetFileSize, RowType writeSchema, long maxSeqNum) {
         FileFormat fileFormat =
                 FileFormat.fromIdentifier(
                         Thread.currentThread().getContextClassLoader(), AVRO, new Configuration());
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 331e8a9e..16137dd3 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -82,7 +82,7 @@ public class MergeTreeTest {
     private MergeTreeOptions options;
     private DataFileReader dataFileReader;
     private DataFileWriter dataFileWriter;
-    private RecordWriter writer;
+    private RecordWriter<KeyValue> writer;
 
     @BeforeEach
     public void beforeEach() throws IOException {
@@ -357,7 +357,8 @@ public class MergeTreeTest {
 
     private void writeAll(List<TestRecord> records) throws Exception {
         for (TestRecord record : records) {
-            writer.write(record.kind, row(record.k), row(record.v));
+            KeyValue kv = new KeyValue().replace(row(record.k), record.kind, row(record.v));
+            writer.write(kv);
         }
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
similarity index 98%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index a3378ed1..af4f774c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -50,8 +50,8 @@ import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link FileStoreReadImpl}. */
-public class FileStoreReadTest {
+/** Tests for {@link KeyValueFileStoreRead}. */
+public class KeyValueFileStoreReadTest {
 
     @TempDir java.nio.file.Path tempDir;
 
@@ -189,7 +189,7 @@ public class FileStoreReadTest {
                 scan.withSnapshot(store.snapshotManager().latestSnapshotId()).plan().files()
                         .stream()
                         .collect(Collectors.groupingBy(ManifestEntry::partition));
-        FileStoreRead read = store.newRead();
+        KeyValueFileStoreRead read = store.newRead();
         if (keyProjection != null) {
             read.withKeyProjection(keyProjection);
             read.withDropDelete(false);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
similarity index 90%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
index f6245761..74f7ecae 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreScanTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScanTest.java
@@ -46,8 +46,8 @@ import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link FileStoreScanImpl}. */
-public class FileStoreScanTest {
+/** Tests for {@link KeyValueFileStoreScan}. */
+public class KeyValueFileStoreScanTest {
 
     private static final int NUM_BUCKETS = 10;
 
@@ -112,7 +112,7 @@ public class FileStoreScanTest {
 
         int wantedShopId = data.get(random.nextInt(data.size())).key().getInt(0);
 
-        FileStoreScan scan = store.newScan();
+        KeyValueFileStoreScan scan = store.newScan();
         scan.withSnapshot(snapshot.id());
         scan.withKeyFilter(
                 PredicateBuilder.equal(0, new Literal(new IntType(false), wantedShopId)));
@@ -125,27 +125,6 @@ public class FileStoreScanTest {
         runTestContainsAll(scan, snapshot.id(), expected);
     }
 
-    @Test
-    public void testWithValueFilter() throws Exception {
-        ThreadLocalRandom random = ThreadLocalRandom.current();
-        List<KeyValue> data = generateData(random.nextInt(1000) + 1);
-        Snapshot snapshot = writeData(data);
-
-        int wantedShopId = data.get(random.nextInt(data.size())).value().getInt(2);
-
-        FileStoreScan scan = store.newScan();
-        scan.withSnapshot(snapshot.id());
-        scan.withValueFilter(
-                PredicateBuilder.equal(2, new Literal(new IntType(false), wantedShopId)));
-
-        Map<BinaryRowData, BinaryRowData> expected =
-                store.toKvMap(
-                        data.stream()
-                                .filter(kv -> kv.value().getInt(2) == wantedShopId)
-                                .collect(Collectors.toList()));
-        runTestContainsAll(scan, snapshot.id(), expected);
-    }
-
     @Test
     public void testWithBucket() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index c65ef8bb..d51e4416 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -49,7 +49,7 @@ public class TestCommitThread extends Thread {
     private final Map<BinaryRowData, List<KeyValue>> result;
     private final Map<BinaryRowData, MergeTreeWriter> writers;
 
-    private final FileStoreWrite write;
+    private final FileStoreWrite<KeyValue> write;
     private final FileStoreCommit commit;
 
     public TestCommitThread(
@@ -150,7 +150,7 @@ public class TestCommitThread extends Thread {
         MergeTreeWriter writer =
                 writers.compute(partition, (p, w) -> w == null ? createWriter(p, false) : w);
         for (KeyValue kv : changes) {
-            writer.write(kv.valueKind(), kv.key(), kv.value());
+            writer.write(kv);
         }
     }
 
@@ -169,7 +169,7 @@ public class TestCommitThread extends Thread {
         MergeTreeWriter writer = createWriter(partition, true);
         writers.put(partition, writer);
         for (KeyValue kv : changes) {
-            writer.write(kv.valueKind(), kv.key(), kv.value());
+            writer.write(kv);
         }
 
         return partition;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 22460fee..0ee8e79f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -45,8 +44,6 @@ import java.util.List;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link AppendOnlyFileStoreTable}. */
-// TODO enable this test class after append only file store with avro format is fixed
-@Disabled
 public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
 
     @TempDir java.nio.file.Path tempDir;
@@ -59,10 +56,10 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(
+                .hasSameElementsAs(
                         Arrays.asList("1|10|100", "1|11|101", "1|12|102", "1|11|101", "1|12|102"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("2|20|200", "2|21|201", "2|22|202", "2|21|201"));
+                .hasSameElementsAs(Arrays.asList("2|20|200", "2|21|201", "2|22|202", "2|21|201"));
     }
 
     @Test
@@ -73,9 +70,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("100|10", "101|11", "102|12", "101|11", "102|12"));
+                .hasSameElementsAs(Arrays.asList("100|10", "101|11", "102|12", "101|11", "102|12"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
-                .isEqualTo(Arrays.asList("200|20", "201|21", "202|22", "201|21"));
+                .hasSameElementsAs(Arrays.asList("200|20", "201|21", "202|22", "201|21"));
     }
 
     @Test
@@ -90,7 +87,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .isEqualTo(
+                .hasSameElementsAs(
                         Arrays.asList(
                                 "2|21|201",
                                 // this record is in the same file with the first "2|21|201"
@@ -108,7 +105,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
                 .isEqualTo(Arrays.asList("+1|11|101", "+1|12|102"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("+2|21|201"));
+                .isEqualTo(Collections.singletonList("+2|21|201"));
     }
 
     @Test
@@ -119,9 +116,9 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
         TableRead read = table.newRead().withIncremental(true).withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("+101|11", "+102|12"));
+                .isEqualTo(Arrays.asList("+101|11", "+102|12"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("+201|21"));
+                .isEqualTo(Collections.singletonList("+201|21"));
     }
 
     @Test
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 7154062c..b6fa2abc 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -57,9 +57,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("1|11|101", "1|12|102", "1|11|101"));
+                .isEqualTo(Arrays.asList("1|11|101", "1|11|101", "1|12|102"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("2|20|200", "2|21|201", "2|22|202"));
+                .isEqualTo(Arrays.asList("2|20|200", "2|21|201", "2|22|202"));
     }
 
     @Test
@@ -70,9 +70,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("101|11", "102|12", "101|11"));
+                .isEqualTo(Arrays.asList("101|11", "101|11", "102|12"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("200|20", "201|21", "202|22"));
+                .isEqualTo(Arrays.asList("200|20", "201|21", "202|22"));
     }
 
     @Test
@@ -87,7 +87,7 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(
+                .isEqualTo(
                         Arrays.asList(
                                 "2|21|201",
                                 // this record is in the same file with "delete 2|21|201"
@@ -102,9 +102,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
         TableRead read = table.newRead().withIncremental(true);
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("+1|11|101", "-1|10|100"));
+                .isEqualTo(Arrays.asList("-1|10|100", "+1|11|101"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("+2|22|202", "-2|21|201"));
+                .isEqualTo(Arrays.asList("-2|21|201", "-2|21|201", "+2|22|202"));
     }
 
     @Test
@@ -115,9 +115,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
         TableRead read = table.newRead().withIncremental(true).withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("+101|11", "-100|10"));
+                .isEqualTo(Arrays.asList("-100|10", "+101|11"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("+202|22", "-201|21"));
+                .isEqualTo(Arrays.asList("-201|21", "-201|21", "+202|22"));
     }
 
     @Test
@@ -133,8 +133,9 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         TableRead read = table.newRead().withIncremental(true);
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .hasSameElementsAs(
+                .isEqualTo(
                         Arrays.asList(
+                                "-2|21|201",
                                 "-2|21|201",
                                 // this record is in the same file with "delete 2|21|201"
                                 "+2|22|202"));
@@ -149,6 +150,7 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         write.write(GenericRowData.of(1, 11, 101L));
         table.newCommit().commit("0", write.prepareCommit());
 
+        write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(1, 12, 102L));
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(2, 21, 201L));
@@ -156,6 +158,7 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
 
         write.write(GenericRowData.of(1, 11, 101L));
         write.write(GenericRowData.of(2, 22, 202L));
+        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
         table.newCommit().commit("2", write.prepareCommit());
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index b5da3cd4..08463278 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -57,9 +57,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("1|10|1000"));
+                .isEqualTo(Collections.singletonList("1|10|1000"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("2|21|20001", "2|22|202"));
+                .isEqualTo(Arrays.asList("2|21|20001", "2|22|202"));
     }
 
     @Test
@@ -70,9 +70,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().plan().splits;
         TableRead read = table.newRead().withProjection(PROJECTION);
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_PROJECTED_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("1000|10"));
+                .isEqualTo(Collections.singletonList("1000|10"));
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_PROJECTED_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("20001|21", "202|22"));
+                .isEqualTo(Arrays.asList("20001|21", "202|22"));
     }
 
     @Test
@@ -91,7 +91,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableRead read = table.newRead();
         assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING))
-                .hasSameElementsAs(
+                .isEqualTo(
                         Arrays.asList(
                                 // only filter on key should be performed,
                                 // and records from the same file should also be selected
@@ -106,9 +106,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
         TableRead read = table.newRead().withIncremental(true);
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("-1|11|1001"));
+                .isEqualTo(Collections.singletonList("-1|11|1001"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("+2|21|20001", "+2|22|202", "-2|20|200"));
+                .isEqualTo(Arrays.asList("-2|20|200", "+2|21|20001", "+2|22|202"));
     }
 
     @Test
@@ -120,9 +120,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableRead read = table.newRead().withIncremental(true).withProjection(PROJECTION);
 
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_PROJECTED_ROW_TO_STRING))
-                .hasSameElementsAs(Collections.singletonList("-1001|11"));
+                .isEqualTo(Collections.singletonList("-1001|11"));
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_PROJECTED_ROW_TO_STRING))
-                .hasSameElementsAs(Arrays.asList("+20001|21", "+202|22", "-200|20"));
+                .isEqualTo(Arrays.asList("-200|20", "+20001|21", "+202|22"));
     }
 
     @Test
@@ -142,11 +142,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         TableRead read = table.newRead().withIncremental(true);
         assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)).isEmpty();
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING))
-                .hasSameElementsAs(
+                .isEqualTo(
                         Arrays.asList(
                                 // only filter on key should be performed,
                                 // and records from the same file should also be selected
-                                "+2|21|20001", "+2|22|202", "-2|20|200"));
+                                "-2|20|200", "+2|21|20001", "+2|22|202"));
     }
 
     private void writeData() throws Exception {
diff --git a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
index 53a031c5..75ba824c 100644
--- a/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
+++ b/flink-table-store-format/src/test/java/org/apache/flink/table/store/format/FileFormatSuffixTest.java
@@ -22,14 +22,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.binary.BinaryRowDataUtil;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.AppendOnlyWriter;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.store.file.data.DataFileTest;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.format.FileFormat;
-import org.apache.flink.table.store.file.writer.AppendOnlyWriter;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -68,8 +66,6 @@ public class FileFormatSuffixTest extends DataFileTest {
         AppendOnlyWriter appendOnlyWriter =
                 new AppendOnlyWriter(0, fileFormat, 10, SCHEMA, 10, dataFilePathFactory);
         appendOnlyWriter.write(
-                ValueKind.ADD,
-                BinaryRowDataUtil.EMPTY_ROW,
                 GenericRowData.of(1, StringData.fromString("aaa"), StringData.fromString("1")));
         List<DataFileMeta> result = appendOnlyWriter.close();