You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by cz...@apache.org on 2022/11/03 02:58:46 UTC

[flink-table-store] branch master updated: [FLINK-29842] Change commitIdentifier in Table Store snapshot to long value

This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 35062743 [FLINK-29842] Change commitIdentifier in Table Store snapshot to long value
35062743 is described below

commit 350627439a70b187f7270278b497841fa3f2c554
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Nov 3 10:58:40 2022 +0800

    [FLINK-29842] Change commitIdentifier in Table Store snapshot to long value
    
    This closes #346.
---
 .../table/store/connector/sink/StoreCommitter.java |  3 +-
 .../apache/flink/table/store/file/Snapshot.java    | 27 +++++--
 .../store/file/manifest/ManifestCommittable.java   |  8 +-
 .../manifest/ManifestCommittableSerializer.java    |  4 +-
 .../store/file/operation/FileStoreCommitImpl.java  | 10 +--
 .../table/store/table/metadata/SnapshotsTable.java |  5 +-
 .../flink/table/store/table/sink/TableCommit.java  |  4 +-
 .../flink/table/store/file/TestFileStore.java      | 10 ++-
 .../ManifestCommittableSerializerTest.java         |  3 +-
 .../store/file/operation/FileStoreCommitTest.java  |  4 +-
 .../store/file/operation/TestCommitThread.java     | 15 ++--
 .../store/table/AppendOnlyFileStoreTableTest.java  |  8 +-
 .../ChangelogValueCountFileStoreTableTest.java     |  8 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  | 91 +++++++++++++---------
 .../table/store/table/FileStoreTableTestBase.java  | 26 +++----
 .../table/store/table/SchemaEvolutionTest.java     |  4 +-
 .../table/store/table/WritePreemptMemoryTest.java  |  2 +-
 .../hive/TableStoreHiveStorageHandlerITCase.java   | 24 +++---
 .../store/mapred/TableStoreRecordReaderTest.java   |  6 +-
 .../table/store/spark/SimpleTableTestHelper.java   |  7 +-
 .../table/store/spark/SimpleTableTestHelper.java   |  7 +-
 21 files changed, 160 insertions(+), 116 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
index 9fd720a5..ae43cb69 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCommitter.java
@@ -42,8 +42,7 @@ public class StoreCommitter implements Committer {
 
     @Override
     public ManifestCommittable combine(long checkpointId, List<Committable> committables) {
-        ManifestCommittable manifestCommittable =
-                new ManifestCommittable(String.valueOf(checkpointId));
+        ManifestCommittable manifestCommittable = new ManifestCommittable(checkpointId);
         for (Committable committable : committables) {
             switch (committable.kind()) {
                 case FILE:
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
index 72f5273e..177c470a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -38,7 +38,7 @@ import java.util.Map;
 /**
  * This file is the entrance to all data committed at some specific time point.
  *
- * <p>Change list:
+ * <p>Versioned change list:
  *
  * <ul>
  *   <li>Version 1: Initial version for table store <= 0.2. There is no "version" field in json
@@ -46,6 +46,15 @@ import java.util.Map;
  *   <li>Version 2: Introduced in table store 0.3. Add "version" field and "changelogManifestList"
  *       field.
  * </ul>
+ *
+ * <p>Unversioned change list:
+ *
+ * <ul>
+ *   <li>Since table store 0.2.2 and table store 0.3, commitIdentifier is changed from a String to a
+ *       long value. For table store < 0.2.2, only Flink connectors have table store sink and they
+ *       use checkpointId as commitIdentifier (which is a long value). JSON can automatically
+ *       perform type conversion so there is no compatibility issue.
+ * </ul>
  */
 public class Snapshot {
 
@@ -96,9 +105,15 @@ public class Snapshot {
     @JsonProperty(FIELD_COMMIT_USER)
     private final String commitUser;
 
-    // for deduplication
+    // Mainly for snapshot deduplication.
+    //
+    // If multiple snapshots have the same commitIdentifier, reading from any of these snapshots
+    // must produce the same table.
+    //
+    // If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must be
+    // committed before snapshot B, and thus snapshot A must contain older records than snapshot B.
     @JsonProperty(FIELD_COMMIT_IDENTIFIER)
-    private final String commitIdentifier;
+    private final long commitIdentifier;
 
     @JsonProperty(FIELD_COMMIT_KIND)
     private final CommitKind commitKind;
@@ -116,7 +131,7 @@ public class Snapshot {
             String deltaManifestList,
             @Nullable String changelogManifestList,
             String commitUser,
-            String commitIdentifier,
+            long commitIdentifier,
             CommitKind commitKind,
             long timeMillis,
             Map<Integer, Long> logOffsets) {
@@ -143,7 +158,7 @@ public class Snapshot {
             @JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
             @JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String changelogManifestList,
             @JsonProperty(FIELD_COMMIT_USER) String commitUser,
-            @JsonProperty(FIELD_COMMIT_IDENTIFIER) String commitIdentifier,
+            @JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
             @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
             @JsonProperty(FIELD_TIME_MILLIS) long timeMillis,
             @JsonProperty(FIELD_LOG_OFFSETS) Map<Integer, Long> logOffsets) {
@@ -198,7 +213,7 @@ public class Snapshot {
     }
 
     @JsonGetter(FIELD_COMMIT_IDENTIFIER)
-    public String commitIdentifier() {
+    public long commitIdentifier() {
         return commitIdentifier;
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index ff8b5cc7..8cdbee10 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -29,18 +29,18 @@ import java.util.Objects;
 /** Manifest commit message. */
 public class ManifestCommittable {
 
-    private final String identifier;
+    private final long identifier;
     private final Map<Integer, Long> logOffsets;
     private final List<FileCommittable> fileCommittables;
 
-    public ManifestCommittable(String identifier) {
+    public ManifestCommittable(long identifier) {
         this.identifier = identifier;
         this.logOffsets = new HashMap<>();
         this.fileCommittables = new ArrayList<>();
     }
 
     public ManifestCommittable(
-            String identifier,
+            long identifier,
             Map<Integer, Long> logOffsets,
             List<FileCommittable> fileCommittables) {
         this.identifier = identifier;
@@ -61,7 +61,7 @@ public class ManifestCommittable {
         logOffsets.put(bucket, offset);
     }
 
-    public String identifier() {
+    public long identifier() {
         return identifier;
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
index f9615bf3..8049493a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializer.java
@@ -51,7 +51,7 @@ public class ManifestCommittableSerializer
     public byte[] serialize(ManifestCommittable obj) throws IOException {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
-        view.writeUTF(obj.identifier());
+        view.writeLong(obj.identifier());
         serializeOffsets(view, obj.logOffsets());
         view.writeInt(fileCommittableSerializer.getVersion());
         fileCommittableSerializer.serializeList(obj.fileCommittables(), view);
@@ -80,7 +80,7 @@ public class ManifestCommittableSerializer
         }
 
         DataInputDeserializer view = new DataInputDeserializer(serialized);
-        String identifier = view.readUTF();
+        long identifier = view.readLong();
         Map<Integer, Long> offsets = deserializeOffsets(view);
         int fileCommittableSerializerVersion = view.readInt();
         List<FileCommittable> fileCommittables =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
index 0157b168..65fe00b7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -158,7 +158,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         // check if a committable is already committed by its identifier
-        Map<String, ManifestCommittable> identifiers = new LinkedHashMap<>();
+        Map<Long, ManifestCommittable> identifiers = new LinkedHashMap<>();
         for (ManifestCommittable committable : committableList) {
             identifiers.put(committable.identifier(), committable);
         }
@@ -373,7 +373,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private void tryCommit(
             List<ManifestEntry> tableFiles,
             List<ManifestEntry> changelogFiles,
-            String hash,
+            long identifier,
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             Long safeLatestSnapshotId) {
@@ -382,7 +382,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             if (tryCommitOnce(
                     tableFiles,
                     changelogFiles,
-                    hash,
+                    identifier,
                     logOffsets,
                     commitKind,
                     latestSnapshotId,
@@ -395,7 +395,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private void tryOverwrite(
             Predicate partitionFilter,
             List<ManifestEntry> changes,
-            String identifier,
+            long identifier,
             Map<Integer, Long> logOffsets) {
         while (true) {
             Long latestSnapshotId = snapshotManager.latestSnapshotId();
@@ -435,7 +435,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private boolean tryCommitOnce(
             List<ManifestEntry> tableFiles,
             List<ManifestEntry> changelogFiles,
-            String identifier,
+            long identifier,
             Map<Integer, Long> logOffsets,
             Snapshot.CommitKind commitKind,
             Long latestSnapshotId,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
index b0d9b143..6eb0c6c1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/metadata/SnapshotsTable.java
@@ -65,8 +65,7 @@ public class SnapshotsTable implements Table {
                             new RowType.RowField("schema_id", new BigIntType(false)),
                             new RowType.RowField(
                                     "commit_user", SerializationUtils.newStringType(false)),
-                            new RowType.RowField(
-                                    "commit_identifier", SerializationUtils.newStringType(false)),
+                            new RowType.RowField("commit_identifier", new BigIntType(false)),
                             new RowType.RowField(
                                     "commit_kind", SerializationUtils.newStringType(false)),
                             new RowType.RowField("commit_time", new TimestampType(false, 3))));
@@ -185,7 +184,7 @@ public class SnapshotsTable implements Table {
                     snapshot.id(),
                     snapshot.schemaId(),
                     StringData.fromString(snapshot.commitUser()),
-                    StringData.fromString(snapshot.commitIdentifier()),
+                    snapshot.commitIdentifier(),
                     StringData.fromString(snapshot.commitKind().toString()),
                     TimestampData.fromLocalDateTime(
                             LocalDateTime.ofInstant(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
index b44c0c42..c763cc1e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCommit.java
@@ -68,7 +68,7 @@ public class TableCommit implements AutoCloseable {
         return commit.filterCommitted(committables);
     }
 
-    public void commit(String identifier, List<FileCommittable> fileCommittables) {
+    public void commit(long identifier, List<FileCommittable> fileCommittables) {
         ManifestCommittable committable = new ManifestCommittable(identifier);
         for (FileCommittable fileCommittable : fileCommittables) {
             committable.addFileCommittable(fileCommittable);
@@ -93,7 +93,7 @@ public class TableCommit implements AutoCloseable {
                 // create an empty committable
                 // identifier is Long.MAX_VALUE, come from batch job
                 // TODO maybe it can be produced by CommitterOperator
-                committable = new ManifestCommittable(String.valueOf(Long.MAX_VALUE));
+                committable = new ManifestCommittable(Long.MAX_VALUE);
             }
             commit.overwrite(overwritePartition, committable, new HashMap<>());
         }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 0e471d9f..4ecf9664 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -59,7 +59,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -85,6 +84,8 @@ public class TestFileStore extends KeyValueFileStore {
     private final RowDataSerializer valueSerializer;
     private final String user;
 
+    private long commitIdentifier;
+
     private TestFileStore(
             String root,
             CoreOptions options,
@@ -105,6 +106,8 @@ public class TestFileStore extends KeyValueFileStore {
         this.keySerializer = new RowDataSerializer(keyType);
         this.valueSerializer = new RowDataSerializer(valueType);
         this.user = UUID.randomUUID().toString();
+
+        this.commitIdentifier = 0L;
     }
 
     public FileStoreCommitImpl newCommit() {
@@ -170,7 +173,7 @@ public class TestFileStore extends KeyValueFileStore {
             Function<KeyValue, BinaryRowData> partitionCalculator,
             Function<KeyValue, Integer> bucketCalculator,
             boolean emptyWriter,
-            String identifier,
+            Long identifier,
             BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
             throws Exception {
         AbstractFileStoreWrite<KeyValue> write = newWrite();
@@ -205,8 +208,7 @@ public class TestFileStore extends KeyValueFileStore {
 
         FileStoreCommit commit = newCommit(user);
         ManifestCommittable committable =
-                new ManifestCommittable(
-                        identifier == null ? String.valueOf(new Random().nextLong()) : identifier);
+                new ManifestCommittable(identifier == null ? commitIdentifier++ : identifier);
         for (Map.Entry<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> entryWithPartition :
                 writers.entrySet()) {
             for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index 161cfc16..d40d368c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -56,8 +56,7 @@ public class ManifestCommittableSerializerTest {
     }
 
     public static ManifestCommittable create() {
-        ManifestCommittable committable =
-                new ManifestCommittable(String.valueOf(new Random().nextLong()));
+        ManifestCommittable committable = new ManifestCommittable(new Random().nextLong());
         addFileCommittables(committable, row(0), 0);
         addFileCommittables(committable, row(0), 1);
         addFileCommittables(committable, row(1), 0);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
index cb2ab9aa..987ae4ec 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTest.java
@@ -166,7 +166,7 @@ public class FileStoreCommitTest {
         FileUtils.deleteOrWarn(firstSnapshotPath);
         // this test succeeds if this call does not fail
         store.newCommit(UUID.randomUUID().toString())
-                .filterCommitted(Collections.singletonList(new ManifestCommittable("dummy")));
+                .filterCommitted(Collections.singletonList(new ManifestCommittable(999)));
     }
 
     protected void testRandomConcurrentNoConflict(
@@ -459,7 +459,7 @@ public class FileStoreCommitTest {
                     gen::getPartition,
                     kv -> 0,
                     false,
-                    String.valueOf(i),
+                    (long) i,
                     (commit, committable) -> {
                         commit.commit(committable, Collections.emptyMap());
                         committables.add(committable);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index eb3317b6..5349fa43 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -39,7 +39,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -67,6 +66,8 @@ public class TestCommitThread extends Thread {
     private final AbstractFileStoreWrite<KeyValue> write;
     private final FileStoreCommit commit;
 
+    private long commitIdentifier;
+
     public TestCommitThread(
             RowType keyType,
             RowType valueType,
@@ -93,6 +94,8 @@ public class TestCommitThread extends Thread {
 
         this.write = safeStore.newWrite();
         this.commit = testStore.newCommit(UUID.randomUUID().toString()).withCreateEmptyCommit(true);
+
+        this.commitIdentifier = 0;
     }
 
     public List<KeyValue> getResult() {
@@ -129,8 +132,7 @@ public class TestCommitThread extends Thread {
         for (int i = 0; i < numWrites && !data.isEmpty(); i++) {
             writeData();
         }
-        ManifestCommittable committable =
-                new ManifestCommittable(String.valueOf(new Random().nextLong()));
+        ManifestCommittable committable = new ManifestCommittable(commitIdentifier++);
         for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : writers.entrySet()) {
             RecordWriter.CommitIncrement inc = entry.getValue().prepareCommit(true);
             committable.addFileCommittable(
@@ -143,8 +145,7 @@ public class TestCommitThread extends Thread {
 
     private void doOverwrite() throws Exception {
         BinaryRowData partition = overwriteData();
-        ManifestCommittable committable =
-                new ManifestCommittable(String.valueOf(new Random().nextLong()));
+        ManifestCommittable committable = new ManifestCommittable(commitIdentifier++);
         RecordWriter.CommitIncrement inc = writers.get(partition).prepareCommit(true);
         committable.addFileCommittable(
                 new FileCommittable(partition, 0, inc.newFilesIncrement(), inc.compactIncrement()));
@@ -159,10 +160,10 @@ public class TestCommitThread extends Thread {
     }
 
     private void doFinalCompact() {
+        long identifier = commitIdentifier++;
         while (true) {
             try {
-                ManifestCommittable committable =
-                        new ManifestCommittable(String.valueOf(new Random().nextLong()));
+                ManifestCommittable committable = new ManifestCommittable(identifier);
                 for (BinaryRowData partition : writtenPartitions) {
                     MergeTreeWriter writer =
                             writers.computeIfAbsent(partition, p -> createWriter(p, false));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 79296b4a..8bbff172 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -189,7 +189,7 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
             dataset.add(new HashMap<>(dataPerBucket));
             dataPerBucket.clear();
         }
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         int partition = random.nextInt(numOfPartition);
         List<Integer> availableBucket = new ArrayList<>(dataset.get(partition).keySet());
@@ -220,17 +220,17 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowData(1, 10, 100L));
         write.write(rowData(2, 20, 200L));
         write.write(rowData(1, 11, 101L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         write.write(rowData(1, 12, 102L));
         write.write(rowData(2, 21, 201L));
         write.write(rowData(2, 22, 202L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowData(1, 11, 101L));
         write.write(rowData(2, 21, 201L));
         write.write(rowData(1, 12, 102L));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         write.close();
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index 3b4806b8..630ab6a4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -157,20 +157,20 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         write.write(rowData(1, 10, 100L));
         write.write(rowData(2, 20, 200L));
         write.write(rowData(1, 11, 101L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         write.write(rowData(2, 21, 201L));
         write.write(rowData(1, 12, 102L));
         write.write(rowData(2, 21, 201L));
         write.write(rowData(2, 21, 201L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowData(1, 11, 101L));
         write.write(rowData(2, 22, 202L));
         write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
         write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
         write.write(rowDataWithKind(RowKind.DELETE, 2, 21, 201L));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         write.close();
     }
@@ -184,7 +184,7 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         // no data file should be produced from this commit
         write.write(rowData(1, 10, 100L));
         write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.close();
 
         // check that no data file is produced
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index c710cbc6..ef071883 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.utils.CompatibilityTestUtils;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.function.FunctionWithException;
 
 import org.junit.jupiter.api.Test;
 
@@ -58,9 +59,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowData(1, 10, 200L));
         write.write(rowData(1, 10, 100L));
         write.write(rowData(1, 11, 101L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.write(rowData(1, 11, 55L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().plan().splits();
@@ -181,7 +182,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 20, 201L));
         write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
         write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().withIncremental(true).plan().splits();
@@ -216,7 +217,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 210L));
         write.compact(binaryRow(1), 0);
         write.compact(binaryRow(2), 0);
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         List<Split> splits = table.newScan().withIncremental(true).plan().splits();
         TableRead read = table.newRead();
@@ -230,13 +231,13 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowData(1, 40, 140L));
         write.write(rowData(2, 30, 230L));
         write.write(rowData(2, 40, 240L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowDataWithKind(RowKind.DELETE, 1, 40, 140L));
         write.write(rowData(2, 40, 241L));
         write.compact(binaryRow(1), 0);
         write.compact(binaryRow(2), 0);
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         splits = table.newScan().withIncremental(true).plan().splits();
         assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
@@ -248,7 +249,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowData(1, 20, 121L));
         write.write(rowData(1, 30, 131L));
         write.write(rowData(2, 30, 231L));
-        commit.commit("3", write.prepareCommit(true));
+        commit.commit(3, write.prepareCommit(true));
 
         write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 121L));
         write.write(rowData(1, 30, 132L));
@@ -259,7 +260,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowData(2, 40, 242L));
         write.compact(binaryRow(1), 0);
         write.compact(binaryRow(2), 0);
-        commit.commit("4", write.prepareCommit(true));
+        commit.commit(4, write.prepareCommit(true));
 
         splits = table.newScan().withIncremental(true).plan().splits();
         assertThat(getResult(read, splits, binaryRow(1), 0, CHANGELOG_ROW_TO_STRING))
@@ -285,16 +286,6 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                 createFileStoreTable(
                         conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, ChangelogProducer.INPUT));
 
-        // write another commit
-        TableWrite write = table.newWrite();
-        TableCommit commit = table.newCommit("user");
-        write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 102L));
-        write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 103L));
-        write.write(rowDataWithKind(RowKind.INSERT, 1, 20, 201L));
-        write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L));
-        commit.commit("2", write.prepareCommit(true));
-        write.close();
-
         List<List<List<String>>> expected =
                 Arrays.asList(
                         // first changelog snapshot
@@ -335,20 +326,48 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                         ChangelogProducer.INPUT,
                         Snapshot.FIRST_SNAPSHOT_ID - 1);
 
-        for (int i = 0; i < 3; i++) {
-            SnapshotEnumerator.EnumeratorResult result = enumerator.call();
-            assertThat(result).isNotNull();
-
-            List<Split> splits = result.plan.splits();
-            TableRead read = table.newRead();
-            for (int j = 0; j < 2; j++) {
-                assertThat(getResult(read, splits, binaryRow(j + 1), 0, CHANGELOG_ROW_TO_STRING))
-                        .isEqualTo(expected.get(i).get(j));
-            }
+        FunctionWithException<Integer, Void, Exception> assertNextSnapshot =
+                i -> {
+                    SnapshotEnumerator.EnumeratorResult result = enumerator.call();
+                    assertThat(result).isNotNull();
+
+                    List<Split> splits = result.plan.splits();
+                    TableRead read = table.newRead();
+                    for (int j = 0; j < 2; j++) {
+                        assertThat(
+                                        getResult(
+                                                read,
+                                                splits,
+                                                binaryRow(j + 1),
+                                                0,
+                                                CHANGELOG_ROW_TO_STRING))
+                                .isEqualTo(expected.get(i).get(j));
+                    }
+
+                    return null;
+                };
+
+        for (int i = 0; i < 2; i++) {
+            assertNextSnapshot.apply(i);
         }
 
         // no more changelog
         assertThat(enumerator.call()).isNull();
+
+        // write another commit
+        TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
+        write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 102L));
+        write.write(rowDataWithKind(RowKind.UPDATE_AFTER, 1, 10, 103L));
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 20, 201L));
+        write.write(rowDataWithKind(RowKind.DELETE, 2, 10, 301L));
+        commit.commit(2, write.prepareCommit(true));
+        write.close();
+
+        assertNextSnapshot.apply(2);
+
+        // no more changelog
+        assertThat(enumerator.call()).isNull();
     }
 
     private void writeData() throws Exception {
@@ -359,19 +378,19 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowData(1, 10, 100L));
         write.write(rowData(2, 20, 200L));
         write.write(rowData(1, 11, 101L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         write.write(rowData(1, 10, 1000L));
         write.write(rowData(2, 21, 201L));
         write.write(rowData(2, 21, 2001L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowData(1, 11, 1001L));
         write.write(rowData(2, 21, 20001L));
         write.write(rowData(2, 22, 202L));
         write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L));
         write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         write.close();
     }
@@ -386,15 +405,15 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
 
         write.write(rowData(1, 10, 100L));
         write.write(rowData(1, 20, 200L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         write.write(rowData(1, 30, 300L));
         write.write(rowData(1, 40, 400L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowData(1, 50, 500L));
         write.write(rowData(1, 60, 600L));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
         List<Split> splits = table.newScan().plan().splits();
@@ -427,7 +446,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         // update pk 60, 10
         write.write(rowData(1, 60, 500L));
         write.write(rowData(1, 10, 10L));
-        commit.commit("3", write.prepareCommit(true));
+        commit.commit(3, write.prepareCommit(true));
 
         write.close();
 
@@ -461,7 +480,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(rowDataWithKind(RowKind.DELETE, 1, 10, 100L));
         write.write(rowDataWithKind(RowKind.UPDATE_BEFORE, 1, 10, 100L));
         write.write(rowDataWithKind(RowKind.DELETE, 1, 20, 400L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().plan().splits();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 94ed642c..46cd2645 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -137,7 +137,7 @@ public abstract class FileStoreTableTestBase {
         TableCommit commit = table.newCommit("user");
         write.write(rowData(1, 10, 100L));
         write.write(rowData(2, 20, 200L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.close();
 
         write = table.newWrite().withOverwrite(true);
@@ -145,7 +145,7 @@ public abstract class FileStoreTableTestBase {
         write.write(rowData(2, 21, 201L));
         Map<String, String> overwritePartition = new HashMap<>();
         overwritePartition.put("pt", "2");
-        commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit(true));
+        commit.withOverwritePartition(overwritePartition).commit(1, write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().plan().splits();
@@ -171,7 +171,7 @@ public abstract class FileStoreTableTestBase {
         write.write(rowData(1, 5, 6L));
         write.write(rowData(1, 7, 8L));
         write.write(rowData(1, 9, 10L));
-        table.newCommit("user").commit("0", write.prepareCommit(true));
+        table.newCommit("user").commit(0, write.prepareCommit(true));
         write.close();
 
         List<Split> splits =
@@ -192,15 +192,15 @@ public abstract class FileStoreTableTestBase {
 
         write.write(rowData(1, 10, 100L));
         write.write(rowData(1, 20, 200L));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         write.write(rowData(1, 30, 300L));
         write.write(rowData(1, 40, 400L));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
 
         write.write(rowData(1, 50, 500L));
         write.write(rowData(1, 60, 600L));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
 
         write.close();
 
@@ -223,7 +223,7 @@ public abstract class FileStoreTableTestBase {
             for (int j = 0; j < 1000; j++) {
                 write.write(rowData(1, 10 * i * j, 100L * i * j));
             }
-            commit.commit(String.valueOf(i), write.prepareCommit(false));
+            commit.commit(i, write.prepareCommit(false));
         }
 
         write.write(rowData(1, 40, 400L));
@@ -243,9 +243,9 @@ public abstract class FileStoreTableTestBase {
             // if remove writer too fast, will see old files, do another compaction
             // then will be conflicts
 
-            commit.commit("4", commit4);
-            commit.commit("5", commit5);
-            commit.commit("6", commit6);
+            commit.commit(4, commit4);
+            commit.commit(5, commit5);
+            commit.commit(6, commit6);
         } else {
             // commit4 is a compaction commit
             // do compaction commit5
@@ -254,8 +254,8 @@ public abstract class FileStoreTableTestBase {
             // wait compaction finish
             // commit5 should be a compaction commit
 
-            commit.commit("4", commit4);
-            commit.commit("5", commit5);
+            commit.commit(4, commit4);
+            commit.commit(5, commit5);
         }
 
         write.close();
@@ -274,7 +274,7 @@ public abstract class FileStoreTableTestBase {
         TableCommit commit = table.newCommit("user");
         for (int i = 0; i < 10; i++) {
             write.write(rowData(1, 1, 100L));
-            commit.commit(String.valueOf(i), write.prepareCommit(true));
+            commit.commit(i, write.prepareCommit(true));
         }
         write.close();
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 025ecb93..250cfdfa 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -86,7 +86,7 @@ public class SchemaEvolutionTest {
         TableWrite write = table.newWrite();
         write.write(GenericRowData.of(1, 1L));
         write.write(GenericRowData.of(2, 2L));
-        table.newCommit("").commit("0", write.prepareCommit(true));
+        table.newCommit("").commit(0, write.prepareCommit(true));
         write.close();
 
         schemaManager.commitChanges(
@@ -96,7 +96,7 @@ public class SchemaEvolutionTest {
         write = table.newWrite();
         write.write(GenericRowData.of(3, 3L, 3L));
         write.write(GenericRowData.of(4, 4L, 4L));
-        table.newCommit("").commit("1", write.prepareCommit(true));
+        table.newCommit("").commit(1, write.prepareCommit(true));
         write.close();
 
         // read all
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index b6a09c4a..f5c77682 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -73,7 +73,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
             expected.add(BATCH_ROW_TO_STRING.apply(row));
         }
         List<FileCommittable> committables = write.prepareCommit(true);
-        commit.commit("0", committables);
+        commit.commit(0, committables);
         write.close();
 
         // read
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 3a7d785c..c4112b36 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -76,6 +76,8 @@ public class TableStoreHiveStorageHandlerITCase {
 
     private static String engine;
 
+    private long commitIdentifier;
+
     @BeforeClass
     public static void beforeClass() {
         // TODO Currently FlinkEmbeddedHiveRunner can only be used for one test class,
@@ -98,6 +100,8 @@ public class TableStoreHiveStorageHandlerITCase {
 
         hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
         hiveShell.execute("USE test_db");
+
+        commitIdentifier = 0;
     }
 
     @After
@@ -626,10 +630,10 @@ public class TableStoreHiveStorageHandlerITCase {
         for (RowData rowData : data) {
             write.write(rowData);
             if (ThreadLocalRandom.current().nextInt(5) == 0) {
-                commit.commit(UUID.randomUUID().toString(), write.prepareCommit(false));
+                commit.commit(commitIdentifier++, write.prepareCommit(false));
             }
         }
-        commit.commit(UUID.randomUUID().toString(), write.prepareCommit(true));
+        commit.commit(commitIdentifier++, write.prepareCommit(true));
         write.close();
 
         String tableName = "test_table_" + (UUID.randomUUID().toString().substring(0, 4));
@@ -674,7 +678,7 @@ public class TableStoreHiveStorageHandlerITCase {
         for (GenericRowData rowData : input) {
             write.write(rowData);
         }
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
@@ -779,17 +783,17 @@ public class TableStoreHiveStorageHandlerITCase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.write(GenericRowData.of((Object) null));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
         write.write(GenericRowData.of(2));
         write.write(GenericRowData.of(3));
         write.write(GenericRowData.of((Object) null));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
         write.write(GenericRowData.of(4));
         write.write(GenericRowData.of(5));
         write.write(GenericRowData.of(6));
-        commit.commit("3", write.prepareCommit(true));
+        commit.commit(3, write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
@@ -875,16 +879,16 @@ public class TableStoreHiveStorageHandlerITCase {
                         375, /* 1971-01-11 */
                         TimestampData.fromLocalDateTime(
                                 LocalDateTime.of(2022, 5, 17, 17, 29, 20, 100_000_000))));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
         write.write(GenericRowData.of(null, null));
-        commit.commit("1", write.prepareCommit(true));
+        commit.commit(1, write.prepareCommit(true));
         write.write(GenericRowData.of(376 /* 1971-01-12 */, null));
         write.write(
                 GenericRowData.of(
                         null,
                         TimestampData.fromLocalDateTime(
                                 LocalDateTime.of(2022, 6, 18, 8, 30, 0, 100_000_000))));
-        commit.commit("2", write.prepareCommit(true));
+        commit.commit(2, write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 08186888..79c57848 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -78,7 +78,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.of(3L, StringData.fromString("World")));
         write.write(GenericRowData.of(1L, StringData.fromString("Hi again")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         RowDataContainer container = reader.createValue();
@@ -120,7 +120,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.of(1, StringData.fromString("Hi")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, StringData.fromString("Hello")));
         write.write(GenericRowData.of(1, StringData.fromString("Hi")));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         RowDataContainer container = reader.createValue();
@@ -160,7 +160,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
         write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
-        commit.commit("0", write.prepareCommit(true));
+        commit.commit(0, write.prepareCommit(true));
 
         TableStoreRecordReader reader =
                 read(table, BinaryRowDataUtil.EMPTY_ROW, 0, Arrays.asList("c", "a"));
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index f31eeaea..1f2e4394 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -34,7 +34,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
 /** A simple table test helper to write and commit. */
 public class SimpleTableTestHelper {
@@ -42,6 +41,8 @@ public class SimpleTableTestHelper {
     private final TableWrite writer;
     private final TableCommit commit;
 
+    private long commitIdentifier;
+
     public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
         this(path, rowType, Collections.emptyList(), Collections.emptyList());
     }
@@ -60,6 +61,8 @@ public class SimpleTableTestHelper {
         FileStoreTable table = FileStoreTableFactory.create(conf);
         this.writer = table.newWrite();
         this.commit = table.newCommit("user");
+
+        this.commitIdentifier = 0;
     }
 
     public void write(RowData row) throws Exception {
@@ -67,6 +70,6 @@ public class SimpleTableTestHelper {
     }
 
     public void commit() throws Exception {
-        commit.commit(UUID.randomUUID().toString(), writer.prepareCommit(true));
+        commit.commit(commitIdentifier++, writer.prepareCommit(true));
     }
 }
diff --git a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 705a961a..8e7f10ef 100644
--- a/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark2/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -33,7 +33,6 @@ import org.apache.flink.table.types.logical.RowType;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;
 
 /** A simple table test helper to write and commit. */
 public class SimpleTableTestHelper {
@@ -41,6 +40,8 @@ public class SimpleTableTestHelper {
     private final TableWrite writer;
     private final TableCommit commit;
 
+    private long commitIdentifier;
+
     public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
         Map<String, String> options = new HashMap<>();
         // orc is shaded, can not find shaded classes in ide
@@ -58,6 +59,8 @@ public class SimpleTableTestHelper {
         FileStoreTable table = FileStoreTableFactory.create(conf);
         this.writer = table.newWrite();
         this.commit = table.newCommit("user");
+
+        this.commitIdentifier = 0;
     }
 
     public void write(RowData row) throws Exception {
@@ -65,6 +68,6 @@ public class SimpleTableTestHelper {
     }
 
     public void commit() throws Exception {
-        commit.commit(UUID.randomUUID().toString(), writer.prepareCommit(true));
+        commit.commit(commitIdentifier++, writer.prepareCommit(true));
     }
 }