You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/04 09:14:58 UTC

[ignite-3] branch main updated: IGNITE-16657 [Native Persistence 3.0] Implement partitions chunks merger (#972)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d05837ad1 IGNITE-16657 [Native Persistence 3.0] Implement partitions chunks merger (#972)
d05837ad1 is described below

commit d05837ad1dcb5ce5e0773714c7d0ebb8b067c287
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Thu Aug 4 12:14:53 2022 +0300

    IGNITE-16657 [Native Persistence 3.0] Implement partitions chunks merger (#972)
---
 .../PageMemoryCheckpointConfigurationSchema.java   |   7 +-
 .../persistence/checkpoint/CheckpointManager.java  |  28 +-
 .../checkpoint/CheckpointPagesWriter.java          |  12 +-
 .../checkpoint/CheckpointPagesWriterFactory.java   |   6 +-
 .../persistence/checkpoint/Checkpointer.java       |  19 +-
 .../persistence/compaction/Compactor.java          | 370 +++++++++++++++++++++
 .../persistence/store/AbstractFilePageStoreIo.java |  10 +-
 .../persistence/store/DeltaFilePageStoreIo.java    |  62 +++-
 .../persistence/store/FilePageStore.java           |  59 +++-
 .../persistence/store/FilePageStoreIo.java         |   7 +
 .../persistence/store/FilePageStoreManager.java    |  48 ++-
 .../store/PageReadWriteManagerImpl.java            |  96 ------
 .../pagememory/persistence/store/PageStore.java    |   3 +-
 .../persistence/checkpoint/CheckpointerTest.java   |  13 +-
 .../persistence/compaction/CompactorTest.java      | 206 ++++++++++++
 .../store/AbstractFilePageStoreIoTest.java         |   2 +-
 .../store/DeltaFilePageStoreIoTest.java            |  60 ++++
 .../store/FilePageStoreManagerTest.java            |  19 ++
 .../persistence/store/FilePageStoreTest.java       |  92 +++++
 .../store/PageReadWriteManagerImplTest.java        |  84 -----
 .../PersistentPageMemoryTableStorage.java          |   6 +
 21 files changed, 981 insertions(+), 228 deletions(-)

diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
index b80a18a30..2352b360a 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
@@ -39,7 +39,12 @@ public class PageMemoryCheckpointConfigurationSchema {
     /** Number of checkpoint threads. */
     @Range(min = 1)
     @Value(hasDefault = true)
-    public int threads = 4;
+    public int checkpointThreads = 4;
+
+    /** Number of threads to compact delta files. */
+    @Range(min = 1)
+    @Value(hasDefault = true)
+    public int compactionThreads = 4;
 
     /** Timeout for checkpoint read lock acquisition in milliseconds. */
     @Range(min = 0)
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index d94782e49..57e764058 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
+import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor;
 import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -76,6 +77,9 @@ public class CheckpointManager {
     /** File page store manager. */
     private final FilePageStoreManager filePageStoreManager;
 
+    /** Delta file compactor. */
+    private final Compactor compactor;
+
     /**
      * Constructor.
      *
@@ -133,6 +137,15 @@ public class CheckpointManager {
                 pageSize
         );
 
+        compactor = new Compactor(
+                Loggers.forClass(Compactor.class),
+                igniteInstanceName,
+                workerListener,
+                checkpointConfig.compactionThreads(),
+                filePageStoreManager,
+                pageSize
+        );
+
         checkpointer = new Checkpointer(
                 Loggers.forClass(Checkpoint.class),
                 igniteInstanceName,
@@ -141,6 +154,7 @@ public class CheckpointManager {
                 checkpointWorkflow,
                 checkpointPagesWriterFactory,
                 filePageStoreManager,
+                compactor,
                 checkpointConfig
         );
 
@@ -162,6 +176,8 @@ public class CheckpointManager {
         checkpointer.start();
 
         checkpointTimeoutLock.start();
+
+        compactor.start();
     }
 
     /**
@@ -171,7 +187,8 @@ public class CheckpointManager {
         IgniteUtils.closeAll(
                 checkpointTimeoutLock::stop,
                 checkpointer::stop,
-                checkpointWorkflow::stop
+                checkpointWorkflow::stop,
+                compactor::stop
         );
     }
 
@@ -285,4 +302,13 @@ public class CheckpointManager {
 
         return pageIndexes;
     }
+
+    /**
+     * Adds the number of delta files to compact.
+     *
+     * @param count Number of delta files.
+     */
+    public void addDeltaFileCountForCompaction(int count) {
+        compactor.addDeltaFiles(count);
+    }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
index 6da756a10..29adbde0f 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java
@@ -75,8 +75,8 @@ public class CheckpointPagesWriter implements Runnable {
     /** Future which should be finished when all pages would be written. */
     private final CompletableFuture<?> doneFut;
 
-    /** Some action which will be executed every time before page will be written. */
-    private final Runnable beforePageWrite;
+    /** Update heartbeat callback. */
+    private final Runnable updateHeartbeat;
 
     /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */
     private final ThreadLocal<ByteBuffer> threadBuf;
@@ -103,7 +103,7 @@ public class CheckpointPagesWriter implements Runnable {
      * @param writePageIds Queue of dirty page IDs to write.
      * @param updatedPartitions Updated partitions.
      * @param doneFut Done future.
-     * @param beforePageWrite Action to be performed before every page write.
+     * @param updateHeartbeat Update heartbeat callback.
      * @param log Logger.
      * @param threadBuf Thread local byte buffer.
      * @param checkpointProgress Checkpoint progress.
@@ -118,7 +118,7 @@ public class CheckpointPagesWriter implements Runnable {
             IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> writePageIds,
             ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
             CompletableFuture<?> doneFut,
-            Runnable beforePageWrite,
+            Runnable updateHeartbeat,
             ThreadLocal<ByteBuffer> threadBuf,
             CheckpointProgressImpl checkpointProgress,
             WriteDirtyPage pageWriter,
@@ -131,7 +131,7 @@ public class CheckpointPagesWriter implements Runnable {
         this.writePageIds = writePageIds;
         this.updatedPartitions = updatedPartitions;
         this.doneFut = doneFut;
-        this.beforePageWrite = beforePageWrite;
+        this.updateHeartbeat = updateHeartbeat;
         this.threadBuf = threadBuf;
         this.checkpointProgress = checkpointProgress;
         this.pageWriter = pageWriter;
@@ -183,7 +183,7 @@ public class CheckpointPagesWriter implements Runnable {
         AtomicBoolean writeMetaPage = new AtomicBoolean();
 
         while (!shutdownNow.getAsBoolean() && writePageIds.next(queueResult)) {
-            beforePageWrite.run();
+            updateHeartbeat.run();
 
             FullPageId fullId = queueResult.getValue();
 
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
index 831cbc0e4..ee8d15b20 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriterFactory.java
@@ -91,7 +91,7 @@ public class CheckpointPagesWriterFactory {
      * @param dirtyPageIdQueue Checkpoint dirty page ID queue to write.
      * @param updatedPartitions Updated partitions.
      * @param doneWriteFut Write done future.
-     * @param beforePageWrite Before page write callback.
+     * @param updateHeartbeat Update heartbeat callback.
      * @param checkpointProgress Current checkpoint data.
      * @param shutdownNow Checker of stop operation.
      */
@@ -100,7 +100,7 @@ public class CheckpointPagesWriterFactory {
             IgniteConcurrentMultiPairQueue<PersistentPageMemory, FullPageId> dirtyPageIdQueue,
             ConcurrentMap<GroupPartitionId, LongAdder> updatedPartitions,
             CompletableFuture<?> doneWriteFut,
-            Runnable beforePageWrite,
+            Runnable updateHeartbeat,
             CheckpointProgressImpl checkpointProgress,
             // TODO: IGNITE-16993 Consider a lock replacement
             BooleanSupplier shutdownNow
@@ -111,7 +111,7 @@ public class CheckpointPagesWriterFactory {
                 dirtyPageIdQueue,
                 updatedPartitions,
                 doneWriteFut,
-                beforePageWrite,
+                updateHeartbeat,
                 threadBuf,
                 checkpointProgress,
                 dirtyPageWriter,
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index f8569013a..1725816dd 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.LongAdder;
@@ -48,6 +47,7 @@ import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryChec
 import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
 import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor;
 import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -139,6 +139,9 @@ public class Checkpointer extends IgniteWorker {
     /** File page store manager. */
     private final FilePageStoreManager filePageStoreManager;
 
+    /** Delta file compactor. */
+    private final Compactor compactor;
+
     /**
      * Constructor.
      *
@@ -149,6 +152,7 @@ public class Checkpointer extends IgniteWorker {
      * @param checkpointWorkFlow Implementation of checkpoint.
      * @param factory Page writer factory.
      * @param filePageStoreManager File page store manager.
+     * @param compactor Delta file compactor.
      * @param checkpointConfig Checkpoint configuration.
      */
     Checkpointer(
@@ -159,6 +163,7 @@ public class Checkpointer extends IgniteWorker {
             CheckpointWorkflow checkpointWorkFlow,
             CheckpointPagesWriterFactory factory,
             FilePageStoreManager filePageStoreManager,
+            Compactor compactor,
             PageMemoryCheckpointConfiguration checkpointConfig
     ) {
         super(log, igniteInstanceName, "checkpoint-thread", workerListener);
@@ -168,10 +173,11 @@ public class Checkpointer extends IgniteWorker {
         this.checkpointWorkflow = checkpointWorkFlow;
         this.checkpointPagesWriterFactory = factory;
         this.filePageStoreManager = filePageStoreManager;
+        this.compactor = compactor;
 
         scheduledCheckpointProgress = new CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
 
-        int checkpointWritePageThreads = checkpointConfig.threads().value();
+        int checkpointWritePageThreads = checkpointConfig.checkpointThreads().value();
 
         if (checkpointWritePageThreads > 1) {
             checkpointWritePagesPool = new ThreadPoolExecutor(
@@ -405,12 +411,7 @@ public class Checkpointer extends IgniteWorker {
             if (pageWritePool == null) {
                 write.run();
             } else {
-                try {
-                    pageWritePool.execute(write);
-                } catch (RejectedExecutionException ignore) {
-                    // Run the task synchronously.
-                    write.run();
-                }
+                pageWritePool.execute(write);
             }
         }
 
@@ -431,6 +432,8 @@ public class Checkpointer extends IgniteWorker {
 
         syncUpdatedPageStores(updatedPartitions);
 
+        compactor.addDeltaFiles(updatedPartitions.size());
+
         if (shutdownNow.getAsBoolean()) {
             currentCheckpointProgress.fail(new NodeStoppingException("Node is stopping."));
 
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
new file mode 100644
index 000000000..bf9f285ee
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.compaction;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Collectors.toCollection;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Entity to compact delta files.
+ *
+ * <p>To start compacting delta files, you need to notify about the appearance of {@link #addDeltaFiles(int) delta files ready for
+ * compaction}. Then all delta files {@link FilePageStore#getDeltaFileToCompaction() ready for compaction} will be collected and merged with
+ * their {@link FilePageStore file page stores} until all delta files are compacted.
+ *
+ * <p>Delta file compaction process consists of:
+ * <ul>
+ *  <li>Copying pages from a delta file to a partition file.</li>
+ *  <li>Fsync of the partition file.</li>
+ *  <li>Remove delta file from {@link FilePageStore} and file system.</li>
+ * </ul>
+ */
+public class Compactor extends IgniteWorker {
+    private final Object mux = new Object();
+
+    private final @Nullable ThreadPoolExecutor threadPoolExecutor;
+
+    private final AtomicInteger deltaFileCount = new AtomicInteger();
+
+    private final FilePageStoreManager filePageStoreManager;
+
+    /** Thread local with buffers for the compaction threads. */
+    private final ThreadLocal<ByteBuffer> threadBuf;
+
+    /**
+     * Creates new ignite worker with given parameters.
+     *
+     * @param log Logger.
+     * @param igniteInstanceName Name of the Ignite instance this runnable is used in.
+     * @param listener Listener for life-cycle events.
+     * @param threads Number of compaction threads.
+     * @param filePageStoreManager File page store manager.
+     * @param pageSize Page size in bytes.
+     */
+    public Compactor(
+            IgniteLogger log,
+            String igniteInstanceName,
+            @Nullable IgniteWorkerListener listener,
+            ConfigurationValue<Integer> threads,
+            FilePageStoreManager filePageStoreManager,
+            int pageSize
+    ) {
+        super(log, igniteInstanceName, "compaction-thread", listener);
+
+        this.filePageStoreManager = filePageStoreManager;
+
+        threadBuf = ThreadLocal.withInitial(() -> {
+            ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize);
+
+            tmpWriteBuf.order(ByteOrder.nativeOrder());
+
+            return tmpWriteBuf;
+        });
+
+        int threadCount = threads.value();
+
+        if (threadCount > 1) {
+            threadPoolExecutor = new ThreadPoolExecutor(
+                    threadCount,
+                    threadCount,
+                    30_000,
+                    MILLISECONDS,
+                    new LinkedBlockingQueue<>(),
+                    new NamedThreadFactory("compaction-runner-io", log)
+            );
+        } else {
+            threadPoolExecutor = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void body() throws InterruptedException {
+        try {
+            while (!isCancelled()) {
+                waitDeltaFiles();
+
+                if (isCancelled()) {
+                    log.info("Skipping the delta file compaction because the node is stopping");
+
+                    return;
+                }
+
+                doCompaction();
+            }
+        } catch (Throwable t) {
+            // TODO: IGNITE-16899 By analogy with 2.0, we need to handle the exception (err) by the FailureProcessor
+
+            throw new IgniteInternalException(t);
+        }
+    }
+
+    /**
+     * Waiting for delta files.
+     */
+    void waitDeltaFiles() {
+        try {
+            synchronized (mux) {
+                while (deltaFileCount.get() == 0 && !isCancelled()) {
+                    blockingSectionBegin();
+
+                    try {
+                        mux.wait();
+                    } finally {
+                        blockingSectionEnd();
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            isCancelled.set(true);
+        }
+    }
+
+    /**
+     * Adds the number of delta files to compact.
+     *
+     * @param count Number of delta files.
+     */
+    public void addDeltaFiles(int count) {
+        assert count >= 0;
+
+        if (count > 0) {
+            deltaFileCount.addAndGet(count);
+
+            synchronized (mux) {
+                mux.notifyAll();
+            }
+        }
+    }
+
+    /**
+     * Merges delta files with partition files.
+     */
+    void doCompaction() {
+        // Let's collect one delta file for each partition.
+        Queue<IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo>> queue = filePageStoreManager.allPageStores().stream()
+                .flatMap(List::stream)
+                .map(filePageStore -> {
+                    DeltaFilePageStoreIo deltaFileToCompaction = filePageStore.getDeltaFileToCompaction();
+
+                    return deltaFileToCompaction == null ? null : new IgniteBiTuple<>(filePageStore, deltaFileToCompaction);
+                })
+                .filter(Objects::nonNull)
+                .collect(toCollection(ConcurrentLinkedQueue::new));
+
+        assert !queue.isEmpty();
+
+        updateHeartbeat();
+
+        int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize();
+
+        CompletableFuture<?>[] futures = new CompletableFuture[threads];
+
+        for (int i = 0; i < threads; i++) {
+            CompletableFuture<?> future = futures[i] = new CompletableFuture<>();
+
+            Runnable merger = () -> {
+                IgniteBiTuple<FilePageStore, DeltaFilePageStoreIo> toMerge;
+
+                try {
+                    while ((toMerge = queue.poll()) != null) {
+                        mergeDeltaFileToMainFile(toMerge.get1(), toMerge.get2());
+                    }
+                } catch (Throwable ex) {
+                    future.completeExceptionally(ex);
+                }
+
+                future.complete(null);
+            };
+
+            if (isCancelled()) {
+                return;
+            }
+
+            if (threadPoolExecutor == null) {
+                merger.run();
+            } else {
+                threadPoolExecutor.execute(merger);
+            }
+        }
+
+        updateHeartbeat();
+
+        // Wait and check for errors.
+        CompletableFuture.allOf(futures).join();
+    }
+
+    /**
+     * Starts the compacter.
+     */
+    public void start() {
+        if (runner() != null) {
+            return;
+        }
+
+        assert runner() == null : "Compacter is running";
+
+        new IgniteThread(this).start();
+    }
+
+    /**
+     * Stops the compacter.
+     */
+    public void stop() throws Exception {
+        cancel();
+
+        boolean interrupt = false;
+
+        while (true) {
+            try {
+                join();
+
+                break;
+            } catch (InterruptedException e) {
+                interrupt = true;
+            }
+        }
+
+        if (interrupt) {
+            Thread.currentThread().interrupt();
+        }
+
+        if (threadPoolExecutor != null) {
+            IgniteUtils.shutdownAndAwaitTermination(threadPoolExecutor, 2, TimeUnit.MINUTES);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void cancel() {
+        if (log.isDebugEnabled()) {
+            log.debug("Cancelling grid runnable: " + this);
+        }
+
+        // Do not interrupt runner thread.
+        isCancelled.set(true);
+
+        synchronized (mux) {
+            mux.notifyAll();
+        }
+    }
+
+    /**
+     * Merges the main file page store with the delta file page store.
+     *
+     * <p>Steps:
+     * <ul>
+     *  <li>Copy pages from delta file page store to file page store.</li>
+     *  <li>Fsync the file page store.</li>
+     *  <li>Removing the delta file page store from a file page store.</li>
+     * </ul>
+     *
+     * @param filePageStore File page store.
+     * @param deltaFilePageStore Delta file page store.
+     * @throws Throwable If failed.
+     */
+    void mergeDeltaFileToMainFile(
+            FilePageStore filePageStore,
+            DeltaFilePageStoreIo deltaFilePageStore
+    ) throws Throwable {
+        // Copy pages deltaFilePageStore -> filePageStore.
+        ByteBuffer buffer = threadBuf.get();
+
+        for (long pageIndex : deltaFilePageStore.pageIndexes()) {
+            updateHeartbeat();
+
+            if (isCancelled()) {
+                return;
+            }
+
+            long pageOffset = deltaFilePageStore.pageOffset(pageIndex);
+
+            // pageIndex instead of pageId, only for debugging in case of errors
+            // since we do not know the pageId until we read it from the pageOffset.
+            boolean read = deltaFilePageStore.readWithMergedToFilePageStoreCheck(pageIndex, pageOffset, buffer.rewind(), false);
+
+            assert read : deltaFilePageStore.filePath();
+
+            long pageId = PageIo.getPageId(buffer.rewind());
+
+            assert pageId != 0 : deltaFilePageStore.filePath();
+
+            updateHeartbeat();
+
+            if (isCancelled()) {
+                return;
+            }
+
+            filePageStore.write(pageId, buffer.rewind(), true);
+        }
+
+        // Fsync the file page store.
+        updateHeartbeat();
+
+        if (isCancelled()) {
+            return;
+        }
+
+        filePageStore.sync();
+
+        // Removing the delta file page store from a file page store.
+        updateHeartbeat();
+
+        if (isCancelled()) {
+            return;
+        }
+
+        boolean removed = filePageStore.removeDeltaFile(deltaFilePageStore);
+
+        assert removed : filePageStore.filePath();
+
+        deltaFilePageStore.markMergedToFilePageStore();
+
+        deltaFilePageStore.stop(true);
+
+        deltaFileCount.decrementAndGet();
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java
index 9c1319938..7043f90ab 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIo.java
@@ -64,7 +64,11 @@ public abstract class AbstractFilePageStoreIo implements Closeable {
     /** Initialized file page store IO. */
     private volatile boolean initialized;
 
-    /** Caches the existence state of file. After it is initialized, it will be not {@code null} during lifecycle. */
+    /**
+     * Caches the existence state of file. After it is initialized, it will be not {@code null} during lifecycle.
+     *
+     * <p>Guarded by {@link #readWriteLock}.
+     */
     private @Nullable Boolean fileExists;
 
     /**
@@ -140,7 +144,7 @@ public abstract class AbstractFilePageStoreIo implements Closeable {
      * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}.
      * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
      */
-    public void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+    protected void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
         read0(pageId, pageOff, pageBuf, !skipCrc, keepCrc);
     }
 
@@ -513,8 +517,6 @@ public abstract class AbstractFilePageStoreIo implements Closeable {
             if (keepCrc) {
                 PageIo.setCrc(pageBuf, savedCrc32);
             }
-
-            return;
         } catch (IOException e) {
             throw new IgniteInternalCheckedException("Failed to read page [file=" + filePath + ", pageId=" + pageId + "]", e);
         }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java
index 2fd974a82..7d5256a16 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIo.java
@@ -31,6 +31,8 @@ import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import org.apache.ignite.internal.fileio.FileIo;
 import org.apache.ignite.internal.fileio.FileIoFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
 
 /**
  * Implementation of the class for working with the delta file page storage IO.
@@ -38,6 +40,9 @@ import org.apache.ignite.internal.fileio.FileIoFactory;
 public class DeltaFilePageStoreIo extends AbstractFilePageStoreIo {
     private final DeltaFilePageStoreIoHeader header;
 
+    /** Lock to prevent reads after merging with a file page store. */
+    private final IgniteSpinBusyLock mergedBusyLock = new IgniteSpinBusyLock();
+
     /**
      * Constructor.
      *
@@ -95,7 +100,16 @@ public class DeltaFilePageStoreIo extends AbstractFilePageStoreIo {
      */
     @Override
     public long pageOffset(long pageId) {
-        int searchResult = binarySearch(header.pageIndexes(), pageIndex(pageId));
+        return pageOffset(pageIndex(pageId));
+    }
+
+    /**
+     * Returns page offset within the store file, {@code -1} if page not found in delta file.
+     *
+     * @param pageIdx Page index.
+     */
+    public long pageOffset(int pageIdx) {
+        int searchResult = binarySearch(header.pageIndexes(), pageIdx);
 
         if (searchResult < 0) {
             return -1;
@@ -104,10 +118,56 @@ public class DeltaFilePageStoreIo extends AbstractFilePageStoreIo {
         return (long) searchResult * pageSize() + headerSize();
     }
 
+    /**
+     * Reads a page.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to read into.
+     * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}.
+     * @return {@code True} if the page was successfully read, otherwise the delta file page store is {@link #markMergedToFilePageStore()
+     * merged} with the file page store (must be read from file page store).
+     * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
+     */
+    public boolean readWithMergedToFilePageStoreCheck(
+            long pageId,
+            long pageOff,
+            ByteBuffer pageBuf,
+            boolean keepCrc
+    ) throws IgniteInternalCheckedException {
+        if (!mergedBusyLock.enterBusy()) {
+            return false;
+        }
+
+        try {
+            super.read(pageId, pageOff, pageBuf, keepCrc);
+
+            return true;
+        } finally {
+            mergedBusyLock.leaveBusy();
+        }
+    }
+
     /**
      * Returns the index of the delta file page store.
      */
     public int fileIndex() {
         return header.index();
     }
+
+    /**
+     * Marks that the delta file page store has been merged with the file page store.
+     *
+     * <p>It waits for all current {@link #readWithMergedToFilePageStoreCheck(long, long, ByteBuffer, boolean) readings} to end, and
+     * subsequent ones will return {@code false} (will need to read from the file page store not from delta file page store).
+     */
+    public void markMergedToFilePageStore() {
+        mergedBusyLock.block();
+    }
+
+    /**
+     * Returns page indexes of the delta file page store.
+     */
+    public int[] pageIndexes() {
+        return header.pageIndexes();
+    }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java
index 9ff231861..5a3b3a2e1 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStore.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -168,9 +169,9 @@ public class FilePageStore implements PageStore {
             long pageOff = deltaFilePageStoreIo.pageOffset(pageId);
 
             if (pageOff >= 0) {
-                deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc);
-
-                return;
+                if (deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, pageBuf, keepCrc)) {
+                    return;
+                }
             }
         }
 
@@ -182,17 +183,7 @@ public class FilePageStore implements PageStore {
     public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
         assert pageIndex(pageId) <= pageCount : "pageIdx=" + pageIndex(pageId) + ", pageCount=" + pageCount;
 
-        for (DeltaFilePageStoreIo deltaFilePageStoreIo : deltaFilePageStoreIos) {
-            long pageOff = deltaFilePageStoreIo.pageOffset(pageId);
-
-            if (pageOff >= 0) {
-                deltaFilePageStoreIo.read(pageId, pageOff, pageBuf, keepCrc);
-
-                return;
-            }
-        }
-
-        filePageStoreIo.read(pageId, filePageStoreIo.pageOffset(pageId), pageBuf, keepCrc);
+        readWithoutPageIdCheck(pageId, pageBuf, keepCrc);
     }
 
     /** {@inheritDoc} */
@@ -344,4 +335,44 @@ public class FilePageStore implements PageStore {
     public int deltaFileCount() {
         return deltaFilePageStoreIos.size();
     }
+
+    /**
+     * Returns the delta file to compaction (oldest).
+     *
+     * <p>Thread safe.
+     */
+    public @Nullable DeltaFilePageStoreIo getDeltaFileToCompaction() {
+        // Snapshot of delta files.
+        Iterator<DeltaFilePageStoreIo> iterator = deltaFilePageStoreIos.iterator();
+
+        // Last one is the oldest.
+        DeltaFilePageStoreIo last = null;
+
+        int count = 0;
+
+        while (iterator.hasNext()) {
+            last = iterator.next();
+
+            count++;
+        }
+
+        // If last is just created, then it cannot be compacted yet.
+        if (count == 1 && newDeltaFilePageStoreIoFuture != null) {
+            last = null;
+        }
+
+        return last;
+    }
+
+    /**
+     * Deletes delta file.
+     *
+     * <p>Thread safe.
+     *
+     * @param deltaFilePageStoreIo Delta file to be deleted.
+     * @return {@code True} if the delta file being removed was present.
+     */
+    public boolean removeDeltaFile(DeltaFilePageStoreIo deltaFilePageStoreIo) {
+        return deltaFilePageStoreIos.remove(deltaFilePageStoreIo);
+    }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java
index 0d340c9e9..e14d397bf 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIo.java
@@ -28,6 +28,7 @@ import java.nio.file.Path;
 import org.apache.ignite.internal.fileio.FileIo;
 import org.apache.ignite.internal.fileio.FileIoFactory;
 import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
 
 /**
  * Implementation of the class for working with the file page file storage IO.
@@ -83,6 +84,12 @@ public class FilePageStoreIo extends AbstractFilePageStoreIo {
         checkFilePageSize(this.header.pageSize(), header.pageSize());
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public void read(long pageId, long pageOff, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
+        super.read(pageId, pageOff, pageBuf, keepCrc);
+    }
+
     /** {@inheritDoc} */
     @Override
     public long pageOffset(long pageId) {
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
index 64d67fd28..70a15608c 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManager.java
@@ -21,6 +21,8 @@ import static java.nio.file.Files.createDirectories;
 import static java.util.Collections.unmodifiableList;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
 import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
 import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
@@ -82,9 +84,6 @@ public class FilePageStoreManager implements PageReadWriteManager {
     /** Page size in bytes. */
     private final int pageSize;
 
-    /** Page read write manager. */
-    private final PageReadWriteManager pageReadWriteManager = new PageReadWriteManagerImpl(this);
-
     /**
      * Executor to disallow running code that modifies data in {@link #groupPageStores} concurrently with cleanup of file page store.
      */
@@ -180,7 +179,15 @@ public class FilePageStoreManager implements PageReadWriteManager {
     /** {@inheritDoc} */
     @Override
     public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
-        pageReadWriteManager.read(grpId, pageId, pageBuf, keepCrc);
+        FilePageStore pageStore = getStore(grpId, partitionId(pageId));
+
+        try {
+            pageStore.read(pageId, pageBuf, keepCrc);
+        } catch (IgniteInternalCheckedException e) {
+            // TODO: IGNITE-16899 By analogy with 2.0, fail a node
+
+            throw e;
+        }
     }
 
     /** {@inheritDoc} */
@@ -191,13 +198,35 @@ public class FilePageStoreManager implements PageReadWriteManager {
             ByteBuffer pageBuf,
             boolean calculateCrc
     ) throws IgniteInternalCheckedException {
-        return pageReadWriteManager.write(grpId, pageId, pageBuf, calculateCrc);
+        FilePageStore pageStore = getStore(grpId, partitionId(pageId));
+
+        try {
+            pageStore.write(pageId, pageBuf, calculateCrc);
+        } catch (IgniteInternalCheckedException e) {
+            // TODO: IGNITE-16899 By analogy with 2.0, fail a node
+
+            throw e;
+        }
+
+        return pageStore;
     }
 
     /** {@inheritDoc} */
     @Override
     public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
-        return pageReadWriteManager.allocatePage(grpId, partId, flags);
+        assert partId >= 0 && partId <= MAX_PARTITION_ID : partId;
+
+        FilePageStore pageStore = getStore(grpId, partId);
+
+        try {
+            int pageIdx = pageStore.allocatePage();
+
+            return pageId(partId, flags, pageIdx);
+        } catch (IgniteInternalCheckedException e) {
+            // TODO: IGNITE-16899 By analogy with 2.0, fail a node
+
+            throw e;
+        }
     }
 
     /**
@@ -239,6 +268,13 @@ public class FilePageStoreManager implements PageReadWriteManager {
         return groupPageStores.get(grpId);
     }
 
+    /**
+     * Returns all page stores of all groups.
+     */
+    public Collection<List<FilePageStore>> allPageStores() {
+        return groupPageStores.allPageStores();
+    }
+
     /**
      * Returns partition file page store for the corresponding parameters.
      *
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java
deleted file mode 100644
index 94e47164f..000000000
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence.store;
-
-import static org.apache.ignite.internal.pagememory.PageIdAllocator.MAX_PARTITION_ID;
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.tostring.IgniteToStringExclude;
-import org.apache.ignite.lang.IgniteInternalCheckedException;
-
-/**
- * {@link org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager} implementation.
- */
-class PageReadWriteManagerImpl implements org.apache.ignite.internal.pagememory.persistence.PageReadWriteManager {
-    @IgniteToStringExclude
-    protected final FilePageStoreManager filePageStoreManager;
-
-    /**
-     * Constructor.
-     *
-     * @param filePageStoreManager File page store manager.
-     */
-    public PageReadWriteManagerImpl(FilePageStoreManager filePageStoreManager) {
-        this.filePageStoreManager = filePageStoreManager;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void read(int grpId, long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException {
-        FilePageStore pageStore = filePageStoreManager.getStore(grpId, partitionId(pageId));
-
-        try {
-            pageStore.read(pageId, pageBuf, keepCrc);
-        } catch (IgniteInternalCheckedException e) {
-            // TODO: IGNITE-16899 By analogy with 2.0, fail a node
-
-            throw e;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public PageStore write(
-            int grpId,
-            long pageId,
-            ByteBuffer pageBuf,
-            boolean calculateCrc
-    ) throws IgniteInternalCheckedException {
-        FilePageStore pageStore = filePageStoreManager.getStore(grpId, partitionId(pageId));
-
-        try {
-            pageStore.write(pageId, pageBuf, calculateCrc);
-        } catch (IgniteInternalCheckedException e) {
-            // TODO: IGNITE-16899 By analogy with 2.0, fail a node
-
-            throw e;
-        }
-
-        return pageStore;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public long allocatePage(int grpId, int partId, byte flags) throws IgniteInternalCheckedException {
-        assert partId >= 0 && partId <= MAX_PARTITION_ID : partId;
-
-        FilePageStore pageStore = filePageStoreManager.getStore(grpId, partId);
-
-        try {
-            int pageIdx = pageStore.allocatePage();
-
-            return pageId(partId, flags, pageIdx);
-        } catch (IgniteInternalCheckedException e) {
-            // TODO: IGNITE-16899 By analogy with 2.0, fail a node
-
-            throw e;
-        }
-    }
-}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java
index 9866ec28a..66d778a6a 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java
@@ -51,8 +51,7 @@ public interface PageStore extends Closeable {
      *
      * @param pageId Page ID.
      * @param pageBuf Page buffer to read into.
-     * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code
-     * keepCrc}.
+     * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code true}.
      * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
      */
     void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException;
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index d4ad1f6b4..3e5f2b05f 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -46,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
 import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
 import org.apache.ignite.internal.pagememory.persistence.WriteDirtyPage;
+import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor;
 import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
 import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -91,7 +93,7 @@ public class CheckpointerTest {
 
     private final IgniteLogger log = Loggers.forClass(CheckpointerTest.class);
 
-    @InjectConfiguration("mock : {threads=1, frequency=1000, frequencyDeviation=0}")
+    @InjectConfiguration("mock : {checkpointThreads=1, frequency=1000, frequencyDeviation=0}")
     private PageMemoryCheckpointConfiguration checkpointConfig;
 
     @BeforeAll
@@ -116,6 +118,7 @@ public class CheckpointerTest {
                 createCheckpointWorkflow(EMPTY),
                 createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)),
                 mock(FilePageStoreManager.class),
+                mock(Compactor.class),
                 checkpointConfig
         );
 
@@ -148,6 +151,7 @@ public class CheckpointerTest {
                 mock(CheckpointWorkflow.class),
                 mock(CheckpointPagesWriterFactory.class),
                 mock(FilePageStoreManager.class),
+                mock(Compactor.class),
                 checkpointConfig
         ));
 
@@ -247,6 +251,7 @@ public class CheckpointerTest {
                 mock(CheckpointWorkflow.class),
                 mock(CheckpointPagesWriterFactory.class),
                 mock(FilePageStoreManager.class),
+                mock(Compactor.class),
                 checkpointConfig
         );
 
@@ -275,6 +280,7 @@ public class CheckpointerTest {
                 createCheckpointWorkflow(EMPTY),
                 createCheckpointPagesWriterFactory(mock(PartitionMetaManager.class)),
                 mock(FilePageStoreManager.class),
+                mock(Compactor.class),
                 checkpointConfig
         ));
 
@@ -352,6 +358,8 @@ public class CheckpointerTest {
 
         when(filePageStore.getNewDeltaFile()).thenReturn(completedFuture(mock(DeltaFilePageStoreIo.class)));
 
+        Compactor compactor = mock(Compactor.class);
+
         Checkpointer checkpointer = spy(new Checkpointer(
                 log,
                 "test",
@@ -360,6 +368,7 @@ public class CheckpointerTest {
                 createCheckpointWorkflow(dirtyPages),
                 createCheckpointPagesWriterFactory(partitionMetaManager),
                 createFilePageStoreManager(Map.of(new GroupPartitionId(0, 0), filePageStore)),
+                compactor,
                 checkpointConfig
         ));
 
@@ -367,6 +376,7 @@ public class CheckpointerTest {
 
         verify(dirtyPages, times(1)).toDirtyPageIdQueue();
         verify(checkpointer, times(1)).startCheckpointProgress();
+        verify(compactor, times(1)).addDeltaFiles(eq(1));
 
         assertEquals(checkpointer.lastCheckpointProgress().currentCheckpointPagesCount(), 3);
     }
@@ -381,6 +391,7 @@ public class CheckpointerTest {
                 mock(CheckpointWorkflow.class),
                 mock(CheckpointPagesWriterFactory.class),
                 mock(FilePageStoreManager.class),
+                mock(Compactor.class),
                 checkpointConfig
         );
 
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
new file mode 100644
index 000000000..d065efcda
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.compaction;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.util.GridUnsafe.bufferAddress;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.configuration.ConfigurationValue;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
+import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link Compactor} testing.
+ */
+public class CompactorTest {
+    private static final int PAGE_SIZE = 1024;
+
+    private final IgniteLogger log = Loggers.forClass(CompactorTest.class);
+
+    @Test
+    void testStartAndStop() throws Exception {
+        Compactor compactor = new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE);
+
+        compactor.start();
+
+        assertNull(compactor.runner());
+
+        assertFalse(compactor.isCancelled());
+        assertFalse(compactor.isDone());
+        assertFalse(Thread.currentThread().isInterrupted());
+
+        compactor.start();
+
+        assertTrue(waitForCondition(() -> compactor.runner() != null, 10, 1_000));
+
+        compactor.stop();
+
+        assertTrue(waitForCondition(() -> compactor.runner() == null, 10, 1_000));
+
+        assertTrue(compactor.isCancelled());
+        assertTrue(compactor.isDone());
+        assertFalse(Thread.currentThread().isInterrupted());
+    }
+
+    @Test
+    void testMergeDeltaFileToMainFile() throws Throwable {
+        Compactor compactor = new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE);
+
+        FilePageStore filePageStore = mock(FilePageStore.class);
+        DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class);
+
+        when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true);
+
+        when(deltaFilePageStoreIo.pageIndexes()).thenReturn(new int[]{0});
+
+        when(deltaFilePageStoreIo.readWithMergedToFilePageStoreCheck(anyLong(), anyLong(), any(ByteBuffer.class), anyBoolean()))
+                .then(answer -> {
+                    ByteBuffer buffer = answer.getArgument(2);
+
+                    PageIo.setPageId(bufferAddress(buffer), 1);
+
+                    return true;
+                });
+
+        compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo);
+
+        verify(deltaFilePageStoreIo, times(1)).readWithMergedToFilePageStoreCheck(eq(0L), eq(0L), any(ByteBuffer.class), anyBoolean());
+        verify(filePageStore, times(1)).write(eq(1L), any(ByteBuffer.class), anyBoolean());
+
+        verify(filePageStore, times(1)).sync();
+        verify(filePageStore, times(1)).removeDeltaFile(eq(deltaFilePageStoreIo));
+
+        verify(deltaFilePageStoreIo, times(1)).markMergedToFilePageStore();
+        verify(deltaFilePageStoreIo, times(1)).stop(eq(true));
+    }
+
+    @Test
+    void testDoCompaction() throws Throwable {
+        FilePageStore filePageStore = mock(FilePageStore.class);
+
+        DeltaFilePageStoreIo deltaFilePageStoreIo = mock(DeltaFilePageStoreIo.class);
+
+        when(filePageStore.getDeltaFileToCompaction()).thenReturn(deltaFilePageStoreIo);
+
+        FilePageStoreManager filePageStoreManager = mock(FilePageStoreManager.class);
+
+        when(filePageStoreManager.allPageStores()).thenReturn(List.of(List.of(filePageStore)));
+
+        Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), filePageStoreManager, PAGE_SIZE));
+
+        doAnswer(answer -> {
+            assertSame(filePageStore, answer.getArgument(0));
+            assertSame(deltaFilePageStoreIo, answer.getArgument(1));
+
+            return null;
+        })
+                .when(compactor)
+                .mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class));
+
+        compactor.doCompaction();
+
+        verify(filePageStore, times(1)).getDeltaFileToCompaction();
+
+        verify(compactor, times(1)).mergeDeltaFileToMainFile(any(FilePageStore.class), any(DeltaFilePageStoreIo.class));
+    }
+
+    @Test
+    void testBody() throws Exception {
+        Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE));
+
+        doNothing().when(compactor).waitDeltaFiles();
+
+        doAnswer(answer -> {
+            compactor.cancel();
+
+            return null;
+        }).when(compactor).doCompaction();
+
+        compactor.body();
+
+        verify(compactor, times(3)).isCancelled();
+        verify(compactor, times(1)).waitDeltaFiles();
+        verify(compactor, times(1)).doCompaction();
+    }
+
+    @Test
+    void testWaitDeltaFiles() throws Exception {
+        Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE));
+
+        CompletableFuture<?> waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles);
+
+        assertThrows(TimeoutException.class, () -> waitDeltaFilesFuture.get(100, MILLISECONDS));
+
+        compactor.addDeltaFiles(1);
+
+        waitDeltaFilesFuture.get(100, MILLISECONDS);
+    }
+
+    @Test
+    void testCancel() throws Exception {
+        Compactor compactor = spy(new Compactor(log, "test", null, threadsConfig(1), mock(FilePageStoreManager.class), PAGE_SIZE));
+
+        assertFalse(compactor.isCancelled());
+
+        CompletableFuture<?> waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles);
+
+        assertThrows(TimeoutException.class, () -> waitDeltaFilesFuture.get(100, MILLISECONDS));
+
+        compactor.cancel();
+
+        assertTrue(compactor.isCancelled());
+        assertFalse(Thread.currentThread().isInterrupted());
+
+        waitDeltaFilesFuture.get(100, MILLISECONDS);
+    }
+
+    private static ConfigurationValue<Integer> threadsConfig(int threads) {
+        ConfigurationValue<Integer> configValue = mock(ConfigurationValue.class);
+
+        when(configValue.value()).thenReturn(threads);
+
+        return configValue;
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java
index b72d7783e..cd0cd9bbc 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/AbstractFilePageStoreIoTest.java
@@ -227,7 +227,7 @@ public abstract class AbstractFilePageStoreIoTest {
 
             long expPageId = createDataPageId(() -> 0);
 
-            ByteBuffer pageByteBuffer = createPageByteBuffer(0, PAGE_SIZE);
+            ByteBuffer pageByteBuffer = createPageByteBuffer(expPageId, PAGE_SIZE);
 
             // Puts random bytes after: type (2 byte) + version (2 byte) + crc (4 byte).
             pageByteBuffer.position(8).put(randomBytes(128));
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java
index 27beb4c08..cebf1bd4a 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/DeltaFilePageStoreIoTest.java
@@ -23,14 +23,21 @@ import static java.nio.file.StandardOpenOption.WRITE;
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
 import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.DELTA_FILE_VERSION_1;
 import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.arr;
+import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createDataPageId;
+import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createPageByteBuffer;
+import static org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.randomBytes;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.fileio.FileIo;
@@ -50,11 +57,19 @@ public class DeltaFilePageStoreIoTest extends AbstractFilePageStoreIoTest {
 
         try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(testFilePath, header)) {
             assertEquals(PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 0)));
+            assertEquals(PAGE_SIZE, filePageStoreIo.pageOffset(0));
+
             assertEquals(2 * PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 1)));
+            assertEquals(2 * PAGE_SIZE, filePageStoreIo.pageOffset(1));
+
             assertEquals(3 * PAGE_SIZE, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 2)));
+            assertEquals(3 * PAGE_SIZE, filePageStoreIo.pageOffset(2));
 
             assertEquals(-1, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 3)));
+            assertEquals(-1, filePageStoreIo.pageOffset(3));
+
             assertEquals(-1, filePageStoreIo.pageOffset(pageId(0, FLAG_DATA, 4)));
+            assertEquals(-1, filePageStoreIo.pageOffset(4));
         }
     }
 
@@ -134,6 +149,51 @@ public class DeltaFilePageStoreIoTest extends AbstractFilePageStoreIoTest {
         }
     }
 
+    @Test
+    void testMergedToFilePageStore() throws Exception {
+        DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2));
+
+        try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(workDir.resolve("test"), header)) {
+            // Preparation for reading.
+            long pageId = createDataPageId(() -> 0);
+
+            ByteBuffer pageByteBuffer = createPageByteBuffer(pageId, PAGE_SIZE);
+
+            // Puts random bytes after: type (2 byte) + version (2 byte) + crc (4 byte).
+            pageByteBuffer.position(8).put(randomBytes(128));
+
+            filePageStoreIo.write(pageId, pageByteBuffer.rewind(), true);
+
+            filePageStoreIo.sync();
+
+            // Checking readings.
+            ByteBuffer buffer = ByteBuffer.allocateDirect(PAGE_SIZE).order(pageByteBuffer.order());
+
+            long pageOff = filePageStoreIo.pageOffset(pageId);
+
+            assertTrue(filePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, buffer, false));
+
+            assertEquals(pageByteBuffer.rewind(), buffer.rewind());
+
+            buffer.rewind().put(new byte[PAGE_SIZE]);
+
+            filePageStoreIo.markMergedToFilePageStore();
+
+            assertFalse(filePageStoreIo.readWithMergedToFilePageStoreCheck(pageId, pageOff, buffer.rewind(), false));
+
+            assertEquals(ByteBuffer.allocateDirect(PAGE_SIZE).order(pageByteBuffer.order()), buffer.rewind());
+        }
+    }
+
+    @Test
+    void testPageIndexes() throws Exception {
+        DeltaFilePageStoreIoHeader header = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 1, PAGE_SIZE, arr(0, 1, 2));
+
+        try (DeltaFilePageStoreIo filePageStoreIo = createFilePageStoreIo(workDir.resolve("test"), header)) {
+            assertArrayEquals(arr(0, 1, 2), filePageStoreIo.pageIndexes());
+        }
+    }
+
     /** {@inheritDoc} */
     @Override
     DeltaFilePageStoreIo createFilePageStoreIo(Path filePath) {
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java
index 959db3983..52588245c 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreManagerTest.java
@@ -45,6 +45,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
@@ -351,6 +352,24 @@ public class FilePageStoreManagerTest {
         );
     }
 
+    @Test
+    void testAllPageStores() throws Exception {
+        FilePageStoreManager manager = createManager();
+
+        manager.start();
+
+        manager.initialize("test0", 1, 1);
+        manager.initialize("test1", 2, 1);
+
+        assertThat(
+                manager.allPageStores().stream().flatMap(List::stream).map(FilePageStore::filePath).collect(toList()),
+                containsInAnyOrder(
+                        workDir.resolve("db/table-1").resolve("part-0.bin"),
+                        workDir.resolve("db/table-2").resolve("part-0.bin")
+                )
+        );
+    }
+
     private FilePageStoreManager createManager() throws Exception {
         return new FilePageStoreManager(log, "test", workDir, new RandomAccessFileIoFactory(), 1024);
     }
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java
index 19e6b20ae..573522291 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.pagememory.persistence.store;
 
+import static java.nio.ByteOrder.nativeOrder;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.DELTA_FILE_VERSION_1;
 import static org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.VERSION_1;
@@ -29,8 +30,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
@@ -38,6 +41,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
@@ -411,6 +415,94 @@ public class FilePageStoreTest {
         }
     }
 
+    @Test
+    void testRemoveDeltaFile() throws Exception {
+        DeltaFilePageStoreIo deltaFile0 = mock(DeltaFilePageStoreIo.class);
+        DeltaFilePageStoreIo deltaFile1 = mock(DeltaFilePageStoreIo.class);
+
+        try (FilePageStore filePageStore = createFilePageStore(workDir.resolve("test"), deltaFile0, deltaFile1)) {
+            assertEquals(2, filePageStore.deltaFileCount());
+
+            assertTrue(filePageStore.removeDeltaFile(deltaFile0));
+            assertFalse(filePageStore.removeDeltaFile(deltaFile0));
+
+            assertEquals(1, filePageStore.deltaFileCount());
+
+            assertTrue(filePageStore.removeDeltaFile(deltaFile1));
+            assertFalse(filePageStore.removeDeltaFile(deltaFile1));
+
+            assertEquals(0, filePageStore.deltaFileCount());
+        }
+    }
+
+    @Test
+    void testGetDeltaFileToCompaction() throws Exception {
+        DeltaFilePageStoreIo deltaFile0 = mock(DeltaFilePageStoreIo.class);
+        DeltaFilePageStoreIo deltaFile1 = mock(DeltaFilePageStoreIo.class);
+
+        when(deltaFile0.fileIndex()).thenReturn(0);
+        when(deltaFile0.fileIndex()).thenReturn(1);
+
+        try (FilePageStore filePageStore = createFilePageStore(workDir.resolve("test"), deltaFile0, deltaFile1)) {
+            assertSame(deltaFile1, filePageStore.getDeltaFileToCompaction());
+
+            CompletableFuture<DeltaFilePageStoreIo> createNewDeltaFileFuture = filePageStore.getOrCreateNewDeltaFile(
+                    index -> workDir.resolve("delta" + index),
+                    TestPageStoreUtils::arr
+            );
+
+            createNewDeltaFileFuture.get(1, SECONDS);
+
+            assertSame(deltaFile1, filePageStore.getDeltaFileToCompaction());
+
+            filePageStore.removeDeltaFile(deltaFile1);
+
+            assertSame(deltaFile0, filePageStore.getDeltaFileToCompaction());
+
+            filePageStore.removeDeltaFile(deltaFile0);
+
+            assertNull(filePageStore.getDeltaFileToCompaction());
+
+            filePageStore.completeNewDeltaFile();
+
+            assertSame(createNewDeltaFileFuture.join(), filePageStore.getDeltaFileToCompaction());
+
+            filePageStore.removeDeltaFile(createNewDeltaFileFuture.join());
+
+            assertNull(filePageStore.getDeltaFileToCompaction());
+        }
+    }
+
+    @Test
+    void testReadWithMergedDeltaFiles() throws Exception {
+        RandomAccessFileIoFactory ioFactory = new RandomAccessFileIoFactory();
+
+        FilePageStoreHeader header = new FilePageStoreHeader(VERSION_1, PAGE_SIZE);
+        DeltaFilePageStoreIoHeader deltaHeader = new DeltaFilePageStoreIoHeader(DELTA_FILE_VERSION_1, 0, PAGE_SIZE, arr(0));
+
+        try (
+                DeltaFilePageStoreIo deltaIo = spy(new DeltaFilePageStoreIo(ioFactory, deltaFilePath(0), deltaHeader));
+                FilePageStoreIo storeIo = spy(new FilePageStoreIo(ioFactory, workDir.resolve("test"), header));
+                FilePageStore filePageStore = new FilePageStore(storeIo, deltaIo);
+        ) {
+            long pageId = createDataPageId(filePageStore::allocatePage);
+
+            ByteBuffer buffer = ByteBuffer.allocateDirect(PAGE_SIZE).order(nativeOrder());
+
+            filePageStore.read(pageId, buffer, true);
+
+            verify(deltaIo, times(1)).readWithMergedToFilePageStoreCheck(eq(pageId), anyLong(), eq(buffer), eq(true));
+            verify(storeIo, times(0)).read(eq(pageId), anyLong(), eq(buffer), eq(true));
+
+            deltaIo.markMergedToFilePageStore();
+
+            filePageStore.read(pageId, buffer.rewind(), true);
+
+            verify(deltaIo, times(2)).readWithMergedToFilePageStoreCheck(eq(pageId), anyLong(), eq(buffer), eq(true));
+            verify(storeIo, times(1)).read(eq(pageId), anyLong(), eq(buffer), eq(true));
+        }
+    }
+
     private static FilePageStore createFilePageStore(Path filePath) {
         return createFilePageStore(filePath, new FilePageStoreHeader(VERSION_1, PAGE_SIZE));
     }
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java
deleted file mode 100644
index 723669d97..000000000
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/PageReadWriteManagerImplTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.pagememory.persistence.store;
-
-import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.nio.ByteBuffer;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-/**
- * For {@link PageReadWriteManagerImpl} testing.
- */
-public class PageReadWriteManagerImplTest {
-    private FilePageStore filePageStore;
-
-    private FilePageStoreManager filePageStoreManager;
-
-    private PageReadWriteManagerImpl pageReadWriteManager;
-
-    @BeforeEach
-    void setUp() throws Exception {
-        filePageStore = mock(FilePageStore.class);
-
-        filePageStoreManager = mock(FilePageStoreManager.class);
-
-        when(filePageStoreManager.getStore(0, 0)).thenReturn(filePageStore);
-
-        pageReadWriteManager = new PageReadWriteManagerImpl(filePageStoreManager);
-    }
-
-    @Test
-    void testRead() throws Exception {
-        long pageId = pageId(0, (byte) 0, 0);
-
-        ByteBuffer pageBuffer = mock(ByteBuffer.class);
-
-        pageReadWriteManager.read(0, pageId, pageBuffer, true);
-
-        verify(filePageStore, times(1)).read(pageId, pageBuffer, true);
-    }
-
-    @Test
-    void testWrite() throws Exception {
-        long pageId = pageId(0, (byte) 0, 0);
-
-        ByteBuffer pageBuffer = mock(ByteBuffer.class);
-
-        assertSame(filePageStore, pageReadWriteManager.write(0, pageId, pageBuffer, true));
-
-        verify(filePageStore, times(1)).write(pageId, pageBuffer, true);
-    }
-
-    @Test
-    void testAllocatePage() throws Exception {
-        assertEquals(
-                pageId(0, (byte) 1, 0),
-                pageReadWriteManager.allocatePage(0, 0, (byte) 1)
-        );
-
-        verify(filePageStore, times(1)).allocatePage();
-    }
-}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 529930123..593ce5c59 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -72,6 +72,12 @@ class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableStorage {
 
         try {
             dataRegion.filePageStoreManager().initialize(tableView.name(), tableView.tableId(), tableView.partitions());
+
+            int deltaFileCount = dataRegion.filePageStoreManager().getStores(tableView.tableId()).stream()
+                    .mapToInt(FilePageStore::deltaFileCount)
+                    .sum();
+
+            dataRegion.checkpointManager().addDeltaFileCountForCompaction(deltaFileCount);
         } catch (IgniteInternalCheckedException e) {
             throw new StorageException("Error initializing file page stores for table: " + tableView.name(), e);
         }