You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/13 07:06:02 UTC

[flink-table-store] branch master updated: [FLINK-28465] Wait compaction finished in endInput

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a407dd9c [FLINK-28465] Wait compaction finished in endInput
a407dd9c is described below

commit a407dd9cfd7ee62cfaed2debd0dc2fc21b69b191
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Wed Jul 13 15:05:58 2022 +0800

    [FLINK-28465] Wait compaction finished in endInput
    
    This closes #211
---
 .../connector/sink/PrepareCommitOperator.java      |  11 +-
 .../store/connector/sink/StoreCompactOperator.java |   2 +-
 .../store/connector/sink/StoreWriteOperator.java   |   4 +-
 .../store/connector/AlterTableCompactITCase.java   |   3 +-
 .../store/connector/AppendOnlyTableITCase.java     |   3 +-
 .../store/connector/ForceCompactionITCase.java     | 214 +++++++++++++++++----
 .../source/FileStoreSourceSplitReaderTest.java     |   2 +-
 .../source/TestChangelogDataReadWrite.java         |   2 +-
 .../table/store/file/data/AppendOnlyWriter.java    |   7 +-
 .../store/file/mergetree/MergeTreeWriter.java      |   7 +-
 .../table/store/file/writer/RecordWriter.java      |   5 +-
 .../table/store/table/sink/AbstractTableWrite.java |   4 +-
 .../flink/table/store/table/sink/TableWrite.java   |   2 +-
 .../flink/table/store/file/TestFileStore.java      |   2 +-
 .../store/file/data/AppendOnlyWriterTest.java      |   8 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |   8 +-
 .../store/file/operation/TestCommitThread.java     |   4 +-
 .../store/table/AppendOnlyFileStoreTableTest.java  |   6 +-
 .../ChangelogValueCountFileStoreTableTest.java     |   6 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  12 +-
 .../table/store/table/FileStoreTableTestBase.java  |   6 +-
 .../table/store/table/WritePreemptMemoryTest.java  |   2 +-
 .../hive/TableStoreHiveStorageHandlerITCase.java   |  22 +--
 .../store/mapred/TableStoreRecordReaderTest.java   |   6 +-
 .../table/store/spark/SimpleTableTestHelper.java   |   2 +-
 25 files changed, 243 insertions(+), 107 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
index 7d686d9e..052bbdb8 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/PrepareCommitOperator.java
@@ -44,7 +44,7 @@ public abstract class PrepareCommitOperator extends AbstractStreamOperator<Commi
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
         if (!endOfInput) {
-            emitCommittables();
+            emitCommittables(false);
         }
         // no records are expected to emit after endOfInput
     }
@@ -52,12 +52,13 @@ public abstract class PrepareCommitOperator extends AbstractStreamOperator<Commi
     @Override
     public void endInput() throws Exception {
         endOfInput = true;
-        emitCommittables();
+        emitCommittables(true);
     }
 
-    private void emitCommittables() throws IOException {
-        prepareCommit().forEach(committable -> output.collect(new StreamRecord<>(committable)));
+    private void emitCommittables(boolean endOfInput) throws IOException {
+        prepareCommit(endOfInput)
+                .forEach(committable -> output.collect(new StreamRecord<>(committable)));
     }
 
-    protected abstract List<Committable> prepareCommit() throws IOException;
+    protected abstract List<Committable> prepareCommit(boolean endOfInput) throws IOException;
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index 8ad6bf07..f4329d6b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -58,7 +58,7 @@ public class StoreCompactOperator extends PrepareCommitOperator {
     }
 
     @Override
-    protected List<Committable> prepareCommit() throws IOException {
+    protected List<Committable> prepareCommit(boolean endOfInput) throws IOException {
         return compact.compact().stream()
                 .map(c -> new Committable(Committable.Kind.FILE, c))
                 .collect(Collectors.toList());
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index 6679ca21..1c14ec0b 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -181,10 +181,10 @@ public class StoreWriteOperator extends PrepareCommitOperator {
     }
 
     @Override
-    protected List<Committable> prepareCommit() throws IOException {
+    protected List<Committable> prepareCommit(boolean endOfInput) throws IOException {
         List<Committable> committables = new ArrayList<>();
         try {
-            for (FileCommittable committable : write.prepareCommit()) {
+            for (FileCommittable committable : write.prepareCommit(endOfInput)) {
                 committables.add(new Committable(Committable.Kind.FILE, committable));
             }
         } catch (Exception e) {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index 9d2708a0..34b97668 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -92,8 +92,7 @@ public class AlterTableCompactITCase extends FileStoreTableITCase {
 
     @Test
     public void testChangeNumOfSortedRunTrigger() {
-        // force auto-compaction and increase trigger
-        batchSql("ALTER TABLE T0 SET ('commit.force-compact' = 'true')");
+        // increase trigger
         batchSql("ALTER TABLE T0 SET ('num-sorted-run.compaction-trigger' = '5')");
 
         // write duplicates
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index 70f06993..6391a00c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -127,7 +127,6 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
 
     @Test
     public void testAutoCompaction() {
-        batchSql("ALTER TABLE append_table SET ('commit.force-compact' = 'true')");
         batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
         batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
 
@@ -200,7 +199,7 @@ public class AppendOnlyTableITCase extends FileStoreTableITCase {
     @Override
     protected List<String> ddl() {
         return Collections.singletonList(
-                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only', 'commit.force-compact' = 'true')");
+                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only')");
     }
 
     private void testRejectChanges(RowKind kind) {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
index c2a0cd39..18c9a9f6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ForceCompactionITCase.java
@@ -18,6 +18,18 @@
 
 package org.apache.flink.table.store.connector;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.stats.BinaryTableStats;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -25,9 +37,11 @@ import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** ITCase for enabling commit.force-compact. */
+/** ITCase for auto-enabling commit.force-compact under batch mode. */
 public class ForceCompactionITCase extends FileStoreTableITCase {
 
+    private final FileFormat avro = FileFormat.fromIdentifier("avro", new Configuration());
+
     @Override
     protected List<String> ddl() {
         return Arrays.asList(
@@ -40,73 +54,193 @@ public class ForceCompactionITCase extends FileStoreTableITCase {
                         + "  f0 INT\n, "
                         + "  f1 STRING\n, "
                         + "  f2 STRING\n"
-                        + ")");
+                        + ")",
+                "CREATE TABLE IF NOT EXISTS T2 (\n"
+                        + "  f0 INT\n, "
+                        + "  f1 STRING\n, "
+                        + "  f2 STRING\n"
+                        + ") WITH (\n"
+                        + "'write-mode' = 'append-only')");
     }
 
     @Test
     public void testDynamicPartition() {
         batchSql("ALTER TABLE T SET ('num-levels' = '3')");
-        batchSql("ALTER TABLE T SET ('commit.force-compact' = 'true')");
-        batchSql(
+
+        // Winter: 1, Spring: 1, Summer: 1
+        assertAppend(
                 "INSERT INTO T VALUES(1, 'Winter', 'Winter is Coming'),"
                         + "(2, 'Winter', 'The First Snowflake'), "
                         + "(2, 'Spring', 'The First Rose in Spring'), "
-                        + "(7, 'Summer', 'Summertime Sadness')");
-        batchSql("INSERT INTO T VALUES(12, 'Winter', 'Last Christmas')");
-        batchSql("INSERT INTO T VALUES(11, 'Winter', 'Winter is Coming')");
-        batchSql("INSERT INTO T VALUES(10, 'Autumn', 'Refrain')");
-        batchSql(
+                        + "(7, 'Summer', 'Summertime Sadness')",
+                "T",
+                1L);
+
+        // Winter: 2
+        assertAppend("INSERT INTO T VALUES(12, 'Winter', 'Last Christmas')", "T", 2L);
+
+        // Winter: 3, Spring: 2
+        assertAppend(
+                "INSERT INTO T VALUES(11, 'Winter', 'Winter is Coming'), "
+                        + "(4, 'Spring', 'April')",
+                "T",
+                3L);
+
+        // Autumn: 1
+        assertAppend("INSERT INTO T VALUES(10, 'Autumn', 'Refrain')", "T", 4L);
+
+        // Summer: 2, Spring: 3
+        assertAppend(
                 "INSERT INTO T VALUES(6, 'Summer', 'Watermelon Sugar'), "
-                        + "(4, 'Spring', 'Spring Water')");
-        batchSql(
+                        + "(4, 'Spring', 'Spring Water')",
+                "T",
+                5L);
+
+        // Summer: 3, Autumn: 2
+        assertAppend(
                 "INSERT INTO T VALUES(66, 'Summer', 'Summer Vibe'),"
-                        + " (9, 'Autumn', 'Wake Me Up When September Ends')");
-        batchSql(
+                        + " (9, 'Autumn', 'Wake Me Up When September Ends')",
+                "T",
+                6L);
+
+        // Summer: 4, Autumn: 3
+        assertAppend(
                 "INSERT INTO T VALUES(666, 'Summer', 'Summer Vibe'),"
-                        + " (9, 'Autumn', 'Wake Me Up When September Ends')");
-        batchSql(
+                        + " (9, 'Autumn', 'Wake Me Up When September Ends')",
+                "T",
+                7L);
+
+        // Summer: 5, Autumn: 4, trigger compaction for partition 'Summer'
+        assertCompact(
                 "INSERT INTO T VALUES(6666, 'Summer', 'Summer Vibe'),"
-                        + " (9, 'Autumn', 'Wake Me Up When September Ends')");
-        batchSql(
+                        + " (9, 'Autumn', 'Wake Me Up When September Ends')",
+                "T",
+                9L,
+                "Summer",
+                "Summer");
+
+        // Summer: 6, Autumn: 5, trigger compaction for partition 'Autumn'
+        assertCompact(
                 "INSERT INTO T VALUES(66666, 'Summer', 'Summer Vibe'),"
-                        + " (9, 'Autumn', 'Wake Me Up When September Ends')");
-        batchSql(
-                "INSERT INTO T VALUES(666666, 'Summer', 'Summer Vibe'),"
-                        + " (9, 'Autumn', 'Wake Me Up When September Ends')");
-        batchSql(
-                "INSERT INTO T VALUES(6666666, 'Summer', 'Summer Vibe'),"
-                        + " (9, 'Autumn', 'Wake Me Up When September Ends')");
-
-        assertThat(batchSql("SELECT * FROM T")).hasSize(21);
+                        + " (9, 'Autumn', 'Wake Me Up When September Ends')",
+                "T",
+                11L,
+                "Autumn",
+                "Autumn");
+
+        // Winter: 4, Spring: 4
+        assertAppend(
+                "INSERT INTO T VALUES(1, 'Winter', 'Cold Water'), (4, 'Spring', 'SpringBoot')",
+                "T",
+                12L);
+
+        // Winter: 5, Spring: 5, trigger compaction for both partitions
+        assertCompact(
+                "INSERT INTO T VALUES(1, 'Winter', 'Winter is Coming'),"
+                        + " (4, 'Spring', 'The First Rose in Spring')",
+                "T",
+                14L,
+                "Spring",
+                "Winter");
+
+        assertThat(batchSql("SELECT * FROM T")).hasSize(22);
     }
 
     @Test
     public void testNoDefaultNumOfLevels() {
-        batchSql("ALTER TABLE T1 SET ('commit.force-compact' = 'true')");
-        batchSql(
+        assertAppend(
                 "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is Coming'),"
                         + "(2, 'Winter', 'The First Snowflake'), "
                         + "(2, 'Spring', 'The First Rose in Spring'), "
-                        + "(7, 'Summer', 'Summertime Sadness')");
-        batchSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')");
-        batchSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')");
-        batchSql("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')");
-        batchSql(
+                        + "(7, 'Summer', 'Summertime Sadness')",
+                "T1",
+                1L);
+
+        assertAppend("INSERT INTO T1 VALUES(12, 'Winter', 'Last Christmas')", "T1", 2L);
+
+        assertAppend("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is Coming')", "T1", 3L);
+
+        assertAppend("INSERT INTO T1 VALUES(10, 'Autumn', 'Refrain')", "T1", 4L);
+
+        assertCompact(
                 "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon Sugar'), "
-                        + "(4, 'Spring', 'Spring Water')");
-        batchSql(
+                        + "(4, 'Spring', 'Spring Water')",
+                "T1",
+                6L);
+
+        assertAppend(
                 "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe'), "
-                        + "(9, 'Autumn', 'Wake Me Up When September Ends')");
+                        + "(9, 'Autumn', 'Wake Me Up When September Ends')",
+                "T1",
+                7L);
 
-        batchSql(
+        assertAppend(
                 "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
-                        + "(9, 'Autumn', 'Wake Me Up When September Ends')");
+                        + "(9, 'Autumn', 'Wake Me Up When September Ends')",
+                "T1",
+                8L);
 
+        // change num-sorted-run.compaction-trigger to test restoring the LSM tree
         batchSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = '2')");
-        batchSql(
+
+        assertCompact(
                 "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe'), "
-                        + "(9, 'Autumn', 'Wake Me Up When September Ends')");
+                        + "(9, 'Autumn', 'Wake Me Up When September Ends')",
+                "T1",
+                10L);
 
         assertThat(batchSql("SELECT * FROM T1")).hasSize(15);
     }
+
+    @Test
+    public void testForceCompact() {
+        // explicitly set commit.force-compact to false to verify that compaction is still forced
+        batchSql("ALTER TABLE T1 SET ('commit.force-compact' = 'false')");
+        batchSql("ALTER TABLE T1 SET ('num-sorted-run.compaction-trigger' = '2')");
+
+        assertAppend("INSERT INTO T1 VALUES (1, 'Winter', 'Winter is Coming')", "T1", 1L);
+        assertAppend("INSERT INTO T1 VALUES (2, 'Spring', 'Spring Water')", "T1", 2L);
+        assertCompact("INSERT INTO T1 VALUES (3, 'Summer', 'Summer Vibe')", "T1", 4L);
+
+        batchSql("ALTER TABLE T2 SET ('commit.force-compact' = 'false')");
+        batchSql("ALTER TABLE T2 SET ('compaction.early-max.file-num' = '2')");
+        assertAppend("INSERT INTO T2 VALUES (1, 'Winter', 'Winter is Coming')", "T2", 1L);
+        assertCompact("INSERT INTO T2 VALUES (2, 'Spring', 'Spring Water')", "T2", 3L);
+    }
+
+    private void assertAppend(String sql, String tableName, long expectSnapshotId) {
+        batchSql(sql);
+        Snapshot snapshot = findLatestSnapshot(tableName, true);
+        assertThat(snapshot.id()).isEqualTo(expectSnapshotId);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+    }
+
+    private void assertCompact(
+            String sql, String tableName, long expectSnapshotId, String... expectPartMinMax) {
+        batchSql(sql);
+        Snapshot snapshot = findLatestSnapshot(tableName, true);
+        assertThat(snapshot.id()).isEqualTo(expectSnapshotId);
+        assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+        RowType partType =
+                expectPartMinMax.length > 0
+                        ? RowType.of(new LogicalType[] {new VarCharType()}, new String[] {"f1"})
+                        : RowType.of();
+        FileStorePathFactory pathFactory =
+                new FileStorePathFactory(
+                        getTableDirectory(tableName, true),
+                        partType,
+                        "default",
+                        CoreOptions.FILE_FORMAT.defaultValue());
+
+        List<ManifestFileMeta> manifestFileMetas =
+                new ManifestList.Factory(partType, avro, pathFactory)
+                        .create()
+                        .read(snapshot.deltaManifestList());
+        assertThat(manifestFileMetas.get(0).numDeletedFiles()).isGreaterThanOrEqualTo(1);
+        BinaryTableStats partStats = manifestFileMetas.get(0).partitionStats();
+        if (expectPartMinMax.length > 0) {
+            assertThat(partStats.min().getString(0).toString()).isEqualTo(expectPartMinMax[0]);
+            assertThat(partStats.max().getString(0).toString()).isEqualTo(expectPartMinMax[1]);
+        }
+    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 214fbca8..8da3af35 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -147,7 +147,7 @@ public class FileStoreSourceSplitReaderTest {
         writer.write(
                 new KeyValue()
                         .replace(GenericRowData.of(222L), RowKind.DELETE, GenericRowData.of(333L)));
-        List<DataFileMeta> files = writer.prepareCommit().newFiles();
+        List<DataFileMeta> files = writer.prepareCommit(true).newFiles();
         writer.close();
 
         assignSplit(reader, newSourceSplit("id1", row(1), 0, files, true));
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 94a02856..1d26808c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -134,7 +134,7 @@ public class TestChangelogDataReadWrite {
                                     RowKind.INSERT,
                                     GenericRowData.of(tuple2.f1)));
         }
-        List<DataFileMeta> files = writer.prepareCommit().newFiles();
+        List<DataFileMeta> files = writer.prepareCommit(true).newFiles();
         writer.close();
         return new ArrayList<>(files);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index 305a7133..f58e9f60 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -109,7 +109,7 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
     }
 
     @Override
-    public Increment prepareCommit() throws Exception {
+    public Increment prepareCommit(boolean endOnfInput) throws Exception {
         List<DataFileMeta> newFiles = new ArrayList<>();
         if (writer != null) {
             writer.close();
@@ -130,7 +130,10 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
         // add new generated files
         toCompact.addAll(newFiles);
         submitCompaction();
-        finishCompaction(forceCompact);
+
+        boolean blocking = endOnfInput || forceCompact;
+        finishCompaction(blocking);
+
         return drainIncrement(newFiles);
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 50fea94f..c838a253 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -179,11 +179,10 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     }
 
     @Override
-    public Increment prepareCommit() throws Exception {
+    public Increment prepareCommit(boolean endOfInput) throws Exception {
         flushMemory();
-        if (commitForceCompact) {
-            finishCompaction(true);
-        }
+        boolean blocking = endOfInput || commitForceCompact;
+        finishCompaction(blocking);
         return drainIncrement();
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
index 5b0ce6aa..e072a797 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java
@@ -26,7 +26,7 @@ import java.util.List;
 /**
  * The {@code RecordWriter} is responsible for writing data and handling in-progress files used to
  * write yet un-staged data. The incremental files ready to commit is returned to the system by the
- * {@link #prepareCommit()}.
+ * {@link #prepareCommit(boolean)}.
  *
  * @param <T> type of record to write.
  */
@@ -38,9 +38,10 @@ public interface RecordWriter<T> {
     /**
      * Prepare for a commit.
      *
+     * @param endOfInput Signals that the input has ended and no more committables will follow.
      * @return Incremental files in this snapshot cycle
      */
-    Increment prepareCommit() throws Exception;
+    Increment prepareCommit(boolean endOfInput) throws Exception;
 
     /**
      * Sync the writer. The structure related to file reading and writing is thread unsafe, there
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
index 8ba3bfc7..51b666c7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
@@ -78,7 +78,7 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
     }
 
     @Override
-    public List<FileCommittable> prepareCommit() throws Exception {
+    public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
         List<FileCommittable> result = new ArrayList<>();
 
         Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>>> partIter =
@@ -93,7 +93,7 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
                 int bucket = entry.getKey();
                 RecordWriter<T> writer = entry.getValue();
                 FileCommittable committable =
-                        new FileCommittable(partition, bucket, writer.prepareCommit());
+                        new FileCommittable(partition, bucket, writer.prepareCommit(endOfInput));
                 result.add(committable);
 
                 // clear if no update
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 5384f892..3a5615d8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -34,7 +34,7 @@ public interface TableWrite {
 
     SinkRecord write(RowData rowData) throws Exception;
 
-    List<FileCommittable> prepareCommit() throws Exception;
+    List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception;
 
     void close() throws Exception;
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 3a6e39ca..e186a88f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -229,7 +229,7 @@ public class TestFileStore extends KeyValueFileStore {
                 writers.entrySet()) {
             for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
                     entryWithPartition.getValue().entrySet()) {
-                Increment increment = entryWithBucket.getValue().prepareCommit();
+                Increment increment = entryWithBucket.getValue().prepareCommit(emptyWriter);
                 committable.addFileCommittable(
                         entryWithPartition.getKey(), entryWithBucket.getKey(), increment);
             }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index a60380c0..a16411a4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -81,7 +81,7 @@ public class AppendOnlyWriterTest {
 
         for (int i = 0; i < 3; i++) {
             writer.sync();
-            Increment inc = writer.prepareCommit();
+            Increment inc = writer.prepareCommit(true);
 
             assertThat(inc.newFiles()).isEqualTo(Collections.emptyList());
             assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList());
@@ -134,7 +134,7 @@ public class AppendOnlyWriterTest {
             }
 
             writer.sync();
-            Increment inc = writer.prepareCommit();
+            Increment inc = writer.prepareCommit(true);
             if (txn > 0 && txn % 3 == 0) {
                 assertThat(inc.compactBefore()).hasSize(4);
                 assertThat(inc.compactAfter()).hasSize(1);
@@ -191,7 +191,7 @@ public class AppendOnlyWriterTest {
         }
 
         writer.sync();
-        Increment firstInc = writer.prepareCommit();
+        Increment firstInc = writer.prepareCommit(true);
         assertThat(firstInc.compactBefore()).isEqualTo(Collections.emptyList());
         assertThat(firstInc.compactAfter()).isEqualTo(Collections.emptyList());
 
@@ -228,7 +228,7 @@ public class AppendOnlyWriterTest {
         assertThat(writer.getToCompact()).containsExactlyElementsOf(firstInc.newFiles());
         writer.write(row(id, String.format("%03d", id), PART));
         writer.sync();
-        Increment secInc = writer.prepareCommit();
+        Increment secInc = writer.prepareCommit(true);
 
         // check compact before and after
         List<DataFileMeta> compactBefore = secInc.compactBefore();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 699eeb54..8a5793d4 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -166,10 +166,10 @@ public class MergeTreeTest {
     @Test
     public void testRestore() throws Exception {
         List<TestRecord> expected = new ArrayList<>(writeBatch());
-        List<DataFileMeta> newFiles = writer.prepareCommit().newFiles();
+        List<DataFileMeta> newFiles = writer.prepareCommit(true).newFiles();
         writer = createMergeTreeWriter(newFiles);
         expected.addAll(writeBatch());
-        writer.prepareCommit();
+        writer.prepareCommit(true);
         writer.sync();
         assertRecords(expected);
     }
@@ -206,7 +206,7 @@ public class MergeTreeTest {
             }
             writeAll(records);
             expected.addAll(records);
-            Increment increment = writer.prepareCommit();
+            Increment increment = writer.prepareCommit(true);
             mergeCompacted(newFileNames, compactedFiles, increment);
         }
         writer.close();
@@ -237,7 +237,7 @@ public class MergeTreeTest {
                 writer.sync();
             }
 
-            Increment increment = writer.prepareCommit();
+            Increment increment = writer.prepareCommit(true);
             newFiles.addAll(increment.newFiles());
             mergeCompacted(newFileNames, compactedFiles, increment);
         }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index fe1b7d7e..070c3767 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -104,7 +104,7 @@ public class TestCommitThread extends Thread {
         ManifestCommittable committable =
                 new ManifestCommittable(String.valueOf(new Random().nextLong()));
         for (Map.Entry<BinaryRowData, MergeTreeWriter> entry : writers.entrySet()) {
-            committable.addFileCommittable(entry.getKey(), 0, entry.getValue().prepareCommit());
+            committable.addFileCommittable(entry.getKey(), 0, entry.getValue().prepareCommit(true));
         }
 
         runWithRetry(committable, () -> commit.commit(committable, Collections.emptyMap()));
@@ -114,7 +114,7 @@ public class TestCommitThread extends Thread {
         BinaryRowData partition = overwriteData();
         ManifestCommittable committable =
                 new ManifestCommittable(String.valueOf(new Random().nextLong()));
-        committable.addFileCommittable(partition, 0, writers.get(partition).prepareCommit());
+        committable.addFileCommittable(partition, 0, writers.get(partition).prepareCommit(true));
 
         runWithRetry(
                 committable,
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 78cd1c8f..f4498949 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -147,17 +147,17 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.of(2, 20, 200L));
         write.write(GenericRowData.of(1, 11, 101L));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
 
         write.write(GenericRowData.of(1, 12, 102L));
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(2, 22, 202L));
-        commit.commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit(true));
 
         write.write(GenericRowData.of(1, 11, 101L));
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(1, 12, 102L));
-        commit.commit("2", write.prepareCommit());
+        commit.commit("2", write.prepareCommit(true));
 
         write.close();
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index e39ac3dd..24733a7b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -147,20 +147,20 @@ public class ChangelogValueCountFileStoreTableTest extends FileStoreTableTestBas
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.of(2, 20, 200L));
         write.write(GenericRowData.of(1, 11, 101L));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
 
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(1, 12, 102L));
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(2, 21, 201L));
-        commit.commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit(true));
 
         write.write(GenericRowData.of(1, 11, 101L));
         write.write(GenericRowData.of(2, 22, 202L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 10, 100L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 21, 201L));
-        commit.commit("2", write.prepareCommit());
+        commit.commit("2", write.prepareCommit(true));
 
         write.close();
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 0f3c31a1..a7a5a318 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -58,9 +58,9 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(GenericRowData.of(1, 10, 200L));
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.of(1, 11, 101L));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
         write.write(GenericRowData.of(1, 11, 55L));
-        commit.commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().plan().splits;
@@ -169,7 +169,7 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(GenericRowData.of(1, 10, 101L));
         write.write(GenericRowData.ofKind(RowKind.UPDATE_BEFORE, 1, 10, 101L));
         write.write(GenericRowData.ofKind(RowKind.UPDATE_AFTER, 1, 10, 102L));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().withIncremental(true).plan().splits;
@@ -192,19 +192,19 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.of(2, 20, 200L));
         write.write(GenericRowData.of(1, 11, 101L));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
 
         write.write(GenericRowData.of(1, 10, 1000L));
         write.write(GenericRowData.of(2, 21, 201L));
         write.write(GenericRowData.of(2, 21, 2001L));
-        commit.commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit(true));
 
         write.write(GenericRowData.of(1, 11, 1001L));
         write.write(GenericRowData.of(2, 21, 20001L));
         write.write(GenericRowData.of(2, 22, 202L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 1, 11, 1001L));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 20, 200L));
-        commit.commit("2", write.prepareCommit());
+        commit.commit("2", write.prepareCommit(true));
 
         write.close();
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 86e28662..2b818185 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -87,7 +87,7 @@ public abstract class FileStoreTableTestBase {
         TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1, 10, 100L));
         write.write(GenericRowData.of(2, 20, 200L));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
         write.close();
 
         write = table.newWrite().withOverwrite(true);
@@ -95,7 +95,7 @@ public abstract class FileStoreTableTestBase {
         write.write(GenericRowData.of(2, 21, 201L));
         Map<String, String> overwritePartition = new HashMap<>();
         overwritePartition.put("pt", "2");
-        commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit());
+        commit.withOverwritePartition(overwritePartition).commit("1", write.prepareCommit(true));
         write.close();
 
         List<Split> splits = table.newScan().plan().splits;
@@ -121,7 +121,7 @@ public abstract class FileStoreTableTestBase {
         write.write(GenericRowData.of(1, 5, 6L));
         write.write(GenericRowData.of(1, 7, 8L));
         write.write(GenericRowData.of(1, 9, 10L));
-        table.newCommit("user").commit("0", write.prepareCommit());
+        table.newCommit("user").commit("0", write.prepareCommit(true));
         write.close();
 
         List<Split> splits =
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index aa96da9e..d4ee1d13 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -73,7 +73,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
             write.write(row);
             expected.add(BATCH_ROW_TO_STRING.apply(row));
         }
-        List<FileCommittable> committables = write.prepareCommit();
+        List<FileCommittable> committables = write.prepareCommit(true);
         commit.commit("0", committables);
         write.close();
 
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index 80f7d5b7..c677eb51 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -109,7 +109,7 @@ public class TableStoreHiveStorageHandlerITCase {
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi Again")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
         write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
@@ -152,7 +152,7 @@ public class TableStoreHiveStorageHandlerITCase {
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
         write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
@@ -199,7 +199,7 @@ public class TableStoreHiveStorageHandlerITCase {
         for (GenericRowData rowData : input) {
             write.write(rowData);
         }
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
@@ -304,17 +304,17 @@ public class TableStoreHiveStorageHandlerITCase {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
         write.write(GenericRowData.of(1));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
         write.write(GenericRowData.of((Object) null));
-        commit.commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit(true));
         write.write(GenericRowData.of(2));
         write.write(GenericRowData.of(3));
         write.write(GenericRowData.of((Object) null));
-        commit.commit("2", write.prepareCommit());
+        commit.commit("2", write.prepareCommit(true));
         write.write(GenericRowData.of(4));
         write.write(GenericRowData.of(5));
         write.write(GenericRowData.of(6));
-        commit.commit("3", write.prepareCommit());
+        commit.commit("3", write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
@@ -400,15 +400,15 @@ public class TableStoreHiveStorageHandlerITCase {
                         375, /* 1971-01-11 */
                         TimestampData.fromLocalDateTime(
                                 LocalDateTime.of(2022, 5, 17, 17, 29, 20))));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
         write.write(GenericRowData.of(null, null));
-        commit.commit("1", write.prepareCommit());
+        commit.commit("1", write.prepareCommit(true));
         write.write(GenericRowData.of(376 /* 1971-01-12 */, null));
         write.write(
                 GenericRowData.of(
                         null,
                         TimestampData.fromLocalDateTime(LocalDateTime.of(2022, 6, 18, 8, 30, 0))));
-        commit.commit("2", write.prepareCommit());
+        commit.commit("2", write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
@@ -459,7 +459,7 @@ public class TableStoreHiveStorageHandlerITCase {
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
         write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
         write.write(GenericRowData.of(3, 30L, StringData.fromString("World")));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
         write.close();
 
         hiveShell.execute(
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
index 4bf2744c..e8106dc3 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java
@@ -78,7 +78,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.of(3L, StringData.fromString("World")));
         write.write(GenericRowData.of(1L, StringData.fromString("Hi again")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2L, StringData.fromString("Hello")));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
 
         TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         RowDataContainer container = reader.createValue();
@@ -120,7 +120,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.of(1, StringData.fromString("Hi")));
         write.write(GenericRowData.ofKind(RowKind.DELETE, 2, StringData.fromString("Hello")));
         write.write(GenericRowData.of(1, StringData.fromString("Hi")));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
 
         TableStoreRecordReader reader = read(table, BinaryRowDataUtil.EMPTY_ROW, 0);
         RowDataContainer container = reader.createValue();
@@ -160,7 +160,7 @@ public class TableStoreRecordReaderTest {
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
         write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
         write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
-        commit.commit("0", write.prepareCommit());
+        commit.commit("0", write.prepareCommit(true));
 
         TableStoreRecordReader reader =
                 read(table, BinaryRowDataUtil.EMPTY_ROW, 0, Arrays.asList("c", "a"));
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 680b8dea..705a961a 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -65,6 +65,6 @@ public class SimpleTableTestHelper {
     }
 
     public void commit() throws Exception {
-        commit.commit(UUID.randomUUID().toString(), writer.prepareCommit());
+        commit.commit(UUID.randomUUID().toString(), writer.prepareCommit(true));
     }
 }