You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/28 03:01:04 UTC

[flink-table-store] branch master updated: [FLINK-28257] Replace ValueKind with RowKind in table store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 515d1f99 [FLINK-28257] Replace ValueKind with RowKind in table store
515d1f99 is described below

commit 515d1f99b27b1ec9569c025ca59d91bdf8545b54
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Tue Jun 28 11:00:59 2022 +0800

    [FLINK-28257] Replace ValueKind with RowKind in table store
    
    This closes #176
---
 .../store/connector/AlterTableCompactITCase.java   |  5 ++--
 .../table/store/connector/sink/StoreSinkTest.java  | 28 +++++++++++-----------
 .../store/connector/sink/TestFileStoreTable.java   |  6 ++---
 .../source/FileStoreSourceSplitGeneratorTest.java  |  4 ++--
 .../source/FileStoreSourceSplitReaderTest.java     |  8 ++-----
 .../source/TestChangelogDataReadWrite.java         |  4 ++--
 .../apache/flink/table/store/file/KeyValue.java    |  9 +++----
 .../flink/table/store/file/KeyValueSerializer.java |  5 ++--
 .../{ValueKind.java => manifest/FileKind.java}     | 10 ++++----
 .../table/store/file/manifest/ManifestEntry.java   |  7 +++---
 .../file/manifest/ManifestEntrySerializer.java     |  3 +--
 .../flink/table/store/file/mergetree/MemTable.java |  4 ++--
 .../store/file/mergetree/MergeTreeReader.java      |  5 ++--
 .../store/file/mergetree/SortBufferMemTable.java   |  4 ++--
 .../store/file/operation/FileStoreCommitImpl.java  | 20 ++++++++--------
 .../table/store/file/operation/FileStoreScan.java  |  4 ++--
 .../table/ChangelogValueCountFileStoreTable.java   |  9 +++----
 .../table/ChangelogWithKeyFileStoreTable.java      | 21 ++++------------
 .../source/ValueContentRowDataRecordIterator.java  |  6 +----
 .../flink/table/store/file/TestFileStore.java      |  4 +++-
 .../table/store/file/TestKeyValueGenerator.java    |  9 +++----
 .../store/file/manifest/ManifestFileMetaTest.java  |  6 +----
 .../file/manifest/ManifestTestDataGenerator.java   | 11 ++++-----
 .../table/store/file/mergetree/MergeTreeTest.java  | 12 +++++-----
 .../mergetree/compact/MergeFunctionTestUtils.java  |  6 ++---
 .../store/file/operation/FileStoreCommitTest.java  |  4 ++--
 .../file/operation/KeyValueFileStoreReadTest.java  |  4 ++--
 .../table/store/file/utils/ReusingTestData.java    | 12 +++++-----
 28 files changed, 105 insertions(+), 125 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index a6cf81a8..9d2708a0 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
@@ -237,10 +236,10 @@ public class AlterTableCompactITCase extends FileStoreTableITCase {
         List<KeyValue> data = new ArrayList<>();
         for (int i = 0; i < numRecords; i++) {
             KeyValue kv = generator.next();
-            if (kv.valueKind() == ValueKind.ADD) {
+            if (kv.valueKind() == RowKind.INSERT) {
                 data.add(kv);
             } else {
-                data.add(kv.replace(kv.key(), ValueKind.ADD, kv.value()));
+                data.add(kv.replace(kv.key(), RowKind.INSERT, kv.value()));
             }
         }
         return data;
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index d7f86661..d572de26 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -134,7 +134,7 @@ public class StoreSinkTest {
         assertThat(fileStore.committedFiles.get(row(0)).get(0))
                 .isEqualTo(Collections.singletonList("DELETE-key-2-value-0/2/3"));
         assertThat(fileStore.committedFiles.get(row(0)).get(1))
-                .isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
+                .isEqualTo(Arrays.asList("INSERT-key-0-value-0/0/1", "INSERT-key-7-value-0/7/5"));
     }
 
     @Test
@@ -149,11 +149,11 @@ public class StoreSinkTest {
                 GenericRowData.ofKind(RowKind.UPDATE_AFTER, 0, 4, 5),
                 GenericRowData.ofKind(RowKind.DELETE, 1, 0, 1));
         assertThat(fileStore.committedFiles.get(row(1)).get(0))
-                .isEqualTo(Collections.singletonList("ADD-key-1/0/1-value--1"));
+                .isEqualTo(Collections.singletonList("INSERT-key-1/0/1-value--1"));
         assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Collections.singletonList("ADD-key-0/4/5-value-1"));
+                .isEqualTo(Collections.singletonList("INSERT-key-0/4/5-value-1"));
         assertThat(fileStore.committedFiles.get(row(0)).get(1))
-                .isEqualTo(Arrays.asList("ADD-key-0/0/1-value-1", "ADD-key-0/2/3-value--1"));
+                .isEqualTo(Arrays.asList("INSERT-key-0/0/1-value-1", "INSERT-key-0/2/3-value--1"));
     }
 
     @Test
@@ -164,9 +164,9 @@ public class StoreSinkTest {
 
         writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
         assertThat(fileStore.committedFiles.get(row(1)).get(0))
-                .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+                .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
         assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Arrays.asList("ADD-key-2-value-0/2/3", "ADD-key-8-value-0/8/9"));
+                .isEqualTo(Arrays.asList("INSERT-key-2-value-0/2/3", "INSERT-key-8-value-0/8/9"));
     }
 
     @Test
@@ -178,9 +178,9 @@ public class StoreSinkTest {
         writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
         assertThat(fileStore.committedFiles.get(row(1)).get(1)).isNull();
         assertThat(fileStore.committedFiles.get(row(1)).get(0))
-                .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+                .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
         assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
+                .isEqualTo(Collections.singletonList("INSERT-key-8-value-0/8/9"));
     }
 
     @Test
@@ -193,11 +193,11 @@ public class StoreSinkTest {
 
         writeAndCommit(sink, GenericRowData.of(0, 8, 9), GenericRowData.of(1, 10, 11));
         assertThat(fileStore.committedFiles.get(row(1)).get(1))
-                .isEqualTo(Collections.singletonList("ADD-key-0-value-1/0/1"));
+                .isEqualTo(Collections.singletonList("INSERT-key-0-value-1/0/1"));
         assertThat(fileStore.committedFiles.get(row(1)).get(0))
-                .isEqualTo(Collections.singletonList("ADD-key-10-value-1/10/11"));
+                .isEqualTo(Collections.singletonList("INSERT-key-10-value-1/10/11"));
         assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
+                .isEqualTo(Collections.singletonList("INSERT-key-8-value-0/8/9"));
     }
 
     private void writeAndAssert(StoreSink<?, ?> sink) throws Exception {
@@ -208,11 +208,11 @@ public class StoreSinkTest {
                 GenericRowData.of(0, 7, 5),
                 GenericRowData.of(1, 0, 1));
         assertThat(fileStore.committedFiles.get(row(1)).get(1))
-                .isEqualTo(Collections.singletonList("ADD-key-0-value-1/0/1"));
+                .isEqualTo(Collections.singletonList("INSERT-key-0-value-1/0/1"));
         assertThat(fileStore.committedFiles.get(row(0)).get(0))
-                .isEqualTo(Collections.singletonList("ADD-key-2-value-0/2/3"));
+                .isEqualTo(Collections.singletonList("INSERT-key-2-value-0/2/3"));
         assertThat(fileStore.committedFiles.get(row(0)).get(1))
-                .isEqualTo(Arrays.asList("ADD-key-0-value-0/0/1", "ADD-key-7-value-0/7/5"));
+                .isEqualTo(Arrays.asList("INSERT-key-0-value-0/0/1", "INSERT-key-7-value-0/7/5"));
     }
 
     private void writeAndCommit(StoreSink<?, ?> sink, RowData... rows) throws Exception {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
index fb2727a8..2bd7e41a 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
@@ -85,10 +84,11 @@ public class TestFileStoreTable implements FileStoreTable {
                 if (store.hasPk) {
                     kv.replace(
                             record.primaryKey(),
-                            isInsert ? ValueKind.ADD : ValueKind.DELETE,
+                            isInsert ? RowKind.INSERT : RowKind.DELETE,
                             record.row());
                 } else {
-                    kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(isInsert ? 1L : -1L));
+                    kv.replace(
+                            record.row(), RowKind.INSERT, GenericRowData.of(isInsert ? 1L : -1L));
                 }
                 writer.write(kv);
             }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index 76331ef4..1b20a9df 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
@@ -115,7 +115,7 @@ public class FileStoreSourceSplitGeneratorTest {
 
     private ManifestEntry makeEntry(int partition, int bucket, String fileName) {
         return new ManifestEntry(
-                ValueKind.ADD,
+                FileKind.ADD,
                 row(partition), // not used
                 bucket, // not used
                 0, // not used
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 9e3e17ae..b5d39f2e 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.connector.file.src.util.RecordAndPosition;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.types.RowKind;
@@ -142,15 +141,12 @@ public class FileStoreSourceSplitReaderTest {
                     new KeyValue()
                             .replace(
                                     GenericRowData.of(tuple2.f0),
-                                    ValueKind.ADD,
+                                    RowKind.INSERT,
                                     GenericRowData.of(tuple2.f1)));
         }
         writer.write(
                 new KeyValue()
-                        .replace(
-                                GenericRowData.of(222L),
-                                ValueKind.DELETE,
-                                GenericRowData.of(333L)));
+                        .replace(GenericRowData.of(222L), RowKind.DELETE, GenericRowData.of(333L)));
         List<DataFileMeta> files = writer.prepareCommit().newFiles();
         writer.close();
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 1f32f9bb..66c66dd0 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
@@ -47,6 +46,7 @@ import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -141,7 +141,7 @@ public class TestChangelogDataReadWrite {
                     new KeyValue()
                             .replace(
                                     GenericRowData.of(tuple2.f0),
-                                    ValueKind.ADD,
+                                    RowKind.INSERT,
                                     GenericRowData.of(tuple2.f1)));
         }
         List<DataFileMeta> files = writer.prepareCommit().newFiles();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index a5baa708..1978220c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.types.RowKind;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,7 +39,7 @@ public class KeyValue {
 
     private RowData key;
     private long sequenceNumber;
-    private ValueKind valueKind;
+    private RowKind valueKind;
     private RowData value;
 
     public KeyValue setValue(RowData value) {
@@ -46,11 +47,11 @@ public class KeyValue {
         return this;
     }
 
-    public KeyValue replace(RowData key, ValueKind valueKind, RowData value) {
+    public KeyValue replace(RowData key, RowKind valueKind, RowData value) {
         return replace(key, -1, valueKind, value);
     }
 
-    public KeyValue replace(RowData key, long sequenceNumber, ValueKind valueKind, RowData value) {
+    public KeyValue replace(RowData key, long sequenceNumber, RowKind valueKind, RowData value) {
         this.key = key;
         this.sequenceNumber = sequenceNumber;
         this.valueKind = valueKind;
@@ -66,7 +67,7 @@ public class KeyValue {
         return sequenceNumber;
     }
 
-    public ValueKind valueKind() {
+    public RowKind valueKind() {
         return valueKind;
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueSerializer.java
index 3719e708..fadcd236 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueSerializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.data.utils.JoinedRowData;
 import org.apache.flink.table.store.file.utils.ObjectSerializer;
 import org.apache.flink.table.store.file.utils.OffsetRowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 
 /**
  * Serializer for {@link KeyValue}.
@@ -64,7 +65,7 @@ public class KeyValueSerializer extends ObjectSerializer<KeyValue> {
         return toRow(record.key(), record.sequenceNumber(), record.valueKind(), record.value());
     }
 
-    public RowData toRow(RowData key, long sequenceNumber, ValueKind valueKind, RowData value) {
+    public RowData toRow(RowData key, long sequenceNumber, RowKind valueKind, RowData value) {
         reusedMeta.setField(0, sequenceNumber);
         reusedMeta.setField(1, valueKind.toByteValue());
         return reusedRow.replace(reusedKeyWithMeta.replace(key, reusedMeta), value);
@@ -75,7 +76,7 @@ public class KeyValueSerializer extends ObjectSerializer<KeyValue> {
         reusedKey.replace(row);
         reusedValue.replace(row);
         long sequenceNumber = row.getLong(keyArity);
-        ValueKind valueKind = ValueKind.fromByteValue(row.getByte(keyArity + 1));
+        RowKind valueKind = RowKind.fromByteValue(row.getByte(keyArity + 1));
         reusedKv.replace(reusedKey, sequenceNumber, valueKind, reusedValue);
         return reusedKv;
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/ValueKind.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/FileKind.java
similarity index 87%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/ValueKind.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/FileKind.java
index 685d3cb2..c393ad55 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/ValueKind.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/FileKind.java
@@ -16,17 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file;
+package org.apache.flink.table.store.file.manifest;
 
-/** Value kind of a record. */
-public enum ValueKind {
+/** Kind of a file. */
+public enum FileKind {
     ADD((byte) 0),
 
     DELETE((byte) 1);
 
     private final byte value;
 
-    ValueKind(byte value) {
+    FileKind(byte value) {
         this.value = value;
     }
 
@@ -34,7 +34,7 @@ public enum ValueKind {
         return value;
     }
 
-    public static ValueKind fromByteValue(byte value) {
+    public static FileKind fromByteValue(byte value) {
         switch (value) {
             case 0:
                 return ADD;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
index c857df6e..9fcc0512 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.file.manifest;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -34,7 +33,7 @@ import static org.apache.flink.table.store.file.utils.SerializationUtils.newByte
 /** Entry of a manifest file, representing an addition / deletion of a data file. */
 public class ManifestEntry {
 
-    private final ValueKind kind;
+    private final FileKind kind;
     // for tables without partition this field should be a row with 0 columns (not null)
     private final BinaryRowData partition;
     private final int bucket;
@@ -42,7 +41,7 @@ public class ManifestEntry {
     private final DataFileMeta file;
 
     public ManifestEntry(
-            ValueKind kind,
+            FileKind kind,
             BinaryRowData partition,
             int bucket,
             int totalBuckets,
@@ -54,7 +53,7 @@ public class ManifestEntry {
         this.file = file;
     }
 
-    public ValueKind kind() {
+    public FileKind kind() {
         return kind;
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
index 2f5874c6..f1e5a423 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntrySerializer.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.manifest;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMetaSerializer;
 import org.apache.flink.table.store.file.utils.VersionedObjectSerializer;
 
@@ -67,7 +66,7 @@ public class ManifestEntrySerializer extends VersionedObjectSerializer<ManifestE
             throw new IllegalArgumentException("Unsupported version: " + version);
         }
         return new ManifestEntry(
-                ValueKind.fromByteValue(row.getByte(0)),
+                FileKind.fromByteValue(row.getByte(0)),
                 deserializeBinaryRow(row.getBinary(1)),
                 row.getInt(2),
                 row.getInt(3),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
index 944516cd..bca182ed 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.store.file.mergetree;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
 import java.util.Comparator;
@@ -38,7 +38,7 @@ public interface MemTable {
      *
      * @return True, if the record was successfully written, false, if the mem table was full.
      */
-    boolean put(long sequenceNumber, ValueKind valueKind, RowData key, RowData value)
+    boolean put(long sequenceNumber, RowKind valueKind, RowData key, RowData value)
             throws IOException;
 
     /** Record size of this table. */
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
index fbf17b83..da39d0a3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeReader.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.store.file.mergetree;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
@@ -28,6 +27,7 @@ import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.Re
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.SortMergeReader;
 import org.apache.flink.table.store.file.utils.RecordReader;
+import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
@@ -84,7 +84,8 @@ public class MergeTreeReader implements RecordReader<KeyValue> {
                         return null;
                     }
 
-                    if (kv.valueKind() == ValueKind.ADD) {
+                    if (kv.valueKind() == RowKind.INSERT
+                            || kv.valueKind() == RowKind.UPDATE_AFTER) {
                         return kv;
                     }
                 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 9258d2b6..4dafb52f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -31,11 +31,11 @@ import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.store.codegen.CodeGenUtils;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.MutableObjectIterator;
 
 import java.io.IOException;
@@ -83,7 +83,7 @@ public class SortBufferMemTable implements MemTable {
     }
 
     @Override
-    public boolean put(long sequenceNumber, ValueKind valueKind, RowData key, RowData value)
+    public boolean put(long sequenceNumber, RowKind valueKind, RowData key, RowData value)
             throws IOException {
         return buffer.write(serializer.toRow(key, sequenceNumber, valueKind, value));
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index a2fef4fc..f6a1404e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -23,8 +23,8 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.Snapshot;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
@@ -166,7 +166,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             LOG.debug("Ready to commit\n" + committable.toString());
         }
 
-        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), FileKind.ADD);
         if (!appendChanges.isEmpty()) {
             tryCommit(
                     appendChanges,
@@ -177,8 +177,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
-        compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
-        compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+        compactChanges.addAll(collectChanges(committable.compactBefore(), FileKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), FileKind.ADD));
         if (!compactChanges.isEmpty()) {
             tryCommit(
                     compactChanges,
@@ -202,7 +202,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             + committable.toString());
         }
 
-        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), ValueKind.ADD);
+        List<ManifestEntry> appendChanges = collectChanges(committable.newFiles(), FileKind.ADD);
         // sanity check, all changes must be done within the given partition
         Predicate partitionFilter = PredicateConverter.fromMap(partition, partitionType);
         if (partitionFilter != null) {
@@ -222,8 +222,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 partitionFilter, appendChanges, committable.identifier(), committable.logOffsets());
 
         List<ManifestEntry> compactChanges = new ArrayList<>();
-        compactChanges.addAll(collectChanges(committable.compactBefore(), ValueKind.DELETE));
-        compactChanges.addAll(collectChanges(committable.compactAfter(), ValueKind.ADD));
+        compactChanges.addAll(collectChanges(committable.compactBefore(), FileKind.DELETE));
+        compactChanges.addAll(collectChanges(committable.compactAfter(), FileKind.ADD));
         if (!compactChanges.isEmpty()) {
             tryCommit(
                     compactChanges,
@@ -267,7 +267,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 for (ManifestEntry entry : currentEntries) {
                     changesWithOverwrite.add(
                             new ManifestEntry(
-                                    ValueKind.DELETE,
+                                    FileKind.DELETE,
                                     entry.partition(),
                                     entry.bucket(),
                                     entry.totalBuckets(),
@@ -289,7 +289,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     }
 
     private List<ManifestEntry> collectChanges(
-            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> map, ValueKind kind) {
+            Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> map, FileKind kind) {
         List<ManifestEntry> changes = new ArrayList<>();
         for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> entryWithPartition :
                 map.entrySet()) {
@@ -466,7 +466,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
         Set<ManifestEntry.Identifier> removedFiles =
                 changes.stream()
-                        .filter(e -> e.kind().equals(ValueKind.DELETE))
+                        .filter(e -> e.kind().equals(FileKind.DELETE))
                         .map(ManifestEntry::identifier)
                         .collect(Collectors.toSet());
         if (removedFiles.isEmpty()) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
index 3d21746b..307c7503 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScan.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.manifest.FileKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.predicate.Predicate;
@@ -70,7 +70,7 @@ public interface FileStoreScan {
             List<ManifestEntry> files = files();
             Map<BinaryRowData, Map<Integer, List<DataFileMeta>>> groupBy = new HashMap<>();
             for (ManifestEntry entry : files) {
-                checkArgument(entry.kind() == ValueKind.ADD);
+                checkArgument(entry.kind() == FileKind.ADD);
                 groupBy.computeIfAbsent(entry.partition(), k -> new HashMap<>())
                         .computeIfAbsent(entry.bucket(), k -> new ArrayList<>())
                         .add(entry.file());
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 842258fd..5319c7b5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueFileStore;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
@@ -47,6 +46,7 @@ import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 
 /** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode without primary keys. */
 public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
@@ -130,18 +130,19 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
         return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
 
+            private final KeyValue kv = new KeyValue();
+
             @Override
             protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
                     throws Exception {
-                KeyValue kv = new KeyValue();
                 switch (record.row().getRowKind()) {
                     case INSERT:
                     case UPDATE_AFTER:
-                        kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(1L));
+                        kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(1L));
                         break;
                     case UPDATE_BEFORE:
                     case DELETE:
-                        kv.replace(record.row(), ValueKind.ADD, GenericRowData.of(-1L));
+                        kv.replace(record.row(), RowKind.INSERT, GenericRowData.of(-1L));
                         break;
                     default:
                         throw new UnsupportedOperationException(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 337c3c9d..bb26efa0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueFileStore;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
@@ -178,24 +177,14 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
         return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
+
+            private final KeyValue kv = new KeyValue();
+
             @Override
             protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
                     throws Exception {
-                KeyValue kv = new KeyValue();
-                switch (record.row().getRowKind()) {
-                    case INSERT:
-                    case UPDATE_AFTER:
-                        kv.replace(record.primaryKey(), ValueKind.ADD, record.row());
-                        break;
-                    case UPDATE_BEFORE:
-                    case DELETE:
-                        kv.replace(record.primaryKey(), ValueKind.DELETE, record.row());
-                        break;
-                    default:
-                        throw new UnsupportedOperationException(
-                                "Unknown row kind " + record.row().getRowKind());
-                }
-                writer.write(kv);
+                writer.write(
+                        kv.replace(record.primaryKey(), record.row().getRowKind(), record.row()));
             }
         };
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java
index 716f4838..3f442201 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ValueContentRowDataRecordIterator.java
@@ -20,9 +20,7 @@ package org.apache.flink.table.store.table.source;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.utils.RecordReader;
-import org.apache.flink.types.RowKind;
 
 import java.io.IOException;
 
@@ -41,9 +39,7 @@ public class ValueContentRowDataRecordIterator extends ResetRowKindRecordIterato
         }
 
         RowData rowData = kv.value();
-        if (kv.valueKind() == ValueKind.DELETE) {
-            rowData.setRowKind(RowKind.DELETE);
-        }
+        rowData.setRowKind(kv.valueKind());
         return rowData;
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 1c4ff107..bf3fce36 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -309,9 +309,11 @@ public class TestFileStore extends KeyValueFileStore {
             BinaryRowData key = keySerializer.toBinaryRow(kv.key()).copy();
             BinaryRowData value = valueSerializer.toBinaryRow(kv.value()).copy();
             switch (kv.valueKind()) {
-                case ADD:
+                case INSERT:
+                case UPDATE_AFTER:
                     result.put(key, value);
                     break;
+                case UPDATE_BEFORE:
                 case DELETE:
                     result.remove(key);
                     break;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
index 29dc549d..90a02764 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
@@ -169,12 +170,12 @@ public class TestKeyValueGenerator {
     public KeyValue next() {
         int op = random.nextInt(5);
         Order order = null;
-        ValueKind kind = ValueKind.ADD;
+        RowKind kind = RowKind.INSERT;
         if (op == 0 && addedOrders.size() > 0) {
             // delete order
             order = pick(addedOrders);
             deletedOrders.add(order);
-            kind = ValueKind.DELETE;
+            kind = RowKind.DELETE;
         } else if (op == 1) {
             // update order
             if (random.nextBoolean() && deletedOrders.size() > 0) {
@@ -185,14 +186,14 @@ public class TestKeyValueGenerator {
             if (order != null) {
                 order.update();
                 addedOrders.add(order);
-                kind = ValueKind.ADD;
+                kind = RowKind.INSERT;
             }
         }
         if (order == null) {
             // new order
             order = new Order();
             addedOrders.add(order);
-            kind = ValueKind.ADD;
+            kind = RowKind.INSERT;
         }
         return new KeyValue()
                 .replace(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
index 32b0051e..cf89de9b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -24,14 +24,12 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
 import org.apache.flink.table.store.file.FileStoreOptions;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -56,8 +54,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class ManifestFileMetaTest {
 
     private static final RowType PARTITION_TYPE = RowType.of(new IntType());
-    private static final RowType KEY_TYPE = RowType.of(new IntType());
-    private static final RowType ROW_TYPE = RowType.of(new BigIntType());
 
     private final FileFormat avro;
 
@@ -236,7 +232,7 @@ public class ManifestFileMetaTest {
         writer.complete();
 
         return new ManifestEntry(
-                isAdd ? ValueKind.ADD : ValueKind.DELETE,
+                isAdd ? FileKind.ADD : FileKind.DELETE,
                 binaryRowData, // not used
                 0, // not used
                 0, // not used
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
index 11fc3813..2eff6284 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestTestDataGenerator.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.store.file.manifest;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileTestDataGenerator;
 import org.apache.flink.table.store.file.stats.FieldStatsCollector;
 import org.apache.flink.util.Preconditions;
@@ -72,7 +71,7 @@ public class ManifestTestDataGenerator {
         level.add(file);
         bufferedResults.push(
                 new ManifestEntry(
-                        ValueKind.ADD, file.partition, file.bucket, numBuckets, file.meta));
+                        FileKind.ADD, file.partition, file.bucket, numBuckets, file.meta));
         mergeLevelsIfNeeded(file.partition, file.bucket);
 
         return bufferedResults.poll();
@@ -89,7 +88,7 @@ public class ManifestTestDataGenerator {
         long numDeletedFiles = 0;
         for (ManifestEntry entry : entries) {
             collector.collect(entry.partition());
-            if (entry.kind() == ValueKind.ADD) {
+            if (entry.kind() == FileKind.ADD) {
                 numAddedFiles++;
             } else {
                 numDeletedFiles++;
@@ -121,7 +120,7 @@ public class ManifestTestDataGenerator {
             for (DataFileTestDataGenerator.Data file : currentLevel) {
                 bufferedResults.push(
                         new ManifestEntry(
-                                ValueKind.DELETE, partition, bucket, numBuckets, file.meta));
+                                FileKind.DELETE, partition, bucket, numBuckets, file.meta));
                 kvs.addAll(file.content);
             }
             currentLevel.clear();
@@ -129,7 +128,7 @@ public class ManifestTestDataGenerator {
             for (DataFileTestDataGenerator.Data file : nextLevel) {
                 bufferedResults.push(
                         new ManifestEntry(
-                                ValueKind.DELETE, partition, bucket, numBuckets, file.meta));
+                                FileKind.DELETE, partition, bucket, numBuckets, file.meta));
                 kvs.addAll(file.content);
             }
             nextLevel.clear();
@@ -140,7 +139,7 @@ public class ManifestTestDataGenerator {
             nextLevel.addAll(merged);
             for (DataFileTestDataGenerator.Data file : nextLevel) {
                 bufferedResults.push(
-                        new ManifestEntry(ValueKind.ADD, partition, bucket, numBuckets, file.meta));
+                        new ManifestEntry(FileKind.ADD, partition, bucket, numBuckets, file.meta));
             }
 
             lastModifiedLevel += 1;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 72f58376..69ca3a6f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.data.DataFileWriter;
@@ -46,6 +45,7 @@ import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -199,7 +199,7 @@ public class MergeTreeTest {
             for (int j = 0; j < perBatch; j++) {
                 records.add(
                         new TestRecord(
-                                random.nextBoolean() ? ValueKind.ADD : ValueKind.DELETE,
+                                random.nextBoolean() ? RowKind.INSERT : RowKind.DELETE,
                                 random.nextInt(perBatch / 2) - i * (perBatch / 2),
                                 random.nextInt()));
             }
@@ -351,7 +351,7 @@ public class MergeTreeTest {
         }
         if (dropDelete) {
             return map.values().stream()
-                    .filter(record -> record.kind == ValueKind.ADD)
+                    .filter(record -> record.kind == RowKind.INSERT)
                     .collect(Collectors.toList());
         }
         return new ArrayList<>(map.values());
@@ -394,7 +394,7 @@ public class MergeTreeTest {
         for (int i = 0; i < perBatch; i++) {
             records.add(
                     new TestRecord(
-                            random.nextBoolean() ? ValueKind.ADD : ValueKind.DELETE,
+                            random.nextBoolean() ? RowKind.INSERT : RowKind.DELETE,
                             random.nextInt(perBatch / 2),
                             random.nextInt()));
         }
@@ -403,11 +403,11 @@ public class MergeTreeTest {
 
     private static class TestRecord {
 
-        private final ValueKind kind;
+        private final RowKind kind;
         private final int k;
         private final int v;
 
-        private TestRecord(ValueKind kind, int k, int v) {
+        private TestRecord(RowKind kind, int k, int v) {
             this.kind = kind;
             this.k = k;
             this.v = v;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
index 83dc92d3..4ff9b083 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeFunctionTestUtils.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.table.store.file.mergetree.compact;
 
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.utils.ReusingTestData;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -38,13 +38,13 @@ public class MergeFunctionTestUtils {
         for (int i = 0; i < input.size(); i++) {
             ReusingTestData data = input.get(i);
             Preconditions.checkArgument(
-                    data.valueKind == ValueKind.ADD,
+                    data.valueKind == RowKind.INSERT,
                     "Only ADD value kind is supported for value count merge function.");
             c += data.value;
             if (i + 1 >= input.size() || data.key != input.get(i + 1).key) {
                 if (c != 0) {
                     expected.add(
-                            new ReusingTestData(data.key, data.sequenceNumber, ValueKind.ADD, c));
+                            new ReusingTestData(data.key, data.sequenceNumber, RowKind.INSERT, c));
                 }
                 c = 0;
             }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index 5c07c31e..d75949cf 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -24,13 +24,13 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
 import org.apache.flink.table.store.file.utils.FileUtils;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -315,7 +315,7 @@ public class FileStoreCommitTest {
 
     private List<KeyValue> kvMapToKvList(Map<BinaryRowData, BinaryRowData> map) {
         return map.entrySet().stream()
-                .map(e -> new KeyValue().replace(e.getKey(), -1, ValueKind.ADD, e.getValue()))
+                .map(e -> new KeyValue().replace(e.getKey(), -1, RowKind.INSERT, e.getValue()))
                 .collect(Collectors.toList());
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
index af4f774c..30866470 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreReadTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
-import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
@@ -36,6 +35,7 @@ import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -75,7 +75,7 @@ public class KeyValueFileStoreReadTest {
                             .replace(
                                     GenericRowData.of(a, b, c),
                                     i,
-                                    ValueKind.ADD,
+                                    RowKind.INSERT,
                                     GenericRowData.of(delta)));
         }
         // remove zero occurrence, it might be merged and discarded by the merge tree
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
index 25581938..44c1555f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/ReusingTestData.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.file.utils;
 
 import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -39,10 +39,10 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
 
     public final int key;
     public final long sequenceNumber;
-    public final ValueKind valueKind;
+    public final RowKind valueKind;
     public final long value;
 
-    public ReusingTestData(int key, long sequenceNumber, ValueKind valueKind, long value) {
+    public ReusingTestData(int key, long sequenceNumber, RowKind valueKind, long value) {
         this.key = key;
         this.sequenceNumber = sequenceNumber;
         this.valueKind = valueKind;
@@ -89,7 +89,7 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
                     new ReusingTestData(
                             Integer.parseInt(split[0].trim()),
                             Long.parseLong(split[1].trim()),
-                            split[2].trim().equals("+") ? ValueKind.ADD : ValueKind.DELETE,
+                            split[2].trim().equals("+") ? RowKind.INSERT : RowKind.DELETE,
                             Long.parseLong(split[3].trim())));
         }
         return result;
@@ -104,7 +104,7 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
                     new ReusingTestData(
                             random.nextInt(numRecords),
                             generator.next(),
-                            random.nextBoolean() || onlyAdd ? ValueKind.ADD : ValueKind.DELETE,
+                            random.nextBoolean() || onlyAdd ? RowKind.INSERT : RowKind.DELETE,
                             random.nextInt(10) - 5));
         }
         return result;
@@ -122,7 +122,7 @@ public class ReusingTestData implements Comparable<ReusingTestData> {
                     new ReusingTestData(
                             key,
                             generator.next(),
-                            random.nextBoolean() || onlyAdd ? ValueKind.ADD : ValueKind.DELETE,
+                            random.nextBoolean() || onlyAdd ? RowKind.INSERT : RowKind.DELETE,
                             random.nextInt(10) - 5));
         }
         return new ArrayList<>(result.values());