You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/06/27 05:30:33 UTC

[flink-table-store] branch master updated: [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 981b8e3  [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers
981b8e3 is described below

commit 981b8e36984803eb269e32a5f3116da8ce4185dc
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Mon Jun 27 13:30:28 2022 +0800

    [FLINK-28068] Introduce MemoryPoolFactory to share and preempt memory for multiple writers
    
    This closes #171
---
 .../table/store/connector/sink/StoreSinkTest.java  |   8 +-
 .../table/store/connector/sink/TestFileStore.java  |   6 +-
 .../source/TestChangelogDataReadWrite.java         |  33 ++++---
 .../{utils => memory}/HeapMemorySegmentPool.java   |   6 +-
 .../flink/table/store/file/memory/MemoryOwner.java |  34 +++++++
 .../table/store/file/memory/MemoryPoolFactory.java |  99 +++++++++++++++++++
 .../flink/table/store/file/mergetree/MemTable.java |   3 +
 .../store/file/mergetree/MergeTreeOptions.java     |  10 +-
 .../store/file/mergetree/MergeTreeWriter.java      |  34 +++++--
 .../store/file/mergetree/SortBufferMemTable.java   |  11 ++-
 .../file/operation/KeyValueFileStoreWrite.java     |  10 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   5 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   4 +-
 .../table/store/table/sink/AbstractTableWrite.java |  31 +++---
 .../table/store/table/sink/MemoryTableWrite.java   |  82 ++++++++++++++++
 .../flink/table/store/file/TestFileStore.java      |  37 ++++---
 .../table/store/file/mergetree/MergeTreeTest.java  |  25 ++---
 .../file/mergetree/SortBufferMemTableTestBase.java |   4 +-
 .../store/file/operation/TestCommitThread.java     |  16 ++-
 .../table/store/table/WritePreemptMemoryTest.java  | 109 +++++++++++++++++++++
 20 files changed, 466 insertions(+), 101 deletions(-)

diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index 1ddbad7..d7f8666 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -234,11 +234,9 @@ public class StoreSinkTest {
         writers.forEach(
                 (part, map) ->
                         map.forEach(
-                                (bucket, recordWriter) -> {
-                                    TestRecordWriter testWriter = (TestRecordWriter) recordWriter;
-                                    assertThat(testWriter.synced).isTrue();
-                                    assertThat(testWriter.closed).isTrue();
-                                }));
+                                (bucket, recordWriter) ->
+                                        assertThat(((TestRecordWriter) recordWriter).closed)
+                                                .isTrue()));
         return committables;
     }
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index d048eff..aa8be1f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -146,8 +146,6 @@ public class TestFileStore implements FileStore<KeyValue> {
         final List<String> records = new ArrayList<>();
         final boolean hasPk;
 
-        boolean synced = false;
-
         boolean closed = false;
 
         TestRecordWriter(boolean hasPk) {
@@ -210,9 +208,7 @@ public class TestFileStore implements FileStore<KeyValue> {
         }
 
         @Override
-        public void sync() {
-            synced = true;
-        }
+        public void sync() {}
 
         @Override
         public List<DataFileMeta> close() {
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index d4d26f5..1f32f9b 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
+import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.operation.KeyValueFileStoreRead;
@@ -149,18 +151,23 @@ public class TestChangelogDataReadWrite {
 
     public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRowData partition, int bucket) {
         MergeTreeOptions options = new MergeTreeOptions(new Configuration());
-        return new KeyValueFileStoreWrite(
-                        new SchemaManager(tablePath),
-                        0,
-                        KEY_TYPE,
-                        VALUE_TYPE,
-                        () -> COMPARATOR,
-                        new DeduplicateMergeFunction(),
-                        avro,
-                        pathFactory,
-                        snapshotManager,
-                        null, // not used, we only create an empty writer
-                        options)
-                .createEmptyWriter(partition, bucket, service);
+        RecordWriter<KeyValue> writer =
+                new KeyValueFileStoreWrite(
+                                new SchemaManager(tablePath),
+                                0,
+                                KEY_TYPE,
+                                VALUE_TYPE,
+                                () -> COMPARATOR,
+                                new DeduplicateMergeFunction(),
+                                avro,
+                                pathFactory,
+                                snapshotManager,
+                                null, // not used, we only create an empty writer
+                                options)
+                        .createEmptyWriter(partition, bucket, service);
+        ((MemoryOwner) writer)
+                .setMemoryPool(
+                        new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize));
+        return writer;
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/HeapMemorySegmentPool.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/HeapMemorySegmentPool.java
similarity index 92%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/HeapMemorySegmentPool.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/HeapMemorySegmentPool.java
index 6174bc3..0dc48e7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/HeapMemorySegmentPool.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/HeapMemorySegmentPool.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.utils;
+package org.apache.flink.table.store.file.memory;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
@@ -34,9 +34,9 @@ public class HeapMemorySegmentPool implements MemorySegmentPool {
 
     private int numPage;
 
-    public HeapMemorySegmentPool(int maxPages, int pageSize) {
+    public HeapMemorySegmentPool(long maxMemory, int pageSize) {
         this.segments = new LinkedList<>();
-        this.maxPages = maxPages;
+        this.maxPages = (int) (maxMemory / pageSize);
         this.pageSize = pageSize;
         this.numPage = 0;
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryOwner.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryOwner.java
new file mode 100644
index 0000000..4a35230
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryOwner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.memory;
+
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+
+/** An interface that provides memory-related methods. */
+public interface MemoryOwner {
+
+    /** Set {@link MemorySegmentPool} for the owner. */
+    void setMemoryPool(MemorySegmentPool memoryPool);
+
+    /** Memory occupancy size of this owner. */
+    long memoryOccupancy();
+
+    /** Flush the memory held by this owner so that it is released. */
+    void flushMemory() throws Exception;
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryPoolFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryPoolFactory.java
new file mode 100644
index 0000000..2e06a9e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/MemoryPoolFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.memory;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+
+import java.util.List;
+
+/**
+ * A factory which creates {@link MemorySegmentPool} from {@link MemoryOwner}. The returned memory
+ * pool will try to preempt memory when there is no memory left.
+ */
+public class MemoryPoolFactory {
+
+    private final MemorySegmentPool innerPool;
+    private final Iterable<MemoryOwner> owners;
+
+    public MemoryPoolFactory(MemorySegmentPool innerPool, Iterable<MemoryOwner> owners) {
+        this.innerPool = innerPool;
+        this.owners = owners;
+    }
+
+    public void notifyNewOwner(MemoryOwner owner) {
+        MemorySegmentPool memoryPool = new OwnerMemoryPool(owner);
+        owner.setMemoryPool(memoryPool);
+    }
+
+    private void preemptMemory(MemoryOwner owner) {
+        long maxMemory = -1;
+        MemoryOwner max = null;
+        for (MemoryOwner other : owners) {
+            // Don't preempt yourself! Writing and flushing at the same time may lead to an
+            // inconsistent state.
+            if (other != owner && other.memoryOccupancy() > maxMemory) {
+                maxMemory = other.memoryOccupancy();
+                max = other;
+            }
+        }
+
+        if (max != null) {
+            try {
+                max.flushMemory();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private class OwnerMemoryPool implements MemorySegmentPool {
+
+        private final MemoryOwner owner;
+
+        public OwnerMemoryPool(MemoryOwner owner) {
+            this.owner = owner;
+        }
+
+        @Override
+        public int pageSize() {
+            return innerPool.pageSize();
+        }
+
+        @Override
+        public void returnAll(List<MemorySegment> memory) {
+            innerPool.returnAll(memory);
+        }
+
+        @Override
+        public int freePages() {
+            return innerPool.freePages();
+        }
+
+        @Override
+        public MemorySegment nextSegment() {
+            MemorySegment segment = innerPool.nextSegment();
+            if (segment == null) {
+                preemptMemory(owner);
+                return innerPool.nextSegment();
+            }
+            return segment;
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
index e9ef179..944516c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MemTable.java
@@ -44,6 +44,9 @@ public interface MemTable {
     /** Record size of this table. */
     int size();
 
+    /** Memory occupancy size of this table. */
+    long memoryOccupancy();
+
     /**
      * Returns an iterator over the records in this table. The elements are returned in the order of
      * key and sequence number and elements with the same key will be merged by the given {@link
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
index 8cadbfb..9c7a5e4 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeOptions.java
@@ -32,14 +32,14 @@ public class MergeTreeOptions {
     public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
             ConfigOptions.key("write-buffer-size")
                     .memoryType()
-                    .defaultValue(MemorySize.parse("128 mb"))
+                    .defaultValue(MemorySize.parse("256 mb"))
                     .withDescription(
                             "Amount of data to build up in memory before converting to a sorted on-disk file.");
 
     public static final ConfigOption<MemorySize> PAGE_SIZE =
             ConfigOptions.key("page-size")
                     .memoryType()
-                    .defaultValue(MemorySize.parse("1 mb"))
+                    .defaultValue(MemorySize.parse("64 kb"))
                     .withDescription("Memory page size.");
 
     public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
@@ -95,7 +95,7 @@ public class MergeTreeOptions {
 
     public final long writeBufferSize;
 
-    public final long pageSize;
+    public final int pageSize;
 
     public final long targetFileSize;
 
@@ -113,7 +113,7 @@ public class MergeTreeOptions {
 
     public MergeTreeOptions(
             long writeBufferSize,
-            long pageSize,
+            int pageSize,
             long targetFileSize,
             int numSortedRunCompactionTrigger,
             int numSortedRunStopTrigger,
@@ -138,7 +138,7 @@ public class MergeTreeOptions {
     public MergeTreeOptions(ReadableConfig config) {
         this(
                 config.get(WRITE_BUFFER_SIZE).getBytes(),
-                config.get(PAGE_SIZE).getBytes(),
+                (int) config.get(PAGE_SIZE).getBytes(),
                 config.get(TARGET_FILE_SIZE).getBytes(),
                 config.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER),
                 config.get(NUM_SORTED_RUNS_STOP_TRIGGER),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 6dd1d04..319a1b8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -20,13 +20,16 @@ package org.apache.flink.table.store.file.mergetree;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileWriter;
+import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
 import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.writer.RecordWriter;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.CloseableIterator;
 
 import java.util.ArrayList;
@@ -40,9 +43,11 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /** A {@link RecordWriter} to write records and generate {@link Increment}. */
-public class MergeTreeWriter implements RecordWriter<KeyValue> {
+public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
 
-    private final MemTable memTable;
+    private final RowType keyType;
+
+    private final RowType valueType;
 
     private final CompactManager compactManager;
 
@@ -66,8 +71,11 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
 
     private long newSequenceNumber;
 
+    private MemTable memTable;
+
     public MergeTreeWriter(
-            MemTable memTable,
+            RowType keyType,
+            RowType valueType,
             CompactManager compactManager,
             Levels levels,
             long maxSequenceNumber,
@@ -76,7 +84,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
             DataFileWriter dataFileWriter,
             boolean commitForceCompact,
             int numSortedRunStopTrigger) {
-        this.memTable = memTable;
+        this.keyType = keyType;
+        this.valueType = valueType;
         this.compactManager = compactManager;
         this.levels = levels;
         this.newSequenceNumber = maxSequenceNumber + 1;
@@ -99,12 +108,17 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
         return levels;
     }
 
+    @Override
+    public void setMemoryPool(MemorySegmentPool memoryPool) {
+        this.memTable = new SortBufferMemTable(keyType, valueType, memoryPool);
+    }
+
     @Override
     public void write(KeyValue kv) throws Exception {
         long sequenceNumber = newSequenceNumber();
         boolean success = memTable.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
         if (!success) {
-            flush();
+            flushMemory();
             success = memTable.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());
             if (!success) {
                 throw new RuntimeException("Mem table is too small to hold a single element.");
@@ -112,7 +126,13 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
         }
     }
 
-    private void flush() throws Exception {
+    @Override
+    public long memoryOccupancy() {
+        return memTable.memoryOccupancy();
+    }
+
+    @Override
+    public void flushMemory() throws Exception {
         if (memTable.size() > 0) {
             if (levels.numberOfSortedRuns() > numSortedRunStopTrigger) {
                 // stop writing, wait for compaction finished
@@ -130,7 +150,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
 
     @Override
     public Increment prepareCommit() throws Exception {
-        flush();
+        flushMemory();
         if (commitForceCompact) {
             finishCompaction(true);
         }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 363f69f..9258d2b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -27,12 +27,12 @@ import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.store.codegen.CodeGenUtils;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
 import org.apache.flink.table.store.file.ValueKind;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
-import org.apache.flink.table.store.file.utils.HeapMemorySegmentPool;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -52,7 +52,7 @@ public class SortBufferMemTable implements MemTable {
     private final KeyValueSerializer serializer;
     private final BinaryInMemorySortBuffer buffer;
 
-    public SortBufferMemTable(RowType keyType, RowType valueType, long maxMemSize, long pageSize) {
+    public SortBufferMemTable(RowType keyType, RowType valueType, MemorySegmentPool memoryPool) {
         this.keyType = keyType;
         this.valueType = valueType;
         this.serializer = new KeyValueSerializer(keyType, valueType);
@@ -69,8 +69,6 @@ public class SortBufferMemTable implements MemTable {
                 CodeGenUtils.newRecordComparator(
                         new TableConfig(), sortKeyTypes, "MemTableComparator");
 
-        HeapMemorySegmentPool memoryPool =
-                new HeapMemorySegmentPool((int) (maxMemSize / pageSize), (int) pageSize);
         if (memoryPool.freePages() < 3) {
             throw new IllegalArgumentException(
                     "Write buffer requires a minimum of 3 page memory, please increase write buffer memory size.");
@@ -95,6 +93,11 @@ public class SortBufferMemTable implements MemTable {
         return buffer.size();
     }
 
+    @Override
+    public long memoryOccupancy() {
+        return buffer.getOccupancy();
+    }
+
     @Override
     public Iterator<KeyValue> iterator(
             Comparator<RowData> keyComparator, MergeFunction mergeFunction) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 174fe25..c546a5d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.store.file.mergetree.Levels;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
-import org.apache.flink.table.store.file.mergetree.SortBufferMemTable;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
 import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
@@ -119,7 +118,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
         return new CompactTask(keyComparator, options.targetFileSize, rewriter, unit, true);
     }
 
-    private RecordWriter<KeyValue> createMergeTreeWriter(
+    private MergeTreeWriter createMergeTreeWriter(
             BinaryRowData partition,
             int bucket,
             List<DataFileMeta> restoreFiles,
@@ -127,11 +126,8 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
         DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
         Comparator<RowData> keyComparator = keyComparatorSupplier.get();
         return new MergeTreeWriter(
-                new SortBufferMemTable(
-                        dataFileWriter.keyType(),
-                        dataFileWriter.valueType(),
-                        options.writeBufferSize,
-                        options.pageSize),
+                dataFileWriter.keyType(),
+                dataFileWriter.valueType(),
                 createCompactManager(
                         partition,
                         bucket,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 299d4a1..842258f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -34,7 +34,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.table.sink.AbstractTableWrite;
+import org.apache.flink.table.store.table.sink.MemoryTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
@@ -128,7 +128,8 @@ public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
-        return new AbstractTableWrite<KeyValue>(store.newWrite(), recordConverter) {
+        return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
+
             @Override
             protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
                     throws Exception {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 8b7f95c..337c3c9 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -36,7 +36,7 @@ import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.table.sink.AbstractTableWrite;
+import org.apache.flink.table.store.table.sink.MemoryTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableWrite;
@@ -177,7 +177,7 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
     public TableWrite newWrite() {
         SinkRecordConverter recordConverter =
                 new SinkRecordConverter(store.options().bucket(), tableSchema);
-        return new AbstractTableWrite<KeyValue>(store.newWrite(), recordConverter) {
+        return new MemoryTableWrite<KeyValue>(store.newWrite(), recordConverter, store.options()) {
             @Override
             protected void writeSinkRecord(SinkRecord record, RecordWriter<KeyValue> writer)
                     throws Exception {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
index 1e45320..8ba3bfc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java
@@ -43,7 +43,7 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
     private final FileStoreWrite<T> write;
     private final SinkRecordConverter recordConverter;
 
-    private final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
+    protected final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
     private final ExecutorService compactExecutor;
 
     private boolean overwrite = false;
@@ -100,7 +100,7 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
                 // we need a mechanism to clear writers, otherwise there will be more and more
                 // such as yesterday's partition that no longer needs to be written.
                 if (committable.increment().newFiles().isEmpty()) {
-                    closeWriter(writer);
+                    writer.close();
                     bucketIter.remove();
                 }
             }
@@ -113,20 +113,15 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
         return result;
     }
 
-    private void closeWriter(RecordWriter<T> writer) throws Exception {
-        writer.sync();
-        writer.close();
-    }
-
     @Override
     public void close() throws Exception {
-        compactExecutor.shutdownNow();
         for (Map<Integer, RecordWriter<T>> bucketWriters : writers.values()) {
             for (RecordWriter<T> writer : bucketWriters.values()) {
-                closeWriter(writer);
+                writer.close();
             }
         }
         writers.clear();
+        compactExecutor.shutdownNow();
     }
 
     @VisibleForTesting
@@ -143,11 +138,17 @@ public abstract class AbstractTableWrite<T> implements TableWrite {
             buckets = new HashMap<>();
             writers.put(partition.copy(), buckets);
         }
-        return buckets.computeIfAbsent(
-                bucket,
-                k ->
-                        overwrite
-                                ? write.createEmptyWriter(partition.copy(), bucket, compactExecutor)
-                                : write.createWriter(partition.copy(), bucket, compactExecutor));
+        return buckets.computeIfAbsent(bucket, k -> createWriter(partition.copy(), bucket));
     }
+
+    private RecordWriter<T> createWriter(BinaryRowData partition, int bucket) {
+        RecordWriter<T> writer =
+                overwrite
+                        ? write.createEmptyWriter(partition.copy(), bucket, compactExecutor)
+                        : write.createWriter(partition.copy(), bucket, compactExecutor);
+        notifyNewWriter(writer);
+        return writer;
+    }
+
+    protected void notifyNewWriter(RecordWriter<T> writer) {}
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
new file mode 100644
index 0000000..215840a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
+import org.apache.flink.table.store.file.memory.MemoryOwner;
+import org.apache.flink.table.store.file.memory.MemoryPoolFactory;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.writer.RecordWriter;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A {@link TableWrite} which supports using shared memory and preempting memory from other writers.
+ */
+public abstract class MemoryTableWrite<T> extends AbstractTableWrite<T> {
+
+    private final MemoryPoolFactory memoryPoolFactory;
+
+    protected MemoryTableWrite(
+            FileStoreWrite<T> write,
+            SinkRecordConverter recordConverter,
+            FileStoreOptions options) {
+        super(write, recordConverter);
+
+        MergeTreeOptions mergeTreeOptions = options.mergeTreeOptions();
+        HeapMemorySegmentPool memoryPool =
+                new HeapMemorySegmentPool(
+                        mergeTreeOptions.writeBufferSize, mergeTreeOptions.pageSize);
+        this.memoryPoolFactory = new MemoryPoolFactory(memoryPool, this::memoryOwners);
+    }
+
+    private Iterator<MemoryOwner> memoryOwners() {
+        Iterator<Map<Integer, RecordWriter<T>>> iterator = writers.values().iterator();
+        return Iterators.concat(
+                new Iterator<Iterator<MemoryOwner>>() {
+                    @Override
+                    public boolean hasNext() {
+                        return iterator.hasNext();
+                    }
+
+                    @Override
+                    public Iterator<MemoryOwner> next() {
+                        return Iterators.transform(
+                                iterator.next().values().iterator(),
+                                writer -> (MemoryOwner) writer);
+                    }
+                });
+    }
+
+    @Override
+    protected void notifyNewWriter(RecordWriter<T> writer) {
+        if (!(writer instanceof MemoryOwner)) {
+            throw new RuntimeException(
+                    "Should create a MemoryOwner for MemoryTableWrite,"
+                            + " but this is: "
+                            + writer.getClass());
+        }
+        memoryPoolFactory.notifyNewOwner((MemoryOwner) writer);
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 655b38d..a2f0a5d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
+import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.mergetree.Increment;
 import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
@@ -42,7 +44,6 @@ import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.util.function.QuadFunction;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,6 +74,10 @@ public class TestFileStore extends KeyValueFileStore {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestFileStore.class);
 
+    public static final MemorySize WRITE_BUFFER_SIZE = MemorySize.parse("16 kb");
+
+    public static final MemorySize PAGE_SIZE = MemorySize.parse("4 kb");
+
     private final String root;
     private final RowDataSerializer keySerializer;
     private final RowDataSerializer valueSerializer;
@@ -87,8 +92,8 @@ public class TestFileStore extends KeyValueFileStore {
             MergeFunction mergeFunction) {
         Configuration conf = new Configuration();
 
-        conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, MemorySize.parse("16 kb"));
-        conf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("4 kb"));
+        conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, WRITE_BUFFER_SIZE);
+        conf.set(MergeTreeOptions.PAGE_SIZE, PAGE_SIZE);
         conf.set(MergeTreeOptions.TARGET_FILE_SIZE, MemorySize.parse("1 kb"));
 
         conf.set(
@@ -155,7 +160,7 @@ public class TestFileStore extends KeyValueFileStore {
                 kvs,
                 partitionCalculator,
                 bucketCalculator,
-                FileStoreWrite::createWriter,
+                false,
                 (commit, committable) -> {
                     logOffsets.forEach(committable::addLogOffset);
                     commit.commit(committable, Collections.emptyMap());
@@ -172,7 +177,7 @@ public class TestFileStore extends KeyValueFileStore {
                 kvs,
                 partitionCalculator,
                 bucketCalculator,
-                FileStoreWrite::createEmptyWriter,
+                true,
                 (commit, committable) ->
                         commit.overwrite(partition, committable, Collections.emptyMap()));
     }
@@ -181,13 +186,7 @@ public class TestFileStore extends KeyValueFileStore {
             List<KeyValue> kvs,
             Function<KeyValue, BinaryRowData> partitionCalculator,
             Function<KeyValue, Integer> bucketCalculator,
-            QuadFunction<
-                            FileStoreWrite<KeyValue>,
-                            BinaryRowData,
-                            Integer,
-                            ExecutorService,
-                            RecordWriter<KeyValue>>
-                    createWriterFunction,
+            boolean emptyWriter,
             BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
             throws Exception {
         FileStoreWrite<KeyValue> write = newWrite();
@@ -201,8 +200,18 @@ public class TestFileStore extends KeyValueFileStore {
                             (b, w) -> {
                                 if (w == null) {
                                     ExecutorService service = Executors.newSingleThreadExecutor();
-                                    return createWriterFunction.apply(
-                                            write, partition, bucket, service);
+                                    RecordWriter<KeyValue> writer =
+                                            emptyWriter
+                                                    ? write.createEmptyWriter(
+                                                            partition, bucket, service)
+                                                    : write.createWriter(
+                                                            partition, bucket, service);
+                                    ((MemoryOwner) writer)
+                                            .setMemoryPool(
+                                                    new HeapMemorySegmentPool(
+                                                            WRITE_BUFFER_SIZE.getBytes(),
+                                                            (int) PAGE_SIZE.getBytes()));
+                                    return writer;
                                 } else {
                                     return w;
                                 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index e7607a1..72f5837 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.format.FlushingFileFormat;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
 import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
@@ -264,20 +265,20 @@ public class MergeTreeTest {
     private MergeTreeWriter createMergeTreeWriter(List<DataFileMeta> files) {
         long maxSequenceNumber =
                 files.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L);
-        return new MergeTreeWriter(
-                new SortBufferMemTable(
+        MergeTreeWriter writer =
+                new MergeTreeWriter(
                         dataFileWriter.keyType(),
                         dataFileWriter.valueType(),
-                        options.writeBufferSize,
-                        options.pageSize),
-                createCompactManager(dataFileWriter, service),
-                new Levels(comparator, files, options.numLevels),
-                maxSequenceNumber,
-                comparator,
-                new DeduplicateMergeFunction(),
-                dataFileWriter,
-                options.commitForceCompact,
-                options.numSortedRunStopTrigger);
+                        createCompactManager(dataFileWriter, service),
+                        new Levels(comparator, files, options.numLevels),
+                        maxSequenceNumber,
+                        comparator,
+                        new DeduplicateMergeFunction(),
+                        dataFileWriter,
+                        options.commitForceCompact,
+                        options.numSortedRunStopTrigger);
+        writer.setMemoryPool(new HeapMemorySegmentPool(options.writeBufferSize, options.pageSize));
+        return writer;
     }
 
     private CompactManager createCompactManager(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
index c92fff2..3666b66 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTableTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.mergetree;
 
 import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionTestUtils;
@@ -54,8 +55,7 @@ public abstract class SortBufferMemTableTestBase {
                     new RowType(
                             Collections.singletonList(
                                     new RowType.RowField("value", new BigIntType()))),
-                    32 * 1024 * 3L,
-                    32 * 1024);
+                    new HeapMemorySegmentPool(32 * 1024 * 3L, 32 * 1024));
 
     protected abstract boolean addOnly();
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index d51e441..3c92759 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.TestFileStore;
 import org.apache.flink.table.store.file.TestKeyValueGenerator;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
 
 import org.slf4j.Logger;
@@ -38,6 +39,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.flink.table.store.file.TestFileStore.PAGE_SIZE;
+import static org.apache.flink.table.store.file.TestFileStore.WRITE_BUFFER_SIZE;
 import static org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
 
 /** Testing {@link Thread}s to perform concurrent commits. */
@@ -197,10 +200,13 @@ public class TestCommitThread extends Thread {
                             t.setName(Thread.currentThread().getName() + "-writer-service-pool");
                             return t;
                         });
-        if (empty) {
-            return (MergeTreeWriter) write.createEmptyWriter(partition, 0, service);
-        } else {
-            return (MergeTreeWriter) write.createWriter(partition, 0, service);
-        }
+        MergeTreeWriter writer =
+                empty
+                        ? (MergeTreeWriter) write.createEmptyWriter(partition, 0, service)
+                        : (MergeTreeWriter) write.createWriter(partition, 0, service);
+        writer.setMemoryPool(
+                new HeapMemorySegmentPool(
+                        WRITE_BUFFER_SIZE.getBytes(), (int) PAGE_SIZE.getBytes()));
+        return writer;
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
new file mode 100644
index 0000000..0e81bae
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.WriteMode;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.table.sink.FileCommittable;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableRead;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for memory preemption between writers of a {@link ChangelogWithKeyFileStoreTable}. */
+public class WritePreemptMemoryTest extends FileStoreTableTestBase {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void writeMultiplePartitions() throws Exception {
+        testWritePreemptMemory(false);
+    }
+
+    @Test
+    public void writeSinglePartition() throws Exception {
+        testWritePreemptMemory(true);
+    }
+
+    private void testWritePreemptMemory(boolean singlePartition) throws Exception {
+        // write
+        FileStoreTable table = createFileStoreTable();
+        TableWrite write = table.newWrite();
+        Random random = new Random();
+        List<String> expected = new ArrayList<>();
+        for (int i = 0; i < 10_000; i++) {
+            GenericRowData row =
+                    GenericRowData.of(singlePartition ? 0 : random.nextInt(5), i, i * 10L);
+            write.write(row);
+            expected.add(BATCH_ROW_TO_STRING.apply(row));
+        }
+        List<FileCommittable> committables = write.prepareCommit();
+        table.newCommit().commit("0", committables);
+        write.close();
+
+        // read
+        List<Split> splits = table.newScan().plan().splits;
+        TableRead read = table.newRead();
+        List<String> results = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            results.addAll(getResult(read, splits, binaryRow(i), 0, BATCH_ROW_TO_STRING));
+        }
+        assertThat(results).containsExactlyInAnyOrder(expected.toArray(new String[0]));
+    }
+
+    @Override
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Path tablePath = new Path(tempDir.toString());
+        Configuration conf = new Configuration();
+        conf.set(FileStoreOptions.PATH, tablePath.toString());
+        conf.set(FileStoreOptions.FILE_FORMAT, "avro");
+        conf.set(FileStoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        conf.set(MergeTreeOptions.WRITE_BUFFER_SIZE, new MemorySize(30 * 1024));
+        conf.set(MergeTreeOptions.PAGE_SIZE, new MemorySize(1024));
+        SchemaManager schemaManager = new SchemaManager(tablePath);
+        TableSchema schema =
+                schemaManager.commitNewVersion(
+                        new UpdateSchema(
+                                ROW_TYPE,
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "a"),
+                                conf.toMap(),
+                                ""));
+        return new ChangelogWithKeyFileStoreTable(
+                tablePath.getName(), schemaManager, schema, "user");
+    }
+}