You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/09/19 11:01:44 UTC

[flink-table-store] branch master updated: [FLINK-28810] Introduce write.skip-compaction to skip compaction on write

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new fc73a0bc [FLINK-28810] Introduce write.skip-compaction to skip compaction on write
fc73a0bc is described below

commit fc73a0bc4427741b41cb4b15e75c2dba363b97fb
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Sep 19 19:01:38 2022 +0800

    [FLINK-28810] Introduce write.skip-compaction to skip compaction on write
    
    This closes #262
---
 .../shortcodes/generated/core_configuration.html   |  6 +++
 .../org/apache/flink/table/store/CoreOptions.java  | 14 ++++-
 .../table/store/file/AppendOnlyFileStore.java      |  6 +--
 .../table/store/file/compact/CompactManager.java   | 34 +++++++-----
 .../store/file/compact/NoopCompactManager.java     | 48 +++++++++++++++++
 .../store/file/data/AppendOnlyCompactManager.java  | 39 ++++++++++++--
 .../table/store/file/data/AppendOnlyWriter.java    | 51 +++++-------------
 .../flink/table/store/file/data/DataFileMeta.java  |  7 +++
 .../store/file/mergetree/MergeTreeWriter.java      | 32 ++++-------
 .../mergetree/compact/MergeTreeCompactManager.java | 30 +++++++++--
 .../file/operation/AbstractFileStoreWrite.java     |  7 ---
 .../file/operation/AppendOnlyFileStoreWrite.java   | 54 +++++++++++--------
 .../file/operation/KeyValueFileStoreWrite.java     | 35 +++++++-----
 .../store/file/data/AppendOnlyWriterTest.java      | 63 ++++++++++++----------
 .../store/file/format/FileFormatSuffixTest.java    |  2 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  8 +--
 .../compact/MergeTreeCompactManagerTest.java       | 12 +++--
 .../table/store/table/FileStoreTableTestBase.java  | 40 ++++++++++++++
 18 files changed, 324 insertions(+), 164 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index c01d1bc8..81aa677d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -230,5 +230,11 @@
             <td><p>Enum</p></td>
             <td>Specify the write mode for table.<br /><br />Possible values:<ul><li>"append-only": The table can only accept append-only insert operations. Neither data deduplication nor any primary key constraints will be done when inserting rows into table store.</li><li>"change-log": The table can accept insert/delete/update operations.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>write.compaction-skip</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to skip compaction on write.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 31b6b1f2..72bce71a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -156,6 +156,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(WriteMode.CHANGE_LOG)
                     .withDescription("Specify the write mode for table.");
 
+    public static final ConfigOption<Boolean> WRITE_COMPACTION_SKIP =
+            ConfigOptions.key("write.compaction-skip")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to skip compaction on write.");
+
     public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
             ConfigOptions.key("source.split.target-size")
                     .memoryType()
@@ -458,11 +464,11 @@ public class CoreOptions implements Serializable {
         return options.get(COMPACTION_SIZE_RATIO);
     }
 
-    public int minFileNum() {
+    public int compactionMinFileNum() {
         return options.get(COMPACTION_MIN_FILE_NUM);
     }
 
-    public int maxFileNum() {
+    public int compactionMaxFileNum() {
         return options.get(COMPACTION_MAX_FILE_NUM);
     }
 
@@ -478,6 +484,10 @@ public class CoreOptions implements Serializable {
         return options.getOptional(SEQUENCE_FIELD);
     }
 
+    public boolean writeCompactionSkip() {
+        return options.get(WRITE_COMPACTION_SKIP);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 25dee9fe..d8aa4703 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -63,14 +63,10 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
                 newRead(),
                 schemaId,
                 rowType,
-                options.fileFormat(),
                 pathFactory(),
                 snapshotManager(),
                 newScan(true),
-                options.targetFileSize(),
-                options.minFileNum(),
-                options.maxFileNum(),
-                options.commitForceCompact());
+                options);
     }
 
     private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
index daf062b8..3bd3b5ad 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.store.file.compact;
 
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,14 +42,28 @@ public abstract class CompactManager {
         this.executor = executor;
     }
 
-    /** Submit a new compaction task. */
-    public abstract void submitCompaction();
+    /** Whether the writer should wait for the running compaction to finish. */
+    public abstract boolean shouldWaitCompaction();
+
+    /** Add a new file. */
+    public abstract void addNewFile(DataFileMeta file);
+
+    /** Trigger a new compaction task. */
+    public abstract void triggerCompaction();
 
-    public boolean isCompactionFinished() {
-        return taskFuture == null;
+    /** Get the compaction result, waiting for it to finish if {@code blocking} is true. */
+    public abstract Optional<CompactResult> getCompactionResult(boolean blocking)
+            throws ExecutionException, InterruptedException;
+
+    public void cancelCompaction() {
+        // TODO this method may leave behind orphan files if compaction is actually finished
+        //  but some CPU work still needs to be done
+        if (taskFuture != null && !taskFuture.isCancelled()) {
+            taskFuture.cancel(true);
+        }
     }
 
-    public Optional<CompactResult> finishCompaction(boolean blocking)
+    protected final Optional<CompactResult> innerGetCompactionResult(boolean blocking)
             throws ExecutionException, InterruptedException {
         if (taskFuture != null) {
             if (blocking || taskFuture.isDone()) {
@@ -65,12 +81,4 @@ public abstract class CompactManager {
         }
         return Optional.empty();
     }
-
-    public void cancelCompaction() {
-        // TODO this method may leave behind orphan files if compaction is actually finished
-        //  but some CPU work still needs to be done
-        if (taskFuture != null && !taskFuture.isCancelled()) {
-            taskFuture.cancel(true);
-        }
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
new file mode 100644
index 00000000..84a650fe
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/** A {@link CompactManager} which does nothing. */
+public class NoopCompactManager extends CompactManager {
+
+    public NoopCompactManager(ExecutorService executor) {
+        super(executor);
+    }
+
+    @Override
+    public boolean shouldWaitCompaction() {
+        return false;
+    }
+
+    @Override
+    public void addNewFile(DataFileMeta file) {}
+
+    @Override
+    public void triggerCompaction() {}
+
+    @Override
+    public Optional<CompactResult> getCompactionResult(boolean blocking) {
+        return Optional.empty();
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
index 1f27efe4..d10063bd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 /** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
@@ -58,10 +59,9 @@ public class AppendOnlyCompactManager extends CompactManager {
     }
 
     @Override
-    public void submitCompaction() {
+    public void triggerCompaction() {
         if (taskFuture != null) {
-            throw new IllegalStateException(
-                    "Please finish the previous compaction before submitting new one.");
+            return;
         }
         pickCompactBefore()
                 .ifPresent(
@@ -70,6 +70,35 @@ public class AppendOnlyCompactManager extends CompactManager {
                                         executor.submit(new AutoCompactTask(inputs, rewriter)));
     }
 
+    @Override
+    public boolean shouldWaitCompaction() {
+        return false;
+    }
+
+    @Override
+    public void addNewFile(DataFileMeta file) {
+        toCompact.add(file);
+    }
+
+    /** Finish current task, and update result files to {@link #toCompact}. */
+    @Override
+    public Optional<CompactResult> getCompactionResult(boolean blocking)
+            throws ExecutionException, InterruptedException {
+        Optional<CompactResult> result = innerGetCompactionResult(blocking);
+        result.ifPresent(
+                r -> {
+                    if (!r.after().isEmpty()) {
+                        // if the last compacted file is still small,
+                        // add it back to the head
+                        DataFileMeta lastFile = r.after().get(r.after().size() - 1);
+                        if (lastFile.fileSize() < targetFileSize) {
+                            toCompact.offerFirst(lastFile);
+                        }
+                    }
+                });
+        return result;
+    }
+
     @VisibleForTesting
     Optional<List<DataFileMeta>> pickCompactBefore() {
         return pick(toCompact, targetFileSize, minFileNum, maxFileNum);
@@ -80,6 +109,10 @@ public class AppendOnlyCompactManager extends CompactManager {
             long targetFileSize,
             int minFileNum,
             int maxFileNum) {
+        if (toCompact.isEmpty()) {
+            return Optional.empty();
+        }
+
         long totalFileSize = 0L;
         int fileNum = 0;
         LinkedList<DataFileMeta> candidates = new LinkedList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index 1790e4c0..1476a287 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -19,10 +19,10 @@
 
 package org.apache.flink.table.store.file.data;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.compact.CompactManager;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.stats.BinaryTableStats;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
@@ -41,13 +41,13 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.apache.flink.table.store.file.data.AppendOnlyWriter.RowRollingWriter.createRollingRowWriter;
+import static org.apache.flink.table.store.file.data.DataFileMeta.getMaxSequenceNumber;
 
 /**
  * A {@link RecordWriter} implementation that only accepts records which are always insert
@@ -60,9 +60,8 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
     private final long targetFileSize;
     private final RowType writeSchema;
     private final DataFilePathFactory pathFactory;
-    private final AppendOnlyCompactManager compactManager;
+    private final CompactManager compactManager;
     private final boolean forceCompact;
-    private final LinkedList<DataFileMeta> toCompact;
     private final List<DataFileMeta> compactBefore;
     private final List<DataFileMeta> compactAfter;
     private final LongCounter seqNumCounter;
@@ -74,8 +73,8 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
             FileFormat fileFormat,
             long targetFileSize,
             RowType writeSchema,
-            LinkedList<DataFileMeta> restoredFiles,
-            AppendOnlyCompactManager compactManager,
+            long maxSequenceNumber,
+            CompactManager compactManager,
             boolean forceCompact,
             DataFilePathFactory pathFactory) {
         this.schemaId = schemaId;
@@ -85,10 +84,9 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
         this.pathFactory = pathFactory;
         this.compactManager = compactManager;
         this.forceCompact = forceCompact;
-        this.toCompact = restoredFiles;
         this.compactBefore = new ArrayList<>();
         this.compactAfter = new ArrayList<>();
-        this.seqNumCounter = new LongCounter(getMaxSequenceNumber(restoredFiles) + 1);
+        this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
         this.writer =
                 createRollingRowWriter(
                         schemaId,
@@ -128,18 +126,18 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
                             seqNumCounter);
         }
         // add new generated files
-        toCompact.addAll(newFiles);
+        newFiles.forEach(compactManager::addNewFile);
         submitCompaction();
 
         boolean blocking = endOnfInput || forceCompact;
-        finishCompaction(blocking);
+        trySyncLatestCompaction(blocking);
 
         return drainIncrement(newFiles);
     }
 
     @Override
     public void sync() throws Exception {
-        finishCompaction(true);
+        trySyncLatestCompaction(true);
     }
 
     @Override
@@ -160,37 +158,19 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
         return result;
     }
 
-    private static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
-        return fileMetas.stream()
-                .map(DataFileMeta::maxSequenceNumber)
-                .max(Long::compare)
-                .orElse(-1L);
-    }
-
     private void submitCompaction() throws ExecutionException, InterruptedException {
-        finishCompaction(false);
-        if (compactManager.isCompactionFinished() && !toCompact.isEmpty()) {
-            compactManager.submitCompaction();
-        }
+        trySyncLatestCompaction(false);
+        compactManager.triggerCompaction();
     }
 
-    private void finishCompaction(boolean blocking)
+    private void trySyncLatestCompaction(boolean blocking)
             throws ExecutionException, InterruptedException {
         compactManager
-                .finishCompaction(blocking)
+                .getCompactionResult(blocking)
                 .ifPresent(
                         result -> {
                             compactBefore.addAll(result.before());
                             compactAfter.addAll(result.after());
-                            if (!result.after().isEmpty()) {
-                                // if the last compacted file is still small,
-                                // add it back to the head
-                                DataFileMeta lastFile =
-                                        result.after().get(result.after().size() - 1);
-                                if (lastFile.fileSize() < targetFileSize) {
-                                    toCompact.offerFirst(lastFile);
-                                }
-                            }
                         });
     }
 
@@ -203,11 +183,6 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
         return increment;
     }
 
-    @VisibleForTesting
-    List<DataFileMeta> getToCompact() {
-        return toCompact;
-    }
-
     /** Rolling file writer for append-only table. */
     public static class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> {
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
index b8d70ec0..512d285e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
@@ -294,4 +294,11 @@ public class DataFileMeta {
                 new RowType.RowField("_EXTRA_FILES", new ArrayType(false, newStringType(false))));
         return new RowType(fields);
     }
+
+    public static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
+        return fileMetas.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 6c1161a3..b48942f0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -52,8 +52,6 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
 
     private final CompactManager compactManager;
 
-    private final Levels levels;
-
     private final Comparator<RowData> keyComparator;
 
     private final MergeFunction mergeFunction;
@@ -62,8 +60,6 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
 
     private final boolean commitForceCompact;
 
-    private final int numSortedRunStopTrigger;
-
     private final ChangelogProducer changelogProducer;
 
     private final LinkedHashSet<DataFileMeta> newFiles;
@@ -80,24 +76,20 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
             RowType keyType,
             RowType valueType,
             CompactManager compactManager,
-            Levels levels,
             long maxSequenceNumber,
             Comparator<RowData> keyComparator,
             MergeFunction mergeFunction,
             DataFileWriter dataFileWriter,
             boolean commitForceCompact,
-            int numSortedRunStopTrigger,
             ChangelogProducer changelogProducer) {
         this.keyType = keyType;
         this.valueType = valueType;
         this.compactManager = compactManager;
-        this.levels = levels;
         this.newSequenceNumber = maxSequenceNumber + 1;
         this.keyComparator = keyComparator;
         this.mergeFunction = mergeFunction;
         this.dataFileWriter = dataFileWriter;
         this.commitForceCompact = commitForceCompact;
-        this.numSortedRunStopTrigger = numSortedRunStopTrigger;
         this.changelogProducer = changelogProducer;
         this.newFiles = new LinkedHashSet<>();
         this.compactBefore = new LinkedHashMap<>();
@@ -109,8 +101,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     }
 
     @VisibleForTesting
-    Levels levels() {
-        return levels;
+    CompactManager compactManager() {
+        return compactManager;
     }
 
     @Override
@@ -142,9 +134,9 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     @Override
     public void flushMemory() throws Exception {
         if (memTable.size() > 0) {
-            if (levels.numberOfSortedRuns() > numSortedRunStopTrigger) {
+            if (compactManager.shouldWaitCompaction()) {
                 // stop writing, wait for compaction finished
-                finishCompaction(true);
+                trySyncLatestCompaction(true);
             }
             List<String> extraFiles = new ArrayList<>();
             if (changelogProducer == ChangelogProducer.INPUT) {
@@ -165,7 +157,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
                                         file -> {
                                             DataFileMeta fileMeta = file.copy(extraFiles);
                                             newFiles.add(fileMeta);
-                                            levels.addLevel0File(fileMeta);
+                                            compactManager.addNewFile(fileMeta);
                                             return true;
                                         })
                                 .orElse(false);
@@ -183,13 +175,13 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     public Increment prepareCommit(boolean endOfInput) throws Exception {
         flushMemory();
         boolean blocking = endOfInput || commitForceCompact;
-        finishCompaction(blocking);
+        trySyncLatestCompaction(blocking);
         return drainIncrement();
     }
 
     @Override
     public void sync() throws Exception {
-        finishCompaction(true);
+        trySyncLatestCompaction(true);
     }
 
     private Increment drainIncrement() {
@@ -226,14 +218,12 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     }
 
     private void submitCompaction() throws Exception {
-        finishCompaction(false);
-        if (compactManager.isCompactionFinished()) {
-            compactManager.submitCompaction();
-        }
+        trySyncLatestCompaction(false);
+        compactManager.triggerCompaction();
     }
 
-    private void finishCompaction(boolean blocking) throws Exception {
-        Optional<CompactResult> result = compactManager.finishCompaction(blocking);
+    private void trySyncLatestCompaction(boolean blocking) throws Exception {
+        Optional<CompactResult> result = compactManager.getCompactionResult(blocking);
         result.ifPresent(this::updateCompactResult);
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index 17c160b2..64ddc5a6 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.store.file.mergetree.compact;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.compact.CompactManager;
 import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.Levels;
 
 import org.slf4j.Logger;
@@ -46,6 +48,8 @@ public class MergeTreeCompactManager extends CompactManager {
 
     private final long minFileSize;
 
+    private final int numSortedRunStopTrigger;
+
     private final CompactRewriter rewriter;
 
     public MergeTreeCompactManager(
@@ -54,20 +58,31 @@ public class MergeTreeCompactManager extends CompactManager {
             CompactStrategy strategy,
             Comparator<RowData> keyComparator,
             long minFileSize,
+            int numSortedRunStopTrigger,
             CompactRewriter rewriter) {
         super(executor);
         this.levels = levels;
         this.strategy = strategy;
         this.minFileSize = minFileSize;
+        this.numSortedRunStopTrigger = numSortedRunStopTrigger;
         this.keyComparator = keyComparator;
         this.rewriter = rewriter;
     }
 
     @Override
-    public void submitCompaction() {
+    public boolean shouldWaitCompaction() {
+        return levels.numberOfSortedRuns() > numSortedRunStopTrigger;
+    }
+
+    @Override
+    public void addNewFile(DataFileMeta file) {
+        levels.addLevel0File(file);
+    }
+
+    @Override
+    public void triggerCompaction() {
         if (taskFuture != null) {
-            throw new IllegalStateException(
-                    "Please finish the previous compaction before submitting new one.");
+            return;
         }
         strategy.pick(levels.numberOfLevels(), levels.levelSortedRuns())
                 .ifPresent(
@@ -104,6 +119,11 @@ public class MergeTreeCompactManager extends CompactManager {
                         });
     }
 
+    @VisibleForTesting
+    public Levels levels() {
+        return levels;
+    }
+
     private void submitCompaction(CompactUnit unit, boolean dropDelete) {
         MergeTreeCompactTask task =
                 new MergeTreeCompactTask(keyComparator, minFileSize, rewriter, unit, dropDelete);
@@ -123,9 +143,9 @@ public class MergeTreeCompactManager extends CompactManager {
 
     /** Finish current task, and update result files to {@link Levels}. */
     @Override
-    public Optional<CompactResult> finishCompaction(boolean blocking)
+    public Optional<CompactResult> getCompactionResult(boolean blocking)
             throws ExecutionException, InterruptedException {
-        Optional<CompactResult> result = super.finishCompaction(blocking);
+        Optional<CompactResult> result = innerGetCompactionResult(blocking);
         result.ifPresent(r -> levels.update(r.before(), r.after()));
         return result;
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index c3e09745..b5fa2fe3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -56,11 +56,4 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
         }
         return existingFileMetas;
     }
-
-    protected long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
-        return fileMetas.stream()
-                .map(DataFileMeta::maxSequenceNumber)
-                .max(Long::compare)
-                .orElse(-1L);
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index b2bed275..8911df14 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -21,7 +21,10 @@ package org.apache.flink.table.store.file.operation;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.compact.CompactManager;
 import org.apache.flink.table.store.file.compact.CompactResult;
+import org.apache.flink.table.store.file.compact.NoopCompactManager;
 import org.apache.flink.table.store.file.data.AppendOnlyCompactManager;
 import org.apache.flink.table.store.file.data.AppendOnlyWriter;
 import org.apache.flink.table.store.file.data.DataFileMeta;
@@ -42,6 +45,8 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.flink.table.store.file.data.DataFileMeta.getMaxSequenceNumber;
+
 /** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
 public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
 
@@ -51,32 +56,30 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
     private final FileFormat fileFormat;
     private final FileStorePathFactory pathFactory;
     private final long targetFileSize;
-    private final int minFileNum;
-    private final int maxFileNum;
+    private final int compactionMinFileNum;
+    private final int compactionMaxFileNum;
     private final boolean commitForceCompact;
+    private final boolean skipCompaction;
 
     public AppendOnlyFileStoreWrite(
             AppendOnlyFileStoreRead read,
             long schemaId,
             RowType rowType,
-            FileFormat fileFormat,
             FileStorePathFactory pathFactory,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
-            long targetFileSize,
-            int minFileNum,
-            int maxFileNum,
-            boolean commitForceCompact) {
+            CoreOptions options) {
         super(snapshotManager, scan);
         this.read = read;
         this.schemaId = schemaId;
         this.rowType = rowType;
-        this.fileFormat = fileFormat;
+        this.fileFormat = options.fileFormat();
         this.pathFactory = pathFactory;
-        this.targetFileSize = targetFileSize;
-        this.maxFileNum = maxFileNum;
-        this.minFileNum = minFileNum;
-        this.commitForceCompact = commitForceCompact;
+        this.targetFileSize = options.targetFileSize();
+        this.compactionMinFileNum = options.compactionMinFileNum();
+        this.compactionMaxFileNum = options.compactionMaxFileNum();
+        this.commitForceCompact = options.commitForceCompact();
+        this.skipCompaction = options.writeCompactionSkip();
     }
 
     @Override
@@ -101,8 +104,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
         return new AppendOnlyCompactManager.IterativeCompactTask(
                 compactFiles,
                 targetFileSize,
-                minFileNum,
-                maxFileNum,
+                compactionMinFileNum,
+                compactionMaxFileNum,
                 compactRewriter(partition, bucket),
                 pathFactory.createDataFilePathFactory(partition, bucket));
     }
@@ -116,19 +119,26 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
         // and make restore files mutable to update
         LinkedList<DataFileMeta> restored = new LinkedList<>(restoredFiles);
         DataFilePathFactory factory = pathFactory.createDataFilePathFactory(partition, bucket);
+        CompactManager compactManager;
+        if (skipCompaction) {
+            compactManager = new NoopCompactManager(compactExecutor);
+        } else {
+            compactManager =
+                    new AppendOnlyCompactManager(
+                            compactExecutor,
+                            restored,
+                            compactionMinFileNum,
+                            compactionMaxFileNum,
+                            targetFileSize,
+                            compactRewriter(partition, bucket));
+        }
         return new AppendOnlyWriter(
                 schemaId,
                 fileFormat,
                 targetFileSize,
                 rowType,
-                restored,
-                new AppendOnlyCompactManager(
-                        compactExecutor,
-                        restored,
-                        minFileNum,
-                        maxFileNum,
-                        targetFileSize,
-                        compactRewriter(partition, bucket)),
+                getMaxSequenceNumber(restored),
+                compactManager,
                 commitForceCompact,
                 factory);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 9175bffa..9e430b01 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.compact.CompactManager;
 import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.compact.NoopCompactManager;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.data.DataFileWriter;
@@ -53,6 +54,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
+import static org.apache.flink.table.store.file.data.DataFileMeta.getMaxSequenceNumber;
+
 /** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
 public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
 
@@ -130,27 +133,32 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
             ExecutorService compactExecutor) {
         DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
         Comparator<RowData> keyComparator = keyComparatorSupplier.get();
-        Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
+        CompactManager compactManager;
+        if (options.writeCompactionSkip()) {
+            compactManager = new NoopCompactManager(compactExecutor);
+        } else {
+            Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
+            compactManager =
+                    createCompactManager(
+                            partition,
+                            bucket,
+                            new UniversalCompaction(
+                                    options.maxSizeAmplificationPercent(),
+                                    options.sortedRunSizeRatio(),
+                                    options.numSortedRunCompactionTrigger(),
+                                    options.maxSortedRunNum()),
+                            compactExecutor,
+                            levels);
+        }
         return new MergeTreeWriter(
                 dataFileWriter.keyType(),
                 dataFileWriter.valueType(),
-                createCompactManager(
-                        partition,
-                        bucket,
-                        new UniversalCompaction(
-                                options.maxSizeAmplificationPercent(),
-                                options.sortedRunSizeRatio(),
-                                options.numSortedRunCompactionTrigger(),
-                                options.maxSortedRunNum()),
-                        compactExecutor,
-                        levels),
-                levels,
+                compactManager,
                 getMaxSequenceNumber(restoreFiles),
                 keyComparator,
                 mergeFunction.copy(),
                 dataFileWriter,
                 options.commitForceCompact(),
-                options.numSortedRunStopTrigger(),
                 options.changelogProducer());
     }
 
@@ -168,6 +176,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
                 compactStrategy,
                 keyComparator,
                 options.targetFileSize(),
+                options.numSortedRunStopTrigger(),
                 rewriter);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index a16411a4..c450116c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.table.store.file.data;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
@@ -48,6 +49,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 
+import static org.apache.flink.table.store.file.data.DataFileMeta.getMaxSequenceNumber;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test the correctness for {@link AppendOnlyWriter}. */
@@ -121,7 +123,7 @@ public class AppendOnlyWriterTest {
 
     @Test
     public void testMultipleCommits() throws Exception {
-        RecordWriter<RowData> writer = createWriter(1024 * 1024L, true, Collections.emptyList());
+        RecordWriter<RowData> writer = createWriter(1024 * 1024L, true, Collections.emptyList()).f0;
 
         // Commit 5 continues txn.
         for (int txn = 0; txn < 5; txn += 1) {
@@ -224,8 +226,11 @@ public class AppendOnlyWriterTest {
 
         // increase target file size to test compaction
         long targetFileSize = 1024 * 1024L;
-        writer = createWriter(targetFileSize, true, firstInc.newFiles());
-        assertThat(writer.getToCompact()).containsExactlyElementsOf(firstInc.newFiles());
+        Tuple2<AppendOnlyWriter, LinkedList<DataFileMeta>> writerAndToCompact =
+                createWriter(targetFileSize, true, firstInc.newFiles());
+        writer = writerAndToCompact.f0;
+        LinkedList<DataFileMeta> toCompact = writerAndToCompact.f1;
+        assertThat(toCompact).containsExactlyElementsOf(firstInc.newFiles());
         writer.write(row(id, String.format("%03d", id), PART));
         writer.sync();
         Increment secInc = writer.prepareCommit(true);
@@ -253,10 +258,10 @@ public class AppendOnlyWriterTest {
          * <3> the newFiles[round]
          * with strict order
          */
-        List<DataFileMeta> toCompact = new ArrayList<>(compactAfter);
-        toCompact.addAll(firstInc.newFiles().subList(4, firstInc.newFiles().size()));
-        toCompact.addAll(secInc.newFiles());
-        assertThat(writer.getToCompact()).containsExactlyElementsOf(toCompact);
+        List<DataFileMeta> toCompactResult = new ArrayList<>(compactAfter);
+        toCompactResult.addAll(firstInc.newFiles().subList(4, firstInc.newFiles().size()));
+        toCompactResult.addAll(secInc.newFiles());
+        assertThat(toCompact).containsExactlyElementsOf(toCompactResult);
     }
 
     private FieldStats initStats(Integer min, Integer max, long nullCount) {
@@ -280,33 +285,35 @@ public class AppendOnlyWriterTest {
     }
 
     private AppendOnlyWriter createEmptyWriter(long targetFileSize) {
-        return createWriter(targetFileSize, false, Collections.emptyList());
+        return createWriter(targetFileSize, false, Collections.emptyList()).f0;
     }
 
-    private AppendOnlyWriter createWriter(
+    private Tuple2<AppendOnlyWriter, LinkedList<DataFileMeta>> createWriter(
             long targetFileSize, boolean forceCompact, List<DataFileMeta> scannedFiles) {
         FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Configuration());
         LinkedList<DataFileMeta> toCompact = new LinkedList<>(scannedFiles);
-        return new AppendOnlyWriter(
-                SCHEMA_ID,
-                fileFormat,
-                targetFileSize,
-                AppendOnlyWriterTest.SCHEMA,
-                toCompact,
-                new AppendOnlyCompactManager(
-                        Executors.newSingleThreadScheduledExecutor(
-                                new ExecutorThreadFactory("compaction-thread")),
-                        toCompact,
-                        MIN_FILE_NUM,
-                        MAX_FILE_NUM,
+        return new Tuple2<>(
+                new AppendOnlyWriter(
+                        SCHEMA_ID,
+                        fileFormat,
                         targetFileSize,
-                        compactBefore ->
-                                compactBefore.isEmpty()
-                                        ? Collections.emptyList()
-                                        : Collections.singletonList(
-                                                generateCompactAfter(compactBefore))),
-                forceCompact,
-                pathFactory);
+                        AppendOnlyWriterTest.SCHEMA,
+                        getMaxSequenceNumber(toCompact),
+                        new AppendOnlyCompactManager(
+                                Executors.newSingleThreadScheduledExecutor(
+                                        new ExecutorThreadFactory("compaction-thread")),
+                                toCompact,
+                                MIN_FILE_NUM,
+                                MAX_FILE_NUM,
+                                targetFileSize,
+                                compactBefore ->
+                                        compactBefore.isEmpty()
+                                                ? Collections.emptyList()
+                                                : Collections.singletonList(
+                                                        generateCompactAfter(compactBefore))),
+                        forceCompact,
+                        pathFactory),
+                toCompact);
     }
 
     private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
index 4be7c693..39ed935e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
@@ -66,7 +66,7 @@ public class FileFormatSuffixTest extends DataFileTest {
                         fileFormat,
                         10,
                         SCHEMA,
-                        toCompact,
+                        0,
                         new AppendOnlyCompactManager(null, toCompact, 4, 10, 10, null), // not used
                         false,
                         dataFilePathFactory);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 446cfc45..4a82d35f 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -272,13 +272,11 @@ public class MergeTreeTest {
                         dataFileWriter.keyType(),
                         dataFileWriter.valueType(),
                         createCompactManager(dataFileWriter, service, files),
-                        new Levels(comparator, files, options.numLevels()),
                         maxSequenceNumber,
                         comparator,
                         new DeduplicateMergeFunction(),
                         dataFileWriter,
                         options.commitForceCompact(),
-                        options.numSortedRunStopTrigger(),
                         ChangelogProducer.NONE);
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
@@ -312,6 +310,7 @@ public class MergeTreeTest {
                 strategy,
                 comparator,
                 options.targetFileSize(),
+                options.numSortedRunStopTrigger(),
                 rewriter);
     }
 
@@ -346,7 +345,10 @@ public class MergeTreeTest {
 
     private void assertRecords(List<TestRecord> expected) throws Exception {
         // compaction will drop delete
-        List<DataFileMeta> files = ((MergeTreeWriter) writer).levels().allFiles();
+        List<DataFileMeta> files =
+                ((MergeTreeCompactManager) ((MergeTreeWriter) writer).compactManager())
+                        .levels()
+                        .allFiles();
         assertRecords(expected, files, true);
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
index 64c46c81..a645a74e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -197,9 +197,15 @@ public class MergeTreeCompactManagerTest {
         Levels levels = new Levels(comparator, files, 3);
         MergeTreeCompactManager manager =
                 new MergeTreeCompactManager(
-                        service, levels, strategy, comparator, 2, testRewriter(expectedDropDelete));
-        manager.submitCompaction();
-        manager.finishCompaction(true);
+                        service,
+                        levels,
+                        strategy,
+                        comparator,
+                        2,
+                        Integer.MAX_VALUE,
+                        testRewriter(expectedDropDelete));
+        manager.triggerCompaction();
+        manager.getCompactionResult(true);
         List<LevelMinMax> outputs =
                 levels.allFiles().stream().map(LevelMinMax::new).collect(Collectors.toList());
         assertThat(outputs).isEqualTo(expected);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index 74b824db..3c80b563 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -25,11 +25,14 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.TableCommit;
@@ -52,9 +55,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.store.CoreOptions.BUCKET;
 import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
+import static org.apache.flink.table.store.CoreOptions.COMPACTION_MAX_FILE_NUM;
+import static org.apache.flink.table.store.CoreOptions.WRITE_COMPACTION_SKIP;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base test class for {@link FileStoreTable}. */
@@ -211,6 +217,40 @@ public abstract class FileStoreTableTestBase {
         write.close();
     }
 
+    @Test
+    public void testWriteWithoutCompaction() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(WRITE_COMPACTION_SKIP, true);
+                            conf.set(COMPACTION_MAX_FILE_NUM, 5);
+                        });
+
+        TableWrite write = table.newWrite();
+        TableCommit commit = table.newCommit("user");
+        for (int i = 0; i < 10; i++) {
+            write.write(GenericRowData.of(1, 1, 100L));
+            commit.commit(String.valueOf(i), write.prepareCommit(true));
+        }
+        write.close();
+
+        List<DataFileMeta> files =
+                table.newScan().plan().splits.stream()
+                        .flatMap(split -> split.files().stream())
+                        .collect(Collectors.toList());
+        for (DataFileMeta file : files) {
+            assertThat(file.level()).isEqualTo(0);
+        }
+
+        SnapshotManager snapshotManager = new SnapshotManager(table.location());
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        assertThat(latestSnapshotId).isNotNull();
+        for (int i = 1; i <= latestSnapshotId; i++) {
+            Snapshot snapshot = snapshotManager.snapshot(i);
+            assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        }
+    }
+
     protected List<String> getResult(
             TableRead read,
             List<Split> splits,