You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/05/04 13:59:36 UTC

[ignite-3] branch main updated: IGNITE-16887 [Native Persistence 3.0] Porting a checkpoint and related code, part 1 (#791)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a3875f417 IGNITE-16887 [Native Persistence 3.0] Porting a checkpoint and related code, part 1 (#791)
a3875f417 is described below

commit a3875f417bb889badcfb63319a3f8bb1b95a395c
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Wed May 4 16:59:32 2022 +0300

    IGNITE-16887 [Native Persistence 3.0] Porting a checkpoint and related code, part 1 (#791)
---
 .../ignite/internal/util/CollectionUtils.java      |  24 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |  28 ++
 .../ignite/internal/util/CollectionUtilsTest.java  |  33 +-
 .../ignite/internal/util/IgniteUtilsTest.java      |  45 ++
 modules/page-memory/pom.xml                        |   6 +
 .../persistence/ItBplusTreePageMemoryImplTest.java |   2 +
 .../ItBplusTreeReuseListPageMemoryImplTest.java    |   2 +
 ...olicyFactory.java => PageMemoryDataRegion.java} |  27 +-
 .../pagememory/mem/DirectMemoryProvider.java       |   9 +-
 .../pagememory/persistence/PageHeader.java         |   2 +-
 .../pagememory/persistence/PageMemoryImpl.java     | 557 +++++++++++++++++++--
 .../pagememory/persistence/PageStoreWriter.java    |  40 ++
 .../persistence/checkpoint/CheckpointPages.java    |  91 ++++
 .../CheckpointProgress.java}                       |  25 +-
 .../CheckpointReadLockTimeoutException.java}       |  26 +-
 .../checkpoint/CheckpointReadWriteLock.java        | 153 ++++++
 .../CheckpointState.java}                          |  35 +-
 .../checkpoint/CheckpointTimeoutLock.java          | 233 +++++++++
 .../Checkpointer.java}                             |  25 +-
 .../ReentrantReadWriteLockWithTracking.java        | 208 ++++++++
 .../ClockPageReplacementFlags.java                 |   4 +-
 .../ClockPageReplacementPolicy.java                |   5 +-
 .../ClockPageReplacementPolicyFactory.java         |   6 +-
 .../replacement/DelayedDirtyPageStoreWrite.java    | 116 +++++
 .../replacement/DelayedPageReplacementTracker.java | 224 +++++++++
 .../{ => replacement}/PageReplacementPolicy.java   |   2 +-
 .../PageReplacementPolicyFactory.java              |   2 +-
 .../RandomLruPageReplacementPolicy.java            |  15 +-
 .../RandomLruPageReplacementPolicyFactory.java     |   2 +-
 .../{ => replacement}/SegmentedLruPageList.java    |   2 +-
 .../SegmentedLruPageReplacementPolicy.java         |   3 +-
 .../SegmentedLruPageReplacementPolicyFactory.java  |   6 +-
 .../pagememory/impl/PageMemoryNoLoadSelfTest.java  |  14 +-
 .../persistence/PageMemoryImplNoLoadTest.java      |  74 ++-
 .../checkpoint/CheckpointPagesTest.java            | 119 +++++
 .../checkpoint/CheckpointReadWriteLockTest.java    | 201 ++++++++
 .../checkpoint/CheckpointTimeoutLockTest.java      | 356 +++++++++++++
 .../ReentrantReadWriteLockWithTrackingTest.java    | 191 +++++++
 ...gion.java => AbstractPageMemoryDataRegion.java} |   5 +-
 .../pagememory/PageMemoryPartitionStorage.java     |   3 +-
 .../storage/pagememory/PageMemoryTableStorage.java |   4 +-
 .../pagememory/VolatilePageMemoryDataRegion.java   |   4 +-
 42 files changed, 2763 insertions(+), 166 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index f85f78c43..264f1cbe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -146,9 +146,6 @@ public final class CollectionUtils {
         }
 
         return new AbstractCollection<>() {
-            /** Total size of the collections. */
-            int size = -1;
-
             /** {@inheritDoc} */
             @Override
             public Iterator<T> iterator() {
@@ -158,18 +155,27 @@ public final class CollectionUtils {
             /** {@inheritDoc} */
             @Override
             public int size() {
-                if (size == -1) {
-                    int s = 0;
+                int size = 0;
 
-                    for (Collection<T> collection : collections) {
-                        s += collection.size();
-                    }
+                for (int i = 0; i < collections.length; i++) {
+                    size += collections[i].size();
 
-                    size = s;
                 }
 
                 return size;
             }
+
+            /** {@inheritDoc} */
+            @Override
+            public boolean contains(Object o) {
+                for (int i = 0; i < collections.length; i++) {
+                    if (collections[i].contains(o)) {
+                        return true;
+                    }
+                }
+
+                return false;
+            }
         };
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index b8eee8e07..30e0495b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -34,8 +34,11 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -690,4 +693,29 @@ public class IgniteUtils {
     public static boolean isPow2(int i) {
         return i > 0 && (i & (i - 1)) == 0;
     }
+
+    /**
+     * Waits if necessary for this future to complete, and then returns its result ignoring interrupts.
+     *
+     * @return Result value.
+     * @throws CancellationException If this future was cancelled.
+     * @throws ExecutionException If this future completed exceptionally.
+     */
+    public static <T> T getUninterruptibly(CompletableFuture<T> future) throws ExecutionException {
+        boolean interrupted = false;
+
+        try {
+            while (true) {
+                try {
+                    return future.get();
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
index a0669a78d..39878bf72 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
@@ -26,10 +26,12 @@ import static org.apache.ignite.internal.util.CollectionUtils.setOf;
 import static org.apache.ignite.internal.util.CollectionUtils.union;
 import static org.apache.ignite.internal.util.CollectionUtils.viewReadOnly;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -114,13 +116,42 @@ public class CollectionUtilsTest {
     @Test
     void testCollectionUnion() {
         assertTrue(union().isEmpty());
-        assertTrue(union().isEmpty());
+        assertTrue(union((Collection<Object>[]) null).isEmpty());
         assertTrue(union(List.of()).isEmpty());
 
         assertEquals(List.of(1), collect(union(List.of(1), List.of())));
         assertEquals(List.of(1), collect(union(List.of(), List.of(1))));
 
         assertEquals(List.of(1, 2), collect(union(List.of(1), List.of(2))));
+        assertEquals(List.of(1, 2, 2), collect(union(List.of(1), List.of(2), List.of(2))));
+
+        assertFalse(union().contains(0));
+        assertFalse(union(List.of()).contains(0));
+        assertFalse(union(List.of(1)).contains(0));
+        assertFalse(union(List.of(1), List.of()).contains(0));
+        assertFalse(union(List.of(), List.of(1)).contains(0));
+        assertFalse(union(List.of(1), List.of(2, 3)).contains(0));
+
+        assertTrue(union(List.of(0)).contains(0));
+        assertTrue(union(List.of(), List.of(0)).contains(0));
+        assertTrue(union(List.of(0), List.of()).contains(0));
+
+        assertEquals(0, union().size());
+        assertEquals(0, union(List.of()).size());
+        assertEquals(1, union(List.of(1)).size());
+        assertEquals(1, union(List.of(), List.of(1)).size());
+        assertEquals(1, union(List.of(1), List.of()).size());
+        assertEquals(2, union(List.of(1), List.of(2)).size());
+        assertEquals(3, union(List.of(1), List.of(2, 3)).size());
+        assertEquals(5, union(List.of(1, 4, 5), List.of(2, 3)).size());
+
+        List<Integer> integers = new ArrayList<>(List.of(1, 2, 3));
+
+        Collection<Integer> union = union(integers);
+
+        integers.remove(0);
+
+        assertEquals(2, union.size());
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
index 96618ee8a..fcadfccf8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsTest.java
@@ -17,15 +17,25 @@
 
 package org.apache.ignite.internal.util;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
 import static org.apache.ignite.internal.util.IgniteUtils.isPow2;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayWithSize;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -79,4 +89,39 @@ class IgniteUtilsTest {
         assertFalse(isPow2(7));
         assertFalse(isPow2(9));
     }
+
+    @Test
+    void testGetUninterruptibly() throws Exception {
+        assertThat(getUninterruptibly(completedFuture(true)), equalTo(true));
+        assertThat(Thread.currentThread().isInterrupted(), equalTo(false));
+
+        ExecutionException exception0 = assertThrows(
+                ExecutionException.class,
+                () -> getUninterruptibly(failedFuture(new Exception("test")))
+        );
+
+        assertThat(exception0.getCause(), instanceOf(Exception.class));
+        assertThat(exception0.getCause().getMessage(), equalTo("test"));
+        assertThat(Thread.currentThread().isInterrupted(), equalTo(false));
+
+        CompletableFuture<?> canceledFuture = new CompletableFuture<>();
+        canceledFuture.cancel(false);
+
+        assertThrows(CancellationException.class, () -> getUninterruptibly(canceledFuture));
+        assertThat(Thread.currentThread().isInterrupted(), equalTo(false));
+
+        // Checks interrupt.
+
+        runAsync(() -> {
+            try {
+                Thread.currentThread().interrupt();
+
+                getUninterruptibly(completedFuture(null));
+
+                assertThat(Thread.currentThread().isInterrupted(), equalTo(true));
+            } catch (ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }).get(1, TimeUnit.SECONDS);
+    }
 }
diff --git a/modules/page-memory/pom.xml b/modules/page-memory/pom.xml
index f69863181..598e62ee0 100644
--- a/modules/page-memory/pom.xml
+++ b/modules/page-memory/pom.xml
@@ -80,6 +80,12 @@
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
+
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
index 4c6b688fb..f2170c290 100644
--- a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreePageMemoryImplTest.java
@@ -53,6 +53,8 @@ public class ItBplusTreePageMemoryImplTest extends ItBplusTreeSelfTest {
                 sizes,
                 new TestPageReadWriteManager(),
                 (page, fullPageId, pageMemoryEx) -> {
+                },
+                (fullPageId, buf, tag) -> {
                 }
         );
     }
diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
index fe29150d0..b68c1b834 100644
--- a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
+++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/persistence/ItBplusTreeReuseListPageMemoryImplTest.java
@@ -52,6 +52,8 @@ public class ItBplusTreeReuseListPageMemoryImplTest extends ItBplusTreeReuseSelf
                 sizes,
                 new TestPageReadWriteManager(),
                 (page, fullPageId, pageMemoryEx) -> {
+                },
+                (fullPageId, buf, tag) -> {
                 }
         );
     }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
similarity index 59%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
index 0eeb7dcc1..dbc6e3c2f 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemoryDataRegion.java
@@ -15,21 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory;
+
+import org.jetbrains.annotations.Nullable;
 
 /**
- * {@link ClockPageReplacementPolicy} factory.
+ * Data region based on {@link PageMemory}.
  */
-public class ClockPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
-    /** {@inheritDoc} */
-    @Override
-    public long requiredMemory(int pagesCnt) {
-        return ClockPageReplacementFlags.requiredMemory(pagesCnt);
-    }
+public interface PageMemoryDataRegion {
+    /**
+     * Returns {@link true} if the date region is persistent.
+     */
+    boolean persistent();
 
-    /** {@inheritDoc} */
-    @Override
-    public PageReplacementPolicy create(PageMemoryImpl.Segment seg, long ptr, int pagesCnt) {
-        return new ClockPageReplacementPolicy(seg, ptr, pagesCnt);
-    }
+    /**
+     * Returns page memory, {@code null} if not started.
+     */
+    @Nullable
+    PageMemory pageMemory();
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java
index f8dfcdcb0..b7534981e 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.pagememory.mem;
 
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Direct memory provider interface. Not thread-safe.
  */
@@ -26,19 +28,20 @@ public interface DirectMemoryProvider {
      *
      * @param chunkSizes Chunk sizes.
      */
-    public void initialize(long[] chunkSizes);
+    void initialize(long[] chunkSizes);
 
     /**
      * Shuts down the provider.
      *
      * @param deallocate {@code True} to deallocate memory, {@code false} to allow memory reuse.
      */
-    public void shutdown(boolean deallocate);
+    void shutdown(boolean deallocate);
 
     /**
      * Attempts to allocate next memory region. Will return {@code null} if no more regions are available.
      *
      * @return Next memory region.
      */
-    public DirectMemoryRegion nextRegion();
+    @Nullable
+    DirectMemoryRegion nextRegion();
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
index d776e21ea..dddf0eb51 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageHeader.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.pagememory.FullPageId;
 /**
  * Page header.
  */
-class PageHeader {
+public class PageHeader {
     /** Page marker. */
     public static final long PAGE_MARKER = 0x0000000000000001L;
 
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
index eb05a3617..cd3c6d2ba 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.pagememory.persistence;
 
 import static java.lang.System.lineSeparator;
+import static org.apache.ignite.internal.pagememory.FullPageId.NULL_PAGE;
 import static org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfigurationSchema.CLOCK_REPLACEMENT_MODE;
 import static org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfigurationSchema.RANDOM_LRU_REPLACEMENT_MODE;
 import static org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfigurationSchema.SEGMENTED_LRU_REPLACEMENT_MODE;
@@ -29,8 +30,10 @@ import static org.apache.ignite.internal.pagememory.io.PageIo.setPageId;
 import static org.apache.ignite.internal.pagememory.persistence.PageHeader.dirty;
 import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
 import static org.apache.ignite.internal.pagememory.persistence.PageHeader.isAcquired;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.readPageId;
 import static org.apache.ignite.internal.pagememory.persistence.PageHeader.tempBufferPointer;
 import static org.apache.ignite.internal.pagememory.persistence.PageHeader.writeTimestamp;
+import static org.apache.ignite.internal.pagememory.persistence.PagePool.SEGMENT_INDEX_MASK;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.effectivePageId;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageId;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.pageIndex;
@@ -43,25 +46,30 @@ import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapInt;
 import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
 import static org.apache.ignite.internal.util.GridUnsafe.copyMemory;
 import static org.apache.ignite.internal.util.GridUnsafe.getInt;
+import static org.apache.ignite.internal.util.GridUnsafe.getLong;
 import static org.apache.ignite.internal.util.GridUnsafe.putIntVolatile;
 import static org.apache.ignite.internal.util.GridUnsafe.setMemory;
 import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+import static org.apache.ignite.internal.util.GridUnsafe.zeroMemory;
 import static org.apache.ignite.internal.util.IgniteUtils.hash;
 import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
 import static org.apache.ignite.internal.util.IgniteUtils.readableSize;
 import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
 import static org.apache.ignite.internal.util.IgniteUtils.toHexString;
 import static org.apache.ignite.internal.util.OffheapReadWriteLock.TAG_LOCK_ALWAYS;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
@@ -75,7 +83,14 @@ import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
 import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
 import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
-import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointPages;
+import org.apache.ignite.internal.pagememory.persistence.replacement.ClockPageReplacementPolicyFactory;
+import org.apache.ignite.internal.pagememory.persistence.replacement.DelayedPageReplacementTracker;
+import org.apache.ignite.internal.pagememory.persistence.replacement.PageReplacementPolicy;
+import org.apache.ignite.internal.pagememory.persistence.replacement.PageReplacementPolicyFactory;
+import org.apache.ignite.internal.pagememory.persistence.replacement.RandomLruPageReplacementPolicyFactory;
+import org.apache.ignite.internal.pagememory.persistence.replacement.SegmentedLruPageReplacementPolicyFactory;
+import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.OffheapReadWriteLock;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -109,6 +124,13 @@ import org.jetbrains.annotations.TestOnly;
  */
 @SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
 public class PageMemoryImpl implements PageMemoryEx {
+    /**
+     * When set to {@code true} (default), pages are written to page store without holding segment lock (with delay).
+     * Because other thread may require exactly the same page to be loaded from store, reads are protected by locking.
+     */
+    // TODO: IGNITE-16350 Move to config or something else.
+    public static final String IGNITE_DELAYED_REPLACED_PAGE_WRITE = "IGNITE_DELAYED_REPLACED_PAGE_WRITE";
+
     /** Logger. */
     private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryImpl.class);
 
@@ -116,10 +138,10 @@ public class PageMemoryImpl implements PageMemoryEx {
     public static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
 
     /** Invalid relative pointer value. */
-    static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+    public static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
 
     /** Pointer which means that this page is outdated (for example, group was destroyed, partition eviction'd happened. */
-    static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
+    public static final long OUTDATED_REL_PTR = INVALID_REL_PTR + 1;
 
     /** Page lock offset. */
     public static final int PAGE_LOCK_OFFSET = 32;
@@ -148,7 +170,8 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** Direct memory allocator. */
     private final DirectMemoryProvider directMemoryProvider;
 
-    /** Segments array. */
+    /** Segments array, {@code null} if not {@link #start() started}. */
+    @Nullable
     private volatile Segment[] segments;
 
     /** Lock for segments changes. */
@@ -168,21 +191,39 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** Flag indicating page replacement started (rotation with disk), allocating new page requires freeing old one. */
     private volatile int pageReplacementWarned;
 
-    /** Segments sizes. */
+    /** Segments sizes, the last one being the {@link #checkpointPool checkpoint buffer} size. */
     private final long[] sizes;
 
     /** {@code False} if memory was not started or already stopped and is not supposed for any usage. */
     private volatile boolean started;
 
+    /** See {@link #safeToUpdate()}. */
+    private final AtomicBoolean safeToUpdate = new AtomicBoolean(true);
+
+    /** Checkpoint page pool, {@code null} if not {@link #start() started}. */
+    @Nullable
+    private volatile PagePool checkpointPool;
+
+    /**
+     * Delayed page replacement (rotation with disk) tracker. Because other thread may require exactly the same page to be loaded from
+     * store, reads are protected by locking, {@code null} if delayed write functionality is disabled.
+     */
+    @Nullable
+    private final DelayedPageReplacementTracker delayedPageReplacementTracker;
+
+    /** Flush dirty page closure. */
+    private final PageStoreWriter flushDirtyPage;
+
     /**
      * Constructor.
      *
      * @param directMemoryProvider Memory allocator to use.
      * @param dataRegionCfg Data region configuration.
      * @param ioRegistry IO registry.
-     * @param sizes Segments sizes.
+     * @param sizes Segments sizes, the last one being the checkpoint buffer size.
      * @param pmPageMgr Page store manager.
      * @param changeTracker Callback invoked to track changes in pages.
+     * @param flushDirtyPage Write callback invoked when a dirty page is removed for replacement.
      */
     public PageMemoryImpl(
             DirectMemoryProvider directMemoryProvider,
@@ -190,7 +231,8 @@ public class PageMemoryImpl implements PageMemoryEx {
             PageIoRegistry ioRegistry,
             long[] sizes,
             PageReadWriteManager pmPageMgr,
-            @Nullable PageChangeTracker changeTracker
+            @Nullable PageChangeTracker changeTracker,
+            PageStoreWriter flushDirtyPage
     ) {
         this.directMemoryProvider = directMemoryProvider;
         this.dataRegionCfg = dataRegionCfg.value();
@@ -198,6 +240,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         this.sizes = sizes;
         this.pmPageMgr = pmPageMgr;
         this.changeTracker = changeTracker;
+        this.flushDirtyPage = flushDirtyPage;
 
         int pageSize = this.dataRegionCfg.pageSize();
 
@@ -223,6 +266,9 @@ public class PageMemoryImpl implements PageMemoryEx {
             default:
                 throw new IgniteInternalException("Unexpected page replacement mode: " + replacementMode);
         }
+
+        delayedPageReplacementTracker = getBoolean(IGNITE_DELAYED_REPLACED_PAGE_WRITE, true)
+                ? new DelayedPageReplacementTracker(pageSize, flushDirtyPage, LOG, sizes.length - 1) : null;
     }
 
     /** {@inheritDoc} */
@@ -255,6 +301,8 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             DirectMemoryRegion cpReg = regions.get(regs - 1);
 
+            checkpointPool = new PagePool(regs - 1, cpReg, sysPageSize, rwLock);
+
             long checkpointBuf = cpReg.size();
 
             long totalAllocated = 0;
@@ -542,8 +590,6 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** {@inheritDoc} */
     @Override
     public long acquirePage(int grpId, long pageId, IoStatisticsHolder statHolder) throws IgniteInternalCheckedException {
-        assert started;
-
         return acquirePage(grpId, pageId, statHolder, false);
     }
 
@@ -750,23 +796,6 @@ public class PageMemoryImpl implements PageMemoryEx {
         return pageSize();
     }
 
-    /**
-     * Returns Max dirty ratio from the segments.
-     */
-    double getDirtyPagesRatio() {
-        if (segments == null) {
-            return 0;
-        }
-
-        double res = 0;
-
-        for (Segment segment : segments) {
-            res = Math.max(res, segment.getDirtyPagesRatio());
-        }
-
-        return res;
-    }
-
     /**
      * Returns total pages can be placed in all segments.
      */
@@ -1014,6 +1043,44 @@ public class PageMemoryImpl implements PageMemoryEx {
     private long postWriteLockPage(long absPtr, FullPageId fullId) {
         writeTimestamp(absPtr, coarseCurrentTimeMillis());
 
+        // Create a buffer copy if the page is scheduled for a checkpoint.
+        if (isInCheckpoint(fullId) && tempBufferPointer(absPtr) == INVALID_REL_PTR) {
+            long tmpRelPtr = checkpointPool.borrowOrAllocateFreePage(tag(fullId.pageId()));
+
+            if (tmpRelPtr == INVALID_REL_PTR) {
+                rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS);
+
+                throw new IgniteInternalException(
+                        "Failed to allocate temporary buffer for checkpoint (increase checkpointPageBufferSize configuration property): "
+                                + dataRegionCfg.name());
+            }
+
+            // Pin the page until checkpoint is not finished.
+            PageHeader.acquirePage(absPtr);
+
+            long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
+
+            copyMemory(
+                    null,
+                    absPtr + PAGE_OVERHEAD,
+                    null,
+                    tmpAbsPtr + PAGE_OVERHEAD,
+                    pageSize()
+            );
+
+            assert getType(tmpAbsPtr + PAGE_OVERHEAD) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(fullId.pageId());
+            assert getVersion(tmpAbsPtr + PAGE_OVERHEAD) != 0 :
+                    "Invalid state. Version is 0! pageId = " + hexLong(fullId.pageId());
+
+            dirty(absPtr, false);
+            tempBufferPointer(absPtr, tmpRelPtr);
+            // info for checkpoint buffer cleaner.
+            fullPageId(tmpAbsPtr, fullId);
+
+            assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
+            assert getCrc(tmpAbsPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480
+        }
+
         assert getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO IGNITE-16612
 
         return absPtr + PAGE_OVERHEAD;
@@ -1042,7 +1109,7 @@ public class PageMemoryImpl implements PageMemoryEx {
             long pageId = getPageId(page + PAGE_OVERHEAD);
 
             try {
-                assert pageId != 0 : hexLong(PageHeader.readPageId(page));
+                assert pageId != 0 : hexLong(readPageId(page));
 
                 rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, tag(pageId));
 
@@ -1119,11 +1186,15 @@ public class PageMemoryImpl implements PageMemoryEx {
         boolean wasDirty = dirty(absPtr, dirty);
 
         if (dirty) {
+            // TODO: IGNITE-16898 Don't forget add assertion for checkpoint lock held by this thread
+
             if (!wasDirty || forceAdd) {
                 Segment seg = segment(pageId.groupId(), pageId.pageId());
 
                 if (seg.dirtyPages.add(pageId)) {
-                    seg.dirtyPagesCntr.incrementAndGet();
+                    if (seg.dirtyPagesCntr.incrementAndGet() >= seg.maxDirtyPages) {
+                        safeToUpdate.set(false);
+                    }
                 }
             }
         } else {
@@ -1141,7 +1212,14 @@ public class PageMemoryImpl implements PageMemoryEx {
         return segments[idx];
     }
 
-    private static int segmentIndex(int grpId, long pageId, int segments) {
+    /**
+     * Returns segment index.
+     *
+     * @param grpId Group ID.
+     * @param pageId Page ID.
+     * @param segments Number of segments.
+     */
+    public static int segmentIndex(int grpId, long pageId, int segments) {
         pageId = effectivePageId(pageId);
 
         // Take a prime number larger than total number of partitions.
@@ -1154,12 +1232,12 @@ public class PageMemoryImpl implements PageMemoryEx {
      * Returns a collection of all pages currently marked as dirty. Will create a collection copy.
      */
     @TestOnly
-    public Collection<FullPageId> dirtyPages() {
+    public Set<FullPageId> dirtyPages() {
         if (segments == null) {
-            return Collections.emptySet();
+            return Set.of();
         }
 
-        Collection<FullPageId> res = new HashSet<>((int) loadedPages());
+        Set<FullPageId> res = new HashSet<>((int) loadedPages());
 
         for (Segment seg : segments) {
             res.addAll(seg.dirtyPages);
@@ -1171,7 +1249,7 @@ public class PageMemoryImpl implements PageMemoryEx {
     /**
      * Page segment.
      */
-    class Segment extends ReentrantReadWriteLock {
+    public class Segment extends ReentrantReadWriteLock {
         /** Serial version uid. */
         private static final long serialVersionUID = 0L;
 
@@ -1200,11 +1278,15 @@ public class PageMemoryImpl implements PageMemoryEx {
         private long memPerRepl;
 
         /** Pages marked as dirty since the last checkpoint. */
-        private volatile Collection<FullPageId> dirtyPages = ConcurrentHashMap.newKeySet();
+        private volatile Set<FullPageId> dirtyPages = ConcurrentHashMap.newKeySet();
 
         /** Atomic size counter for {@link #dirtyPages}. */
         private final AtomicLong dirtyPagesCntr = new AtomicLong();
 
+        /** Wrapper of pages of current checkpoint. */
+        @Nullable
+        private volatile CheckpointPages checkpointPages;
+
         /** Maximum number of dirty pages. */
         private final long maxDirtyPages;
 
@@ -1270,13 +1352,6 @@ public class PageMemoryImpl implements PageMemoryEx {
             }
         }
 
-        /**
-         * Returns dirtyRatio to be compared with Throttle threshold.
-         */
-        private double getDirtyPagesRatio() {
-            return dirtyPagesCntr.doubleValue() / pages();
-        }
-
         /**
          * Returns max number of pages this segment can allocate.
          */
@@ -1357,12 +1432,39 @@ public class PageMemoryImpl implements PageMemoryEx {
             }
 
             if (isDirty(absPtr)) {
-                return false;
-            }
+                CheckpointPages checkpointPages = this.checkpointPages;
+                // Can evict a dirty page only if should be written by a checkpoint.
+                // These pages does not have tmp buffer.
+                if (checkpointPages != null && checkpointPages.allowToSave(fullPageId)) {
+                    assert pmPageMgr != null;
+
+                    PageStoreWriter saveDirtyPage = delayedPageReplacementTracker != null
+                            ? delayedPageReplacementTracker.delayedPageWrite() : flushDirtyPage;
 
-            loadedPages.remove(fullPageId.groupId(), fullPageId.effectivePageId());
+                    saveDirtyPage.writePage(
+                            fullPageId,
+                            wrapPointer(absPtr + PAGE_OVERHEAD, pageSize()),
+                            partGeneration(
+                                    fullPageId.groupId(),
+                                    partitionId(fullPageId.pageId())
+                            )
+                    );
+
+                    setDirty(fullPageId, absPtr, false, true);
+
+                    checkpointPages.markAsSaved(fullPageId);
+
+                    loadedPages.remove(fullPageId.groupId(), fullPageId.effectivePageId());
+
+                    return true;
+                }
+
+                return false;
+            } else {
+                loadedPages.remove(fullPageId.groupId(), fullPageId.effectivePageId());
 
-            return true;
+                return true;
+            }
         }
 
         /**
@@ -1386,10 +1488,29 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             dirty(absPtr, false);
 
+            long tmpBufPtr = tempBufferPointer(absPtr);
+
+            if (tmpBufPtr != INVALID_REL_PTR) {
+                zeroMemory(checkpointPool.absolute(tmpBufPtr) + PAGE_OVERHEAD, pageSize());
+
+                tempBufferPointer(absPtr, INVALID_REL_PTR);
+
+                // We pinned the page when allocated the temp buffer, release it now.
+                PageHeader.releasePage(absPtr);
+
+                releaseCheckpointBufferPage(tmpBufPtr);
+            }
+
             if (rmv) {
                 loadedPages.remove(grpId, effectivePageId(pageId));
             }
 
+            CheckpointPages cpPages = checkpointPages;
+
+            if (cpPages != null) {
+                cpPages.markAsSaved(new FullPageId(pageId, grpId));
+            }
+
             Collection<FullPageId> dirtyPages = this.dirtyPages;
 
             if (dirtyPages != null) {
@@ -1549,9 +1670,16 @@ public class PageMemoryImpl implements PageMemoryEx {
         /**
          * Returns IO registry.
          */
-        PageIoRegistry ioRegistry() {
+        public PageIoRegistry ioRegistry() {
             return ioRegistry;
         }
+
+        /**
+         * Gets checkpoint pages.
+         */
+        public CheckpointPages checkpointPages() {
+            return checkpointPages;
+        }
     }
 
     private static int updateAtomicInt(long ptr, int delta) {
@@ -1568,7 +1696,7 @@ public class PageMemoryImpl implements PageMemoryEx {
 
     private static long updateAtomicLong(long ptr, long delta) {
         while (true) {
-            long old = GridUnsafe.getLong(ptr);
+            long old = getLong(ptr);
 
             long updated = old + delta;
 
@@ -1598,4 +1726,337 @@ public class PageMemoryImpl implements PageMemoryEx {
          */
         void apply(long page, FullPageId fullPageId, PageMemoryEx pageMemoryEx);
     }
+
+    /**
+     * Heuristic method which allows a thread to check if it is safe to start memory structure modifications in regard with checkpointing.
+     * May return false-negative result during or after partition eviction.
+     *
+     * @return {@code False} if there are too many dirty pages and a thread should wait for a checkpoint to begin.
+     */
+    public boolean safeToUpdate() {
+        if (segments != null) {
+            return safeToUpdate.get();
+        }
+
+        return true;
+    }
+
+    /**
+     * Returns number of pages used in checkpoint buffer.
+     */
+    public int usedCheckpointBufferPages() {
+        PagePool checkpointPool = this.checkpointPool;
+
+        return checkpointPool == null ? 0 : checkpointPool.size();
+    }
+
+    /**
+     * Returns max number of pages in checkpoint buffer.
+     */
+    public int maxCheckpointBufferPages() {
+        PagePool checkpointPool = this.checkpointPool;
+
+        return checkpointPool == null ? 0 : checkpointPool.pages();
+    }
+
+    private void releaseCheckpointBufferPage(long tmpBufPtr) {
+        checkpointPool.releaseFreePage(tmpBufPtr);
+    }
+
+    /**
+     * Returns {@code true} if it was added to the checkpoint list.
+     *
+     * @param pageId Page ID to check if it was added to the checkpoint list.
+     */
+    boolean isInCheckpoint(FullPageId pageId) {
+        Segment seg = segment(pageId.groupId(), pageId.pageId());
+
+        CheckpointPages pages0 = seg.checkpointPages;
+
+        return pages0 != null && pages0.contains(pageId);
+    }
+
+    /**
+     * Returns {@code true} if remove successfully.
+     *
+     * @param fullPageId Page ID to clear.
+     */
+    boolean clearCheckpoint(FullPageId fullPageId) {
+        Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
+
+        CheckpointPages pages0 = seg.checkpointPages;
+
+        assert pages0 != null;
+
+        return pages0.markAsSaved(fullPageId);
+    }
+
+    /**
+     * Makes a full copy of the dirty page for checkpointing, then marks the page as not dirty.
+     *
+     * @param absPtr Absolute page pointer.
+     * @param fullId Full page id.
+     * @param buf Buffer for copy page content for future write via {@link PageStoreWriter}.
+     * @param pageSingleAcquire Page is acquired only once. We don't pin the page second time (until page will not be copied) in case
+     *      checkpoint temporary buffer is used.
+     * @param pageStoreWriter Checkpoint page writer.
+     */
+    private void copyPageForCheckpoint(
+            long absPtr,
+            FullPageId fullId,
+            ByteBuffer buf,
+            int tag,
+            boolean pageSingleAcquire,
+            PageStoreWriter pageStoreWriter
+    ) throws IgniteInternalCheckedException {
+        assert absPtr != 0;
+        assert isAcquired(absPtr) || !isInCheckpoint(fullId);
+
+        // Exception protection flag.
+        // No need to write if exception occurred.
+        boolean canWrite = false;
+
+        boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+        if (!locked) {
+            // We release the page only once here because this page will be copied sometime later and
+            // will be released properly then.
+            if (!pageSingleAcquire) {
+                PageHeader.releasePage(absPtr);
+            }
+
+            buf.clear();
+
+            if (isInCheckpoint(fullId)) {
+                pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
+            }
+
+            return;
+        }
+
+        if (!clearCheckpoint(fullId)) {
+            rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+            if (!pageSingleAcquire) {
+                PageHeader.releasePage(absPtr);
+            }
+
+            return;
+        }
+
+        try {
+            long tmpRelPtr = tempBufferPointer(absPtr);
+
+            if (tmpRelPtr != INVALID_REL_PTR) {
+                tempBufferPointer(absPtr, INVALID_REL_PTR);
+
+                long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
+
+                copyInBuffer(tmpAbsPtr, buf);
+
+                fullPageId(tmpAbsPtr, NULL_PAGE);
+
+                zeroMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize());
+
+                releaseCheckpointBufferPage(tmpRelPtr);
+
+                // Need release again because we pin page when resolve abs pointer,
+                // and page did not have tmp buffer page.
+                if (!pageSingleAcquire) {
+                    PageHeader.releasePage(absPtr);
+                }
+            } else {
+                copyInBuffer(absPtr, buf);
+
+                dirty(absPtr, false);
+            }
+
+            assert getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(fullId.pageId());
+            assert getVersion(buf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(fullId.pageId());
+
+            canWrite = true;
+        } finally {
+            rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, TAG_LOCK_ALWAYS);
+
+            if (canWrite) {
+                buf.rewind();
+
+                pageStoreWriter.writePage(fullId, buf, tag);
+
+                buf.rewind();
+            }
+
+            // We pinned the page either when allocated the temp buffer, or when resolved abs pointer.
+            // Must release the page only after write unlock.
+            PageHeader.releasePage(absPtr);
+        }
+    }
+
+    /**
+     * Prepare page for write during checkpoint. {@link PageStoreWriter} will be called when the page will be ready to write.
+     *
+     * @param fullId Page ID to get byte buffer for. The page ID must be present in the collection returned by the {@link
+     * #beginCheckpoint(CompletableFuture)} method call.
+     * @param buf Temporary buffer to write changes into.
+     * @param pageStoreWriter Checkpoint page write context.
+     * @throws IgniteInternalCheckedException If failed to obtain page data.
+     */
+    public void checkpointWritePage(
+            FullPageId fullId,
+            ByteBuffer buf,
+            PageStoreWriter pageStoreWriter
+    ) throws IgniteInternalCheckedException {
+        assert buf.remaining() == pageSize();
+
+        Segment seg = segment(fullId.groupId(), fullId.pageId());
+
+        long absPtr = 0;
+
+        long relPtr;
+
+        int tag;
+
+        boolean pageSingleAcquire = false;
+
+        seg.readLock().lock();
+
+        try {
+            if (!isInCheckpoint(fullId)) {
+                return;
+            }
+
+            relPtr = resolveRelativePointer(seg, fullId, tag = generationTag(seg, fullId));
+
+            // Page may have been cleared during eviction. We have nothing to do in this case.
+            if (relPtr == INVALID_REL_PTR) {
+                return;
+            }
+
+            if (relPtr != OUTDATED_REL_PTR) {
+                absPtr = seg.absolute(relPtr);
+
+                // Pin the page until page will not be copied. This helpful to prevent page replacement of this page.
+                if (tempBufferPointer(absPtr) == INVALID_REL_PTR) {
+                    PageHeader.acquirePage(absPtr);
+                } else {
+                    pageSingleAcquire = true;
+                }
+            }
+        } finally {
+            seg.readLock().unlock();
+        }
+
+        if (relPtr == OUTDATED_REL_PTR) {
+            seg.writeLock().lock();
+
+            try {
+                // Double-check.
+                relPtr = resolveRelativePointer(seg, fullId, generationTag(seg, fullId));
+
+                if (relPtr == INVALID_REL_PTR) {
+                    return;
+                }
+
+                if (relPtr == OUTDATED_REL_PTR) {
+                    relPtr = seg.refreshOutdatedPage(
+                            fullId.groupId(),
+                            fullId.effectivePageId(),
+                            true
+                    );
+
+                    seg.pageReplacementPolicy.onRemove(relPtr);
+
+                    seg.pool.releaseFreePage(relPtr);
+                }
+
+                return;
+            } finally {
+                seg.writeLock().unlock();
+            }
+        }
+
+        copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter);
+    }
+
+    /**
+     * Get arbitrary page from cp buffer.
+     */
+    public FullPageId pullPageFromCpBuffer() {
+        long idx = getLong(checkpointPool.lastAllocatedIdxPtr);
+
+        long lastIdx = ThreadLocalRandom.current().nextLong(idx / 2, idx);
+
+        while (--lastIdx > 1) {
+            assert (lastIdx & SEGMENT_INDEX_MASK) == 0L;
+
+            long relative = checkpointPool.relative(lastIdx);
+
+            long freePageAbsPtr = checkpointPool.absolute(relative);
+
+            FullPageId fullPageId = fullPageId(freePageAbsPtr);
+
+            if (fullPageId.pageId() == NULL_PAGE.pageId() || fullPageId.groupId() == NULL_PAGE.groupId()) {
+                continue;
+            }
+
+            if (!isInCheckpoint(fullPageId)) {
+                continue;
+            }
+
+            return fullPageId;
+        }
+
+        return NULL_PAGE;
+    }
+
+    /**
+     * Gets a collection of dirty page IDs since the last checkpoint. If a dirty page is being written after the checkpointing operation
+     * begun, the modifications will be written to a temporary buffer which will be flushed to the main memory after the checkpointing
+     * finished. This method must be called when no concurrent operations on pages are performed.
+     *
+     * @param allowToReplace The sign which allows replacing pages from a checkpoint by page replacer.
+     * @return Collection view of dirty page IDs.
+     * @throws IgniteInternalException If checkpoint has been already started and was not finished.
+     */
+    public Collection<FullPageId> beginCheckpoint(CompletableFuture<?> allowToReplace) throws IgniteInternalException {
+        if (segments == null) {
+            return List.of();
+        }
+
+        Collection<FullPageId>[] collections = new Collection[segments.length];
+
+        for (int i = 0; i < segments.length; i++) {
+            Segment seg = segments[i];
+
+            if (seg.checkpointPages != null) {
+                throw new IgniteInternalException("Failed to begin checkpoint (it is already in progress).");
+            }
+
+            Set<FullPageId> dirtyPages = seg.dirtyPages;
+            collections[i] = dirtyPages;
+
+            seg.checkpointPages = new CheckpointPages(dirtyPages, allowToReplace);
+
+            seg.resetDirtyPages();
+        }
+
+        safeToUpdate.set(true);
+
+        return CollectionUtils.union(collections);
+    }
+
+    /**
+     * Finishes checkpoint operation.
+     */
+    public void finishCheckpoint() {
+        if (segments == null) {
+            return;
+        }
+
+        synchronized (segmentsLock) {
+            for (Segment seg : segments) {
+                seg.checkpointPages = null;
+            }
+        }
+    }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
new file mode 100644
index 000000000..99921e9dd
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Interface for write page to {@link PageStore}.
+ */
+// TODO: IGNITE-15818 Maybe refactor.
+public interface PageStoreWriter {
+    /**
+     * Callback for write page. {@link PageMemoryImpl} will copy page content to buffer before call.
+     *
+     * @param fullPageId Page ID to get byte buffer for. The page ID must be present in the collection returned by the {@link
+     * PageMemoryImpl#beginCheckpoint(CompletableFuture)} method call.
+     * @param buf Temporary buffer to write changes into.
+     * @param tag {@code Partition generation} if data was read, {@code null} otherwise (data already saved to storage).
+     * @throws IgniteInternalCheckedException If write page failed.
+     */
+    void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteInternalCheckedException;
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
new file mode 100644
index 000000000..4c48a9770
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPages.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
+
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * View of pages which should be stored during current checkpoint.
+ */
+public class CheckpointPages {
+    private final Set<FullPageId> segCheckpointPages;
+
+    private final CompletableFuture<?> allowToReplace;
+
+    /**
+     * Constructor.
+     *
+     * @param pages Pages which would be stored to disk in current checkpoint, does not copy the set.
+     * @param replaceFuture The sign which allows replacing pages from a checkpoint by page replacer.
+     */
+    public CheckpointPages(Set<FullPageId> pages, CompletableFuture<?> replaceFuture) {
+        segCheckpointPages = pages;
+        allowToReplace = replaceFuture;
+    }
+
+    /**
+     * Returns {@code true} If fullPageId is allowable to store to disk.
+     *
+     * @param fullPageId Page id for checking.
+     * @throws IgniteInternalCheckedException If the waiting sign which allows replacing pages from a checkpoint by page replacer fails.
+     */
+    public boolean allowToSave(FullPageId fullPageId) throws IgniteInternalCheckedException {
+        try {
+            // Uninterruptibly is important because otherwise in case of interrupt of client thread node would be stopped.
+            getUninterruptibly(allowToReplace);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalCheckedException(e.getCause());
+        } catch (CancellationException e) {
+            throw new IgniteInternalCheckedException(e);
+        }
+
+        return segCheckpointPages.contains(fullPageId);
+    }
+
+    /**
+     * Returns {@code true} If fullPageId is candidate to stored to disk by current checkpoint.
+     *
+     * @param fullPageId Page id for checking.
+     */
+    public boolean contains(FullPageId fullPageId) {
+        return segCheckpointPages.contains(fullPageId);
+    }
+
+    /**
+     * Returns {@code true} if it is marking was successful.
+     *
+     * @param fullPageId Page id which should be marked as saved to disk.
+     */
+    public boolean markAsSaved(FullPageId fullPageId) {
+        return segCheckpointPages.remove(fullPageId);
+    }
+
+    /**
+     * Returns size of all pages in current checkpoint.
+     */
+    public int size() {
+        return segCheckpointPages.size();
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
similarity index 59%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
index 0eeb7dcc1..7c9d5eb89 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
@@ -15,21 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+
+import java.util.concurrent.CompletableFuture;
 
 /**
- * {@link ClockPageReplacementPolicy} factory.
+ * Represents information of progress of a current checkpoint and allows obtaining future to wait for a particular checkpoint state.
  */
-public class ClockPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
-    /** {@inheritDoc} */
-    @Override
-    public long requiredMemory(int pagesCnt) {
-        return ClockPageReplacementFlags.requiredMemory(pagesCnt);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public PageReplacementPolicy create(PageMemoryImpl.Segment seg, long ptr, int pagesCnt) {
-        return new ClockPageReplacementPolicy(seg, ptr, pagesCnt);
-    }
+// TODO: IGNITE-16898 Continue porting the code
+public interface CheckpointProgress {
+    /**
+     * Returns future which can be used for detection when current checkpoint reaches the specific state.
+     */
+    CompletableFuture<?> futureFor(CheckpointState state);
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadLockTimeoutException.java
similarity index 58%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadLockTimeoutException.java
index 651cc14df..300fd67a2 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadLockTimeoutException.java
@@ -15,23 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+import org.apache.ignite.lang.IgniteInternalException;
 
 /**
- * {@link RandomLruPageReplacementPolicy} factory.
+ * Indicates checkpoint read lock acquisition failure which did not lead to node invalidation.
  */
-public class RandomLruPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
-    /** {@inheritDoc} */
-    @Override
-    public long requiredMemory(int pagesCnt) {
-        return 0;
-    }
+// TODO: IGNITE-16899 Change to inherit from IgniteInternalCheckedException
+public class CheckpointReadLockTimeoutException extends IgniteInternalException {
+    private static final long serialVersionUID = 0L;
 
-    /** {@inheritDoc} */
-    @Override
-    public PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt) {
-        return new RandomLruPageReplacementPolicy(seg);
+    /**
+     * Constructor.
+     *
+     * @param msg Error message.
+     */
+    CheckpointReadLockTimeoutException(String msg) {
+        super(msg);
     }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
new file mode 100644
index 000000000..339f9a28f
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Wrapper of the classic read write lock with checkpoint features.
+ */
+public class CheckpointReadWriteLock {
+    private final ThreadLocal<Integer> checkpointReadLockHoldCount = ThreadLocal.withInitial(() -> 0);
+
+    /**
+     * Any thread with a such prefix is managed by the checkpoint. So some conditions can rely on it(ex. we don't need a checkpoint lock
+     * there because checkpoint is already held write lock).
+     */
+    // TODO: IGNITE-16898 I think it needs to be redone or relocated
+    static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
+
+    /** Checkpoint lock. */
+    private final ReentrantReadWriteLockWithTracking checkpointLock;
+
+    /**
+     * Constructor.
+     *
+     * @param checkpointLock Checkpoint lock.
+     */
+    public CheckpointReadWriteLock(ReentrantReadWriteLockWithTracking checkpointLock) {
+        this.checkpointLock = checkpointLock;
+    }
+
+    /**
+     * Gets the checkpoint read lock.
+     *
+     * @throws IgniteInternalException If failed.
+     */
+    public void readLock() {
+        if (isWriteLockHeldByCurrentThread()) {
+            return;
+        }
+
+        checkpointLock.readLock().lock();
+
+        checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);
+    }
+
+    /**
+     * Tries to get a checkpoint read lock.
+     *
+     * @param timeout – Time to wait for the read lock.
+     * @param unit – Time unit of the timeout argument.
+     * @throws IgniteInternalException If failed.
+     */
+    public boolean tryReadLock(long timeout, TimeUnit unit) throws InterruptedException {
+        if (isWriteLockHeldByCurrentThread()) {
+            return true;
+        }
+
+        boolean res = checkpointLock.readLock().tryLock(timeout, unit);
+
+        if (res) {
+            checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);
+        }
+
+        return res;
+    }
+
+    /**
+     * Tries to get a checkpoint read lock.
+     *
+     * @return {@code True} if the checkpoint read lock is acquired.
+     */
+    public boolean tryReadLock() {
+        if (isWriteLockHeldByCurrentThread()) {
+            return true;
+        }
+
+        boolean res = checkpointLock.readLock().tryLock();
+
+        if (res) {
+            checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() + 1);
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns {@code true} if checkpoint lock is held by current thread.
+     */
+    public boolean checkpointLockIsHeldByThread() {
+        return checkpointLock.isWriteLockedByCurrentThread()
+                || checkpointReadLockHoldCount.get() > 0
+                || Thread.currentThread().getName().startsWith(CHECKPOINT_RUNNER_THREAD_PREFIX);
+    }
+
+    /**
+     * Releases the checkpoint read lock.
+     */
+    public void readUnlock() {
+        if (isWriteLockHeldByCurrentThread()) {
+            return;
+        }
+
+        checkpointLock.readLock().unlock();
+
+        checkpointReadLockHoldCount.set(checkpointReadLockHoldCount.get() - 1);
+    }
+
+    /**
+     * Takes the checkpoint write lock.
+     */
+    public void writeLock() {
+        checkpointLock.writeLock().lock();
+    }
+
+    /**
+     * Releases the checkpoint write lock.
+     */
+    public void writeUnlock() {
+        checkpointLock.writeLock().unlock();
+    }
+
+    /**
+     * Returns {@code true} if current thread hold write lock.
+     */
+    public boolean isWriteLockHeldByCurrentThread() {
+        return checkpointLock.isWriteLockedByCurrentThread();
+    }
+
+    /**
+     * Returns the number of reentrant read holds on this lock by the current thread. A reader thread has a hold on a lock for each lock
+     * action that is not matched by an unlock action.
+     */
+    public int getReadHoldCount() {
+        return checkpointLock.getReadHoldCount();
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
similarity index 54%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
index 651cc14df..b246e7736 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
@@ -15,23 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
-
-import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
 /**
- * {@link RandomLruPageReplacementPolicy} factory.
+ * Possible checkpoint states. Ordinal is important. Every next state follows the previous one.
  */
-public class RandomLruPageReplacementPolicyFactory implements PageReplacementPolicyFactory {
-    /** {@inheritDoc} */
-    @Override
-    public long requiredMemory(int pagesCnt) {
-        return 0;
-    }
+// TODO: IGNITE-16898 Review states
+public enum CheckpointState {
+    /** Checkpoint is waiting to execution. **/
+    SCHEDULED,
+
+    /** Checkpoint was awakened and it is preparing to start. **/
+    LOCK_TAKEN,
+
+    /** Dirty pages snapshot has been taken. **/
+    PAGE_SNAPSHOT_TAKEN,
+
+    /** Checkpoint counted the pages and write lock was released. **/
+    LOCK_RELEASED,
+
+    /** Checkpoint marker was stored to disk. **/
+    MARKER_STORED_TO_DISK,
 
-    /** {@inheritDoc} */
-    @Override
-    public PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt) {
-        return new RandomLruPageReplacementPolicy(seg);
-    }
+    /** Checkpoint was finished. **/
+    FINISHED
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java
new file mode 100644
index 000000000..d19f26be0
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLock.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.getUninterruptibly;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Checkpoint lock for outer usage which should be used to protect data during writing to memory. It contains complex logic for the correct
+ * taking of inside checkpoint lock(timeout, force checkpoint, etc.).
+ */
+public class CheckpointTimeoutLock implements IgniteComponent {
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /**
+     * {@link PageMemoryImpl#safeToUpdate() Safe update check} for all page memories, should return {@code false} if there are many dirty
+     * pages and a checkpoint is needed.
+     */
+    private final BooleanSupplier safeToUpdateAllPageMemories;
+
+    /** Internal checkpoint lock. */
+    private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+    /** Service for triggering the checkpoint. */
+    private final Checkpointer checkpointer;
+
+    /** Timeout for checkpoint read lock acquisition in milliseconds. */
+    private volatile long checkpointReadLockTimeout;
+
+    /** Stop flag. */
+    private boolean stop;
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param checkpointReadWriteLock Checkpoint read-write lock.
+     * @param checkpointReadLockTimeout Timeout for checkpoint read lock acquisition in milliseconds.
+     * @param safeToUpdateAllPageMemories {@link PageMemoryImpl#safeToUpdate() Safe update check} for all page memories, should return
+     *      {@code false} if there are many dirty pages and a checkpoint is needed.
+     * @param checkpointer Service for triggering the checkpoint.
+     */
+    public CheckpointTimeoutLock(
+            IgniteLogger log,
+            CheckpointReadWriteLock checkpointReadWriteLock,
+            long checkpointReadLockTimeout,
+            BooleanSupplier safeToUpdateAllPageMemories,
+            Checkpointer checkpointer
+    ) {
+        this.log = log;
+        this.checkpointReadWriteLock = checkpointReadWriteLock;
+        this.checkpointReadLockTimeout = checkpointReadLockTimeout;
+        this.safeToUpdateAllPageMemories = safeToUpdateAllPageMemories;
+        this.checkpointer = checkpointer;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        stop = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() {
+        checkpointReadWriteLock.writeLock();
+
+        try {
+            stop = true;
+        } finally {
+            checkpointReadWriteLock.writeUnlock();
+        }
+    }
+
+    /**
+     * Gets the checkpoint read lock.
+     *
+     * @throws IgniteInternalException If failed.
+     * @throws CheckpointReadLockTimeoutException If failed to get checkpoint read lock by timeout.
+     */
+    public void checkpointReadLock() {
+        if (checkpointReadWriteLock.isWriteLockHeldByCurrentThread()) {
+            return;
+        }
+
+        long timeout = checkpointReadLockTimeout;
+
+        long start = coarseCurrentTimeMillis();
+
+        boolean interrupted = false;
+
+        try {
+            for (; ; ) {
+                try {
+                    if (timeout > 0 && (coarseCurrentTimeMillis() - start) >= timeout) {
+                        failCheckpointReadLock();
+                    }
+
+                    try {
+                        if (timeout > 0) {
+                            if (!checkpointReadWriteLock.tryReadLock(timeout - (coarseCurrentTimeMillis() - start), MILLISECONDS)) {
+                                failCheckpointReadLock();
+                            }
+                        } else {
+                            checkpointReadWriteLock.readLock();
+                        }
+                    } catch (InterruptedException e) {
+                        interrupted = true;
+
+                        continue;
+                    }
+
+                    if (stop) {
+                        checkpointReadWriteLock.readUnlock();
+
+                        throw new IgniteInternalException(new NodeStoppingException("Failed to get checkpoint read lock"));
+                    }
+
+                    if (checkpointReadWriteLock.getReadHoldCount() > 1
+                            || safeToUpdateAllPageMemories.getAsBoolean()
+                            || checkpointer.runner() == null
+                    ) {
+                        break;
+                    } else {
+                        // If the checkpoint is triggered outside the lock,
+                        // it could cause the checkpoint to fire again for the same reason
+                        // (due to a data race between collecting dirty pages and triggering the checkpoint).
+                        CheckpointProgress checkpoint = checkpointer.scheduleCheckpoint(0, "too many dirty pages");
+
+                        checkpointReadWriteLock.readUnlock();
+
+                        if (timeout > 0 && coarseCurrentTimeMillis() - start >= timeout) {
+                            failCheckpointReadLock();
+                        }
+
+                        try {
+                            getUninterruptibly(checkpoint.futureFor(LOCK_RELEASED));
+                        } catch (ExecutionException e) {
+                            throw new IgniteInternalException("Failed to wait for checkpoint begin", e.getCause());
+                        } catch (CancellationException e) {
+                            throw new IgniteInternalException("Failed to wait for checkpoint begin", e);
+                        }
+                    }
+                } catch (CheckpointReadLockTimeoutException e) {
+                    log.error(e.getMessage(), e);
+
+                    throw e;
+
+                    // TODO: IGNITE-16899 After the implementation of FailureProcessor,
+                    // by analogy with 2.0, we need to reset the timeout and try again
+                    //timeout = 0;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Tries to get a checkpoint read lock.
+     *
+     * @return {@code True} if the checkpoint read lock is acquired.
+     */
+    public boolean tryCheckpointReadLock() {
+        return checkpointReadWriteLock.tryReadLock();
+    }
+
+    /**
+     * Releases the checkpoint read lock.
+     */
+    public void checkpointReadUnlock() {
+        checkpointReadWriteLock.readUnlock();
+    }
+
+    /**
+     * Returns timeout for checkpoint read lock acquisition in milliseconds.
+     */
+    public long checkpointReadLockTimeout() {
+        return checkpointReadLockTimeout;
+    }
+
+    /**
+     * Sets timeout for checkpoint read lock acquisition.
+     *
+     * @param val New timeout in milliseconds, non-positive value denotes infinite timeout.
+     */
+    public void checkpointReadLockTimeout(long val) {
+        checkpointReadLockTimeout = val;
+    }
+
+    /**
+     * Returns {@code true} if checkpoint lock is held by current thread.
+     */
+    public boolean checkpointLockIsHeldByThread() {
+        return checkpointReadWriteLock.checkpointLockIsHeldByThread();
+    }
+
+    private void failCheckpointReadLock() throws CheckpointReadLockTimeoutException {
+        // TODO: IGNITE-16899 After the implementation of FailureProcessor, by analogy with 2.0,
+        // either fail the node or try acquire read lock again by throwing an CheckpointReadLockTimeoutException
+
+        throw new CheckpointReadLockTimeoutException("Checkpoint read lock acquisition has been timed out");
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
similarity index 52%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 625c3579b..94056b669 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -15,27 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Page replacement policy factory.
+ * Empty.
  */
-public interface PageReplacementPolicyFactory {
+// TODO: IGNITE-16898 Continue porting the code
+public abstract class Checkpointer {
     /**
-     * Calculates amount of memory required to service {@code pagesCnt} pages.
+     * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
      *
-     * @param pagesCnt Pages count.
+     * @param delayFromNow Delay from now in milliseconds.
+     * @param reason Wakeup reason.
+     * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
      */
-    long requiredMemory(int pagesCnt);
+    public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
 
     /**
-     * Create page replacement policy.
-     *
-     * @param seg Page memory segment.
-     * @param ptr Pointer to memory region.
-     * @param pagesCnt Pages count.
+     * Returns runner thread, {@code null} if the worker has not yet started executing.
      */
-    PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt);
+    public abstract @Nullable Thread runner();
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTracking.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTracking.java
new file mode 100644
index 000000000..da5a9a44b
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTracking.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.lang.ThreadLocal.withInitial;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * ReentrantReadWriteLock adapter with readLock tracking.
+ */
+public class ReentrantReadWriteLockWithTracking implements ReadWriteLock {
+    /** Delegate instance. */
+    private final ReentrantReadWriteLock delegate = new ReentrantReadWriteLock();
+
+    /** Read lock holder. */
+    private final ReentrantReadWriteLock.ReadLock readLock;
+
+    /** Write lock holder. */
+    private final ReentrantReadWriteLock.WriteLock writeLock = new ReentrantReadWriteLock.WriteLock(delegate) {
+    };
+
+    /**
+     * ReentrantReadWriteLock wrapper, provides additional trace info on {@link ReadLockWithTracking#unlock()} method, if someone holds the
+     * lock more than {@code readLockThreshold}.
+     *
+     * @param log Ignite logger.
+     * @param readLockThreshold ReadLock threshold timeout.
+     */
+    public ReentrantReadWriteLockWithTracking(IgniteLogger log, long readLockThreshold) {
+        readLock = new ReadLockWithTracking(delegate, log, readLockThreshold);
+    }
+
+    /**
+     * Delegator implementation.
+     */
+    public ReentrantReadWriteLockWithTracking() {
+        readLock = new ReentrantReadWriteLock.ReadLock(delegate) {
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ReentrantReadWriteLock.ReadLock readLock() {
+        return readLock;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ReentrantReadWriteLock.WriteLock writeLock() {
+        return writeLock;
+    }
+
+    /**
+     * Returns {@code true} if the current thread holds write lock and {@code false} otherwise.
+     */
+    public boolean isWriteLockedByCurrentThread() {
+        return delegate.isWriteLockedByCurrentThread();
+    }
+
+    /**
+     * Returns the number of reentrant read holds on this lock by the current thread. A reader thread has a hold on a lock for each lock
+     * action that is not matched by an unlock action.
+     */
+    public int getReadHoldCount() {
+        return delegate.getReadHoldCount();
+    }
+
+    /**
+     * Returns the number of read locks held for this lock. This method is designed for use in monitoring system state, not for
+     * synchronization control.
+     */
+    public int getReadLockCount() {
+        return delegate.getReadLockCount();
+    }
+
+    /**
+     * Tracks long read lock holders.
+     */
+    private static class ReadLockWithTracking extends ReentrantReadWriteLock.ReadLock {
+        private static final long serialVersionUID = 0L;
+
+        private final ThreadLocal<IgniteBiTuple<Integer, Long>> readLockHolderTs = withInitial(() -> new IgniteBiTuple<>(0, 0L));
+
+        private final IgniteLogger log;
+
+        private final long readLockThreshold;
+
+        /**
+         * Constructor.
+         *
+         * @param lock Outer lock object.
+         * @param log Logger.
+         * @param readLockThreshold Lock print threshold in milliseconds.
+         */
+        protected ReadLockWithTracking(ReentrantReadWriteLock lock, IgniteLogger log, long readLockThreshold) {
+            super(lock);
+
+            this.log = log;
+
+            this.readLockThreshold = readLockThreshold;
+        }
+
+        private void inc() {
+            IgniteBiTuple<Integer, Long> val = readLockHolderTs.get();
+
+            int cntr = val.get1();
+
+            if (cntr == 0) {
+                val.set2(coarseCurrentTimeMillis());
+            }
+
+            val.set1(++cntr);
+
+            readLockHolderTs.set(val);
+        }
+
+        private void dec() {
+            IgniteBiTuple<Integer, Long> val = readLockHolderTs.get();
+
+            int cntr = val.get1();
+
+            if (--cntr == 0) {
+                long timeout = coarseCurrentTimeMillis() - val.get2();
+
+                if (timeout > readLockThreshold) {
+                    log.warn(
+                            "ReadLock held the lock more than " + timeout + " ms.",
+                            new IgniteInternalException()
+                    );
+                }
+            }
+
+            val.set1(cntr);
+
+            readLockHolderTs.set(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void lock() {
+            super.lock();
+
+            inc();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void lockInterruptibly() throws InterruptedException {
+            super.lockInterruptibly();
+
+            inc();
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean tryLock() {
+            if (super.tryLock()) {
+                inc();
+
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
+            if (super.tryLock(timeout, unit)) {
+                inc();
+
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void unlock() {
+            super.unlock();
+
+            dec();
+        }
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementFlags.java
similarity index 96%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementFlags.java
index 339ab2d21..2164ba389 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementFlags.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementFlags.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
 
 import static org.apache.ignite.internal.util.GridUnsafe.compareAndSwapLong;
 import static org.apache.ignite.internal.util.GridUnsafe.getLong;
@@ -43,7 +43,7 @@ public class ClockPageReplacementFlags {
      * @param totalPagesCnt Total pages count.
      * @param memPtr Pointer to memory region.
      */
-    ClockPageReplacementFlags(int totalPagesCnt, long memPtr) {
+    public ClockPageReplacementFlags(int totalPagesCnt, long memPtr) {
         pagesCnt = totalPagesCnt;
         flagsPtr = memPtr;
 
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicy.java
similarity index 93%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicy.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicy.java
index ea29aed72..6926e7b47 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicy.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
 
 import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
 import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
@@ -23,6 +23,7 @@ import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.O
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
 
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.LoadedPagesMap;
 import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 
@@ -40,7 +41,7 @@ public class ClockPageReplacementPolicy extends PageReplacementPolicy {
      * @param ptr Pointer to memory region.
      * @param pagesCnt Pages count.
      */
-    protected ClockPageReplacementPolicy(Segment seg, long ptr, int pagesCnt) {
+    public ClockPageReplacementPolicy(Segment seg, long ptr, int pagesCnt) {
         super(seg);
 
         flags = new ClockPageReplacementFlags(pagesCnt, ptr);
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicyFactory.java
similarity index 84%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicyFactory.java
index 0eeb7dcc1..91b3a53c3 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/ClockPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/ClockPageReplacementPolicyFactory.java
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
+
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
 
 /**
  * {@link ClockPageReplacementPolicy} factory.
@@ -29,7 +31,7 @@ public class ClockPageReplacementPolicyFactory implements PageReplacementPolicyF
 
     /** {@inheritDoc} */
     @Override
-    public PageReplacementPolicy create(PageMemoryImpl.Segment seg, long ptr, int pagesCnt) {
+    public PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt) {
         return new ClockPageReplacementPolicy(seg, ptr, pagesCnt);
     }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageStoreWrite.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageStoreWrite.java
new file mode 100644
index 000000000..500d51892
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedDirtyPageStoreWrite.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.replacement;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Not thread safe and stateful class for page replacement of one page with write() delay. This allows to write page content without holding
+ * segment lock. Page data is copied into temp buffer during {@link #writePage(FullPageId, ByteBuffer, int)} and then sent to real
+ * implementation by {@link #finishReplacement()}.
+ */
+// TODO: IGNITE-15818 Maybe refactor.
+public class DelayedDirtyPageStoreWrite implements PageStoreWriter {
+    /** Real flush dirty page implementation. */
+    private final PageStoreWriter flushDirtyPage;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /** Thread local with byte buffers. */
+    private final ThreadLocal<ByteBuffer> byteBufThreadLoc;
+
+    /** Replacing pages tracker, used to register & unregister pages being written. */
+    private final DelayedPageReplacementTracker tracker;
+
+    /** Full page id to be written on {@link #finishReplacement()} or null if nothing to write. */
+    @Nullable
+    private FullPageId fullPageId;
+
+    /** Byte buffer with page data to be written on {@link #finishReplacement()} or null if nothing to write. */
+    @Nullable
+    private ByteBuffer byteBuf;
+
+    /** Partition update tag to be used in{@link #finishReplacement()} or null if -1 to write. */
+    private int tag = -1;
+
+    /**
+     * Constructor.
+     *
+     * @param flushDirtyPage real writer to save page to store.
+     * @param byteBufThreadLoc thread local buffers to use for pages copying.
+     * @param pageSize page size.
+     * @param tracker tracker to lock/unlock page reads.
+     */
+    public DelayedDirtyPageStoreWrite(
+            PageStoreWriter flushDirtyPage,
+            ThreadLocal<ByteBuffer> byteBufThreadLoc,
+            int pageSize,
+            DelayedPageReplacementTracker tracker
+    ) {
+        this.flushDirtyPage = flushDirtyPage;
+        this.pageSize = pageSize;
+        this.byteBufThreadLoc = byteBufThreadLoc;
+        this.tracker = tracker;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writePage(FullPageId fullPageId, ByteBuffer byteBuf, int tag) {
+        tracker.lock(fullPageId);
+
+        ByteBuffer tlb = byteBufThreadLoc.get();
+
+        tlb.rewind();
+
+        long writeAddr = GridUnsafe.bufferAddress(tlb);
+        long origBufAddr = GridUnsafe.bufferAddress(byteBuf);
+
+        GridUnsafe.copyMemory(origBufAddr, writeAddr, pageSize);
+
+        this.fullPageId = fullPageId;
+        this.byteBuf = tlb;
+        this.tag = tag;
+    }
+
+    /**
+     * Runs actual write if required. Method is 'no op' if there was no page selected for replacement.
+     *
+     * @throws IgniteInternalCheckedException if write failed.
+     */
+    public void finishReplacement() throws IgniteInternalCheckedException {
+        if (byteBuf == null && fullPageId == null) {
+            return;
+        }
+
+        try {
+            flushDirtyPage.writePage(fullPageId, byteBuf, tag);
+        } finally {
+            tracker.unlock(fullPageId);
+
+            fullPageId = null;
+            byteBuf = null;
+            tag = -1;
+        }
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
new file mode 100644
index 000000000..12ec6b0c4
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/DelayedPageReplacementTracker.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.replacement;
+
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.segmentIndex;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Delayed page writes tracker. Provides delayed write implementations and allows to check if page is actually being written to page store.
+ */
+public class DelayedPageReplacementTracker {
+    /** Page size. */
+    private final int pageSize;
+
+    /** Flush dirty page real implementation. */
+    private final PageStoreWriter flushDirtyPage;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Lock stripes for pages read protection. */
+    private final Stripe[] stripes;
+
+    /** Byte buffer thread local. */
+    private final ThreadLocal<ByteBuffer> byteBufThreadLoc = new ThreadLocal<>() {
+        /** {@inheritDoc} */
+        @Override
+        protected ByteBuffer initialValue() {
+            ByteBuffer buf = ByteBuffer.allocateDirect(pageSize);
+
+            buf.order(ByteOrder.nativeOrder());
+
+            return buf;
+        }
+    };
+
+    /**
+     * Dirty page write for replacement operations thread local. Because page write {@link DelayedDirtyPageStoreWrite} is stateful and not
+     * thread safe, this thread local protects from GC pressure on pages replacement. <br> Map is used instead of build-in thread local to
+     * allow GC to remove delayed writers for alive threads after node stop.
+     */
+    private final Map<Long, DelayedDirtyPageStoreWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param pageSize Page size.
+     * @param flushDirtyPage Flush dirty page.
+     * @param log Logger.
+     * @param segmentCnt Segments count.
+     */
+    public DelayedPageReplacementTracker(
+            int pageSize,
+            PageStoreWriter flushDirtyPage,
+            IgniteLogger log,
+            int segmentCnt
+    ) {
+        this.pageSize = pageSize;
+        this.flushDirtyPage = flushDirtyPage;
+        this.log = log;
+
+        stripes = new Stripe[segmentCnt];
+
+        for (int i = 0; i < stripes.length; i++) {
+            stripes[i] = new Stripe();
+        }
+    }
+
+    /**
+     * Returns delayed page write implementation, finish method to be called to actually write page.
+     */
+    public DelayedDirtyPageStoreWrite delayedPageWrite() {
+        return delayedPageWriteThreadLocMap.computeIfAbsent(Thread.currentThread().getId(),
+                id -> new DelayedDirtyPageStoreWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
+    }
+
+    /**
+     * Returns stripe related to current page identifier.
+     *
+     * @param id Full page ID.
+     */
+    private Stripe stripe(FullPageId id) {
+        int segmentIdx = segmentIndex(id.groupId(), id.pageId(), stripes.length);
+
+        return stripes[segmentIdx];
+    }
+
+    /**
+     * Locks the page.
+     *
+     * @param id Full page ID to lock from read.
+     */
+    public void lock(FullPageId id) {
+        stripe(id).lock(id);
+    }
+
+    /**
+     * Method is returned when page is available to be loaded from store, or waits for replacement finish.
+     *
+     * @param id full page ID to be loaded from store.
+     */
+    public void waitUnlock(FullPageId id) {
+        stripe(id).waitUnlock(id);
+    }
+
+    /**
+     * Unlocks the page.
+     *
+     * @param id Full page ID, which write has been finished, it is available for reading.
+     */
+    public void unlock(FullPageId id) {
+        stripe(id).unlock(id);
+    }
+
+    /**
+     * Stripe for locking pages from reading from store in parallel with not finished write.
+     */
+    private class Stripe {
+        /**
+         * Page IDs which are locked for reading from store. Page content is being written right now. guarded by collection object monitor.
+         */
+        private final Collection<FullPageId> locked = new HashSet<>(Runtime.getRuntime().availableProcessors() * 2);
+
+        /**
+         * Has locked pages, flag for fast check if there are some pages, what were replaced and is being written. Write to field is guarded
+         * by {@link #locked} monitor.
+         */
+        private volatile boolean hasLockedPages;
+
+        /**
+         * Locks the page.
+         *
+         * @param id full page ID to lock from read
+         */
+        public void lock(FullPageId id) {
+            synchronized (locked) {
+                hasLockedPages = true;
+
+                boolean add = locked.add(id);
+
+                assert add : "Double locking of page for replacement is not possible";
+            }
+        }
+
+        /**
+         * Method is returned when page is available to be loaded from store, or waits for replacement finish.
+         *
+         * @param id Full page ID to be loaded from store.
+         */
+        public void waitUnlock(FullPageId id) {
+            if (!hasLockedPages) {
+                return;
+            }
+
+            synchronized (locked) {
+                if (!hasLockedPages) {
+                    return;
+                }
+
+                boolean interrupted = false;
+
+                while (locked.contains(id)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Found replaced page [" + id + "] which is being written to page store, wait for finish replacement");
+                    }
+
+                    try {
+                        // Uninterruptable wait.
+                        locked.wait();
+                    } catch (InterruptedException e) {
+                        interrupted = true;
+                    }
+                }
+
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        /**
+         * Unlocks the page.
+         *
+         * @param id Full page ID, which write has been finished, it is available for reading.
+         */
+        public void unlock(FullPageId id) {
+            synchronized (locked) {
+                boolean rmv = locked.remove(id);
+
+                assert rmv : "Unlocking page ID never locked, id " + id;
+
+                if (locked.isEmpty()) {
+                    hasLockedPages = false;
+                }
+
+                locked.notifyAll();
+            }
+        }
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicy.java
similarity index 96%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicy.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicy.java
index 6cca28c86..ba2170b5e 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicy.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
 
 import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicyFactory.java
similarity index 95%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicyFactory.java
index 625c3579b..387c9988d 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/PageReplacementPolicyFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
 
 import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
 
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
similarity index 92%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicy.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
index 8ecae86bb..8ce0471ca 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicy.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicy.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
 
 import static org.apache.ignite.internal.pagememory.PageIdAllocator.META_PAGE_ID;
+import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
 import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
 import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.PAGE_OVERHEAD;
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
@@ -28,7 +29,11 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.freelist.io.PagesListMetaIo;
 import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.LoadedPagesMap;
+import org.apache.ignite.internal.pagememory.persistence.PageHeader;
 import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
+import org.apache.ignite.internal.pagememory.persistence.PagePool;
+import org.apache.ignite.internal.pagememory.persistence.ReplaceCandidate;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 
 /**
@@ -95,7 +100,7 @@ public class RandomLruPageReplacementPolicy extends PageReplacementPolicy {
 
                 final long absPageAddr = seg.absolute(rndAddr);
 
-                FullPageId fullId = PageHeader.fullPageId(absPageAddr);
+                FullPageId fullId = fullPageId(absPageAddr);
 
                 // Check page mapping consistency.
                 assert fullId.equals(nearest.fullId()) : "Invalid page mapping [tableId=" + nearest.fullId()
@@ -152,7 +157,7 @@ public class RandomLruPageReplacementPolicy extends PageReplacementPolicy {
 
             final long absRmvAddr = seg.absolute(relRmvAddr);
 
-            final FullPageId fullPageId = PageHeader.fullPageId(absRmvAddr);
+            final FullPageId fullPageId = fullPageId(absRmvAddr);
 
             if (!seg.tryToRemovePage(fullPageId, absRmvAddr)) {
                 if (iterations > 10) {
@@ -214,7 +219,7 @@ public class RandomLruPageReplacementPolicy extends PageReplacementPolicy {
 
             final long absPageAddr = seg.absolute(addr);
 
-            FullPageId fullId = PageHeader.fullPageId(absPageAddr);
+            FullPageId fullId = fullPageId(absPageAddr);
 
             if (partGen < seg.partGeneration(fullId.groupId(), partitionId(fullId.pageId()))) {
                 return seg.refreshOutdatedPage(fullId.groupId(), fullId.pageId(), true);
@@ -228,7 +233,7 @@ public class RandomLruPageReplacementPolicy extends PageReplacementPolicy {
 
             final long absEvictAddr = seg.absolute(addr);
 
-            final FullPageId fullPageId = PageHeader.fullPageId(absEvictAddr);
+            final FullPageId fullPageId = fullPageId(absEvictAddr);
 
             if (seg.tryToRemovePage(fullPageId, absEvictAddr)) {
                 return addr;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicyFactory.java
similarity index 94%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicyFactory.java
index 651cc14df..9db7446c1 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/RandomLruPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/RandomLruPageReplacementPolicyFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
 
 import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
 
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageList.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageList.java
similarity index 99%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageList.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageList.java
index c37cee4a5..bc84f6ebf 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageList.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageList.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
 
 import static org.apache.ignite.internal.util.GridUnsafe.getInt;
 import static org.apache.ignite.internal.util.GridUnsafe.getLong;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicy.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicy.java
similarity index 96%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicy.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicy.java
index 247830f2a..9f1a250aa 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicy.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
 
 import static org.apache.ignite.internal.pagememory.persistence.PageHeader.fullPageId;
 import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.INVALID_REL_PTR;
@@ -23,6 +23,7 @@ import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.O
 import static org.apache.ignite.internal.pagememory.util.PageIdUtils.partitionId;
 
 import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.LoadedPagesMap;
 import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicyFactory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicyFactory.java
similarity index 84%
rename from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicyFactory.java
rename to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicyFactory.java
index 4e1234a83..5c076e1d6 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/SegmentedLruPageReplacementPolicyFactory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/replacement/SegmentedLruPageReplacementPolicyFactory.java
@@ -15,7 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence;
+package org.apache.ignite.internal.pagememory.persistence.replacement;
+
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.Segment;
 
 /**
  * {@link SegmentedLruPageReplacementPolicy} factory.
@@ -29,7 +31,7 @@ public class SegmentedLruPageReplacementPolicyFactory implements PageReplacement
 
     /** {@inheritDoc} */
     @Override
-    public PageReplacementPolicy create(PageMemoryImpl.Segment seg, long ptr, int pagesCnt) {
+    public PageReplacementPolicy create(Segment seg, long ptr, int pagesCnt) {
         return new SegmentedLruPageReplacementPolicy(seg, ptr, pagesCnt);
     }
 }
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
index 3b5aaa914..d74eb9107 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.pagememory.util.PageIdUtils;
 import org.apache.ignite.internal.pagememory.util.PageUtils;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -62,6 +63,13 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
     @InjectConfiguration(polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class)
     protected PageMemoryDataRegionConfiguration dataRegionCfg;
 
+    @BeforeEach
+    void setUp() throws Exception {
+        dataRegionCfg
+                .change(cfg -> cfg.changePageSize(PAGE_SIZE).changeInitSize(MAX_MEMORY_SIZE).changeMaxSize(MAX_MEMORY_SIZE))
+                .get(1, SECONDS);
+    }
+
     @Test
     public void testPageTearingInner() throws Exception {
         PageMemory mem = memory();
@@ -302,12 +310,6 @@ public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
      * @throws Exception If failed.
      */
     protected PageMemory memory() throws Exception {
-        dataRegionCfg.change(cfg ->
-                cfg.changePageSize(PAGE_SIZE)
-                        .changeInitSize(MAX_MEMORY_SIZE)
-                        .changeMaxSize(MAX_MEMORY_SIZE)
-        ).get(1, SECONDS);
-
         DirectMemoryProvider provider = new UnsafeMemoryProvider(null);
 
         PageIoRegistry ioRegistry = new PageIoRegistry();
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
index 10682d5ea..35ee3233d 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
@@ -18,9 +18,17 @@
 package org.apache.ignite.internal.pagememory.persistence;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.PAGE_OVERHEAD;
 import static org.apache.ignite.internal.util.Constants.MiB;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Set;
 import java.util.stream.LongStream;
+import org.apache.ignite.internal.pagememory.FullPageId;
 import org.apache.ignite.internal.pagememory.PageMemory;
 import org.apache.ignite.internal.pagememory.impl.PageMemoryNoLoadSelfTest;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
@@ -33,11 +41,11 @@ import org.junit.jupiter.api.Test;
 public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
     /** {@inheritDoc} */
     @Override
-    protected PageMemory memory() throws Exception {
-        dataRegionCfg
-                .change(cfg -> cfg.changePageSize(PAGE_SIZE).changeInitSize(MAX_MEMORY_SIZE).changeMaxSize(MAX_MEMORY_SIZE))
-                .get(1, SECONDS);
+    protected PageMemory memory() {
+        return memory(LongStream.range(0, 10).map(i -> 5 * MiB).toArray());
+    }
 
+    protected PageMemoryImpl memory(long[] sizes) {
         PageIoRegistry ioRegistry = new PageIoRegistry();
 
         ioRegistry.loadFromServiceLoader();
@@ -46,9 +54,11 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
                 new UnsafeMemoryProvider(null),
                 dataRegionCfg,
                 ioRegistry,
-                LongStream.range(0, 10).map(i -> 5 * MiB).toArray(),
+                sizes,
                 new TestPageReadWriteManager(),
                 (page, fullPageId, pageMemoryEx) -> {
+                },
+                (fullPageId, buf, tag) -> {
                 }
         );
     }
@@ -59,4 +69,58 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
     public void testPageHandleDeallocation() {
         // No-op.
     }
+
+    @Test
+    void testDirtyPages() throws Exception {
+        PageMemoryImpl memory = (PageMemoryImpl) memory();
+
+        memory.start();
+
+        try {
+            Set<FullPageId> dirtyPages = Set.of(allocatePage(memory), allocatePage(memory));
+
+            assertThat(memory.dirtyPages(), equalTo(dirtyPages));
+
+            // TODO: IGNITE-16898 After the checkpoint check that there are no dirty pages
+        } finally {
+            memory.stop(true);
+        }
+    }
+
+    @Test
+    void testSafeToUpdate() throws Exception {
+        long systemPageSize = PAGE_SIZE + PAGE_OVERHEAD;
+
+        dataRegionCfg
+                .change(c -> c.changeInitSize(128 * systemPageSize).changeMaxSize(128 * systemPageSize))
+                .get(1, SECONDS);
+
+        PageMemoryImpl memory = memory(new long[]{100 * systemPageSize, 28 * systemPageSize});
+
+        memory.start();
+
+        try {
+            long maxPages = memory.totalPages();
+
+            long maxDirtyPages = (maxPages * 3 / 4);
+
+            assertThat(maxDirtyPages, greaterThanOrEqualTo(50L));
+
+            for (int i = 0; i < maxDirtyPages - 1; i++) {
+                allocatePage(memory);
+
+                assertTrue(memory.safeToUpdate(), "i=" + i);
+            }
+
+            for (int i = (int) maxDirtyPages - 1; i < maxPages; i++) {
+                allocatePage(memory);
+
+                assertFalse(memory.safeToUpdate(), "i=" + i);
+            }
+
+            // TODO: IGNITE-16898 After the checkpoint check assertTrue(memory.safeToUpdate())
+        } finally {
+            memory.stop(true);
+        }
+    }
 }
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
new file mode 100644
index 000000000..8ec9518d8
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointPages} testing.
+ */
+public class CheckpointPagesTest {
+    @Test
+    void testContains() {
+        CheckpointPages checkpointPages = new CheckpointPages(
+                Set.of(new FullPageId(0, 0), new FullPageId(1, 0)),
+                completedFuture(null)
+        );
+
+        assertTrue(checkpointPages.contains(new FullPageId(0, 0)));
+        assertTrue(checkpointPages.contains(new FullPageId(1, 0)));
+
+        assertFalse(checkpointPages.contains(new FullPageId(2, 0)));
+        assertFalse(checkpointPages.contains(new FullPageId(3, 0)));
+    }
+
+    @Test
+    void testSize() {
+        CheckpointPages checkpointPages = new CheckpointPages(
+                Set.of(new FullPageId(0, 0), new FullPageId(1, 0)),
+                completedFuture(null)
+        );
+
+        assertEquals(2, checkpointPages.size());
+    }
+
+    @Test
+    void testMarkAsSaved() {
+        CheckpointPages checkpointPages = new CheckpointPages(
+                new HashSet<>(Set.of(new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2, 0))),
+                completedFuture(null)
+        );
+
+        assertTrue(checkpointPages.markAsSaved(new FullPageId(0, 0)));
+        assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
+        assertEquals(2, checkpointPages.size());
+
+        assertFalse(checkpointPages.markAsSaved(new FullPageId(0, 0)));
+        assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
+        assertEquals(2, checkpointPages.size());
+
+        assertTrue(checkpointPages.markAsSaved(new FullPageId(1, 0)));
+        assertFalse(checkpointPages.contains(new FullPageId(0, 0)));
+        assertEquals(1, checkpointPages.size());
+    }
+
+    @Test
+    void testAllowToSave() throws Exception {
+        Set<FullPageId> pages = Set.of(new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2, 0));
+
+        CheckpointPages checkpointPages = new CheckpointPages(pages, completedFuture(null));
+
+        assertTrue(checkpointPages.allowToSave(new FullPageId(0, 0)));
+        assertTrue(checkpointPages.allowToSave(new FullPageId(1, 0)));
+        assertTrue(checkpointPages.allowToSave(new FullPageId(2, 0)));
+
+        assertFalse(checkpointPages.allowToSave(new FullPageId(3, 0)));
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> new CheckpointPages(pages, failedFuture(new Exception("test"))).allowToSave(new FullPageId(0, 0))
+        );
+
+        assertThat(exception.getCause(), instanceOf(Exception.class));
+        assertThat(exception.getCause().getMessage(), equalTo("test"));
+
+        exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> {
+                    CompletableFuture<Object> future = new CompletableFuture<>();
+
+                    future.cancel(true);
+
+                    new CheckpointPages(pages, future).allowToSave(new FullPageId(0, 0));
+                }
+        );
+
+        assertThat(exception.getCause(), instanceOf(CancellationException.class));
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLockTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLockTest.java
new file mode 100644
index 000000000..500c87fb6
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLockTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointReadWriteLock} testing.
+ */
+public class CheckpointReadWriteLockTest {
+    @Test
+    void testReadLock() throws Exception {
+        CheckpointReadWriteLock lock0 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+        CheckpointReadWriteLock lock1 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+
+        lock1.writeLock();
+
+        lock0.readLock();
+        lock1.readLock();
+
+        assertEquals(1, lock0.getReadHoldCount());
+        assertEquals(0, lock1.getReadHoldCount());
+
+        lock1.writeUnlock();
+
+        runAsync(() -> {
+            assertEquals(0, lock0.getReadHoldCount());
+            assertEquals(0, lock1.getReadHoldCount());
+
+            lock0.readLock();
+            lock1.readLock();
+
+            assertEquals(1, lock0.getReadHoldCount());
+            assertEquals(1, lock1.getReadHoldCount());
+
+            lock0.readUnlock();
+            lock1.readUnlock();
+
+            assertEquals(0, lock0.getReadHoldCount());
+            assertEquals(0, lock1.getReadHoldCount());
+        }).get(1, TimeUnit.SECONDS);
+
+        lock1.writeLock();
+
+        assertEquals(1, lock0.getReadHoldCount());
+        assertEquals(0, lock1.getReadHoldCount());
+
+        lock0.readUnlock();
+        lock1.readUnlock();
+
+        assertEquals(0, lock0.getReadHoldCount());
+        assertEquals(0, lock1.getReadHoldCount());
+    }
+
+    @Test
+    void testTryReadLock() throws Exception {
+        CheckpointReadWriteLock lock0 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+        CheckpointReadWriteLock lock1 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+        CheckpointReadWriteLock lock2 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+
+        lock2.writeLock();
+
+        assertTrue(lock0.tryReadLock());
+        assertTrue(lock1.tryReadLock(1, TimeUnit.MILLISECONDS));
+
+        assertTrue(lock2.tryReadLock());
+        assertTrue(lock2.tryReadLock(1, TimeUnit.MILLISECONDS));
+
+        assertEquals(1, lock0.getReadHoldCount());
+        assertEquals(1, lock1.getReadHoldCount());
+        assertEquals(0, lock2.getReadHoldCount());
+
+        runAsync(() -> {
+            assertEquals(0, lock0.getReadHoldCount());
+            assertEquals(0, lock1.getReadHoldCount());
+            assertEquals(0, lock2.getReadHoldCount());
+
+            assertFalse(lock2.tryReadLock());
+
+            try {
+                assertFalse(lock2.tryReadLock(1, TimeUnit.MILLISECONDS));
+            } catch (InterruptedException e) {
+                fail(e);
+            }
+
+            assertEquals(0, lock0.getReadHoldCount());
+            assertEquals(0, lock1.getReadHoldCount());
+            assertEquals(0, lock2.getReadHoldCount());
+        }).get(1, TimeUnit.SECONDS);
+
+        lock2.writeUnlock();
+
+        runAsync(() -> {
+            try {
+                assertTrue(lock0.tryReadLock());
+                assertTrue(lock1.tryReadLock(1, TimeUnit.MILLISECONDS));
+
+                assertTrue(lock2.tryReadLock());
+                assertTrue(lock2.tryReadLock(1, TimeUnit.MILLISECONDS));
+
+                assertEquals(1, lock0.getReadHoldCount());
+                assertEquals(1, lock1.getReadHoldCount());
+                assertEquals(2, lock2.getReadHoldCount());
+
+                lock0.readUnlock();
+                lock1.readUnlock();
+                lock2.readUnlock();
+
+                assertEquals(0, lock0.getReadHoldCount());
+                assertEquals(0, lock1.getReadHoldCount());
+                assertEquals(1, lock2.getReadHoldCount());
+
+                lock2.readUnlock();
+
+                assertEquals(0, lock2.getReadHoldCount());
+            } catch (InterruptedException e) {
+                fail(e);
+            }
+        }).get(1, TimeUnit.SECONDS);
+
+        lock2.writeLock();
+
+        assertEquals(1, lock0.getReadHoldCount());
+        assertEquals(1, lock1.getReadHoldCount());
+        assertEquals(0, lock2.getReadHoldCount());
+
+        lock0.readUnlock();
+        lock1.readUnlock();
+        lock2.readUnlock();
+
+        assertEquals(0, lock0.getReadHoldCount());
+        assertEquals(0, lock1.getReadHoldCount());
+        assertEquals(0, lock2.getReadHoldCount());
+    }
+
+    @Test
+    void testCheckpointLockIsHeldByThread() throws Exception {
+        CheckpointReadWriteLock lock0 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+        CheckpointReadWriteLock lock1 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+        CheckpointReadWriteLock lock2 = new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking());
+
+        assertFalse(lock0.checkpointLockIsHeldByThread());
+        assertFalse(lock1.checkpointLockIsHeldByThread());
+        assertFalse(lock2.checkpointLockIsHeldByThread());
+
+        lock0.writeLock();
+        lock1.readLock();
+
+        assertTrue(lock0.checkpointLockIsHeldByThread());
+        assertTrue(lock1.checkpointLockIsHeldByThread());
+
+        runAsync(() -> assertTrue(lock2.checkpointLockIsHeldByThread()), "checkpoint-runner").get(1, TimeUnit.SECONDS);
+
+        runAsync(() -> {
+            assertFalse(lock0.checkpointLockIsHeldByThread());
+            assertFalse(lock1.checkpointLockIsHeldByThread());
+            assertFalse(lock2.checkpointLockIsHeldByThread());
+        }).get(1, TimeUnit.SECONDS);
+
+        runAsync(() -> {
+            assertFalse(lock0.tryReadLock());
+
+            try {
+                assertFalse(lock0.tryReadLock(1, TimeUnit.MILLISECONDS));
+            } catch (InterruptedException e) {
+                fail(e);
+            }
+
+            assertFalse(lock0.checkpointLockIsHeldByThread());
+        }).get(1, TimeUnit.SECONDS);
+
+        lock0.writeUnlock();
+        lock1.readUnlock();
+
+        assertFalse(lock0.checkpointLockIsHeldByThread());
+        assertFalse(lock1.checkpointLockIsHeldByThread());
+        assertFalse(lock2.checkpointLockIsHeldByThread());
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
new file mode 100644
index 000000000..43cba5504
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointTimeoutLock} testing.
+ */
+public class CheckpointTimeoutLockTest {
+    private final IgniteLogger log = IgniteLogger.forClass(CheckpointTimeoutLockTest.class);
+
+    @Nullable
+    private CheckpointTimeoutLock timeoutLock;
+
+    @AfterEach
+    void tearDown() {
+        if (timeoutLock != null) {
+            readUnlock(timeoutLock);
+
+            timeoutLock.stop();
+        }
+    }
+
+    @Test
+    void testCheckpointReadLockTimeout() {
+        timeoutLock = new CheckpointTimeoutLock(log, newReadWriteLock(), Long.MAX_VALUE, () -> true, mock(Checkpointer.class));
+
+        timeoutLock.start();
+
+        assertEquals(Long.MAX_VALUE, timeoutLock.checkpointReadLockTimeout());
+
+        timeoutLock.checkpointReadLockTimeout(Long.MIN_VALUE);
+
+        assertEquals(Long.MIN_VALUE, timeoutLock.checkpointReadLockTimeout());
+    }
+
+    @Test
+    void testCheckpointReadLock() throws Exception {
+        CheckpointTimeoutLock timeoutLock0 = new CheckpointTimeoutLock(log, newReadWriteLock(), 0, () -> true, mock(Checkpointer.class));
+        CheckpointTimeoutLock timeoutLock1 = new CheckpointTimeoutLock(log, newReadWriteLock(), 1, () -> true, mock(Checkpointer.class));
+
+        try {
+            timeoutLock0.start();
+            timeoutLock1.start();
+
+            assertDoesNotThrow(timeoutLock0::checkpointReadLock);
+            assertDoesNotThrow(timeoutLock1::checkpointReadLock);
+
+            // Checks reentrant lock.
+
+            assertDoesNotThrow(timeoutLock0::checkpointReadLock);
+            assertDoesNotThrow(timeoutLock1::checkpointReadLock);
+        } finally {
+            readUnlock(timeoutLock0);
+            readUnlock(timeoutLock1);
+
+            closeAll(timeoutLock0::stop, timeoutLock1::stop);
+        }
+    }
+
+    @Test
+    void testCheckpointReadLockWithWriteLockHeldByCurrentThread() {
+        CheckpointReadWriteLock readWriteLock = newReadWriteLock();
+
+        timeoutLock = new CheckpointTimeoutLock(log, readWriteLock, 1, () -> true, mock(Checkpointer.class));
+
+        timeoutLock.start();
+
+        readWriteLock.writeLock();
+
+        try {
+            assertDoesNotThrow(timeoutLock::checkpointReadLock);
+
+            // Check reentrant lock.
+
+            assertDoesNotThrow(timeoutLock::checkpointReadLock);
+        } finally {
+            writeUnlock(readWriteLock);
+        }
+    }
+
+    @Test
+    void testCheckpointReadLockFailOnNodeStop() {
+        timeoutLock = new CheckpointTimeoutLock(log, newReadWriteLock(), Long.MAX_VALUE, () -> true, mock(Checkpointer.class));
+
+        timeoutLock.stop();
+
+        IgniteInternalException exception = assertThrows(IgniteInternalException.class, timeoutLock::checkpointReadLock);
+
+        assertThat(exception.getCause(), instanceOf(NodeStoppingException.class));
+    }
+
+    @Test
+    void testCheckpointReadLockTimeoutFail() throws Exception {
+        CheckpointReadWriteLock readWriteLock0 = newReadWriteLock();
+        CheckpointReadWriteLock readWriteLock1 = newReadWriteLock();
+
+        CheckpointTimeoutLock timeoutLock0 = new CheckpointTimeoutLock(log, readWriteLock0, 0, () -> true, mock(Checkpointer.class));
+        CheckpointTimeoutLock timeoutLock1 = new CheckpointTimeoutLock(log, readWriteLock1, 1, () -> true, mock(Checkpointer.class));
+
+        try {
+            timeoutLock0.start();
+            timeoutLock1.start();
+
+            readWriteLock0.writeLock();
+            readWriteLock1.writeLock();
+
+            CountDownLatch startThreadLatch = new CountDownLatch(2);
+
+            CompletableFuture<?> readLockFuture0 = runAsync(() -> checkpointReadLock(startThreadLatch, timeoutLock0));
+            CompletableFuture<?> readLockFuture1 = runAsync(() -> checkpointReadLock(startThreadLatch, timeoutLock1));
+
+            assertTrue(startThreadLatch.await(100, MILLISECONDS));
+
+            // For the Windows case, getting a read lock can take up to 100 ms.
+            assertThrows(TimeoutException.class, () -> readLockFuture0.get(100, MILLISECONDS));
+
+            ExecutionException exception = assertThrows(ExecutionException.class, () -> readLockFuture1.get(100, MILLISECONDS));
+
+            assertThat(exception.getCause().getCause(), instanceOf(CheckpointReadLockTimeoutException.class));
+        } finally {
+            writeUnlock(readWriteLock0);
+            writeUnlock(readWriteLock1);
+
+            closeAll(timeoutLock0::stop, timeoutLock1::stop);
+        }
+    }
+
+    @Test
+    void testCheckpointReadLockTimeoutWithInterruptionFail() throws Exception {
+        CheckpointReadWriteLock readWriteLock0 = newReadWriteLock();
+        CheckpointReadWriteLock readWriteLock1 = newReadWriteLock();
+
+        CheckpointTimeoutLock timeoutLock0 = new CheckpointTimeoutLock(log, readWriteLock0, 0, () -> true, mock(Checkpointer.class));
+        CheckpointTimeoutLock timeoutLock1 = new CheckpointTimeoutLock(log, readWriteLock1, 1, () -> true, mock(Checkpointer.class));
+
+        try {
+            timeoutLock0.start();
+            timeoutLock1.start();
+
+            readWriteLock0.writeLock();
+            readWriteLock1.writeLock();
+
+            CountDownLatch startThreadLatch = new CountDownLatch(2);
+            CountDownLatch interruptedThreadLatch = new CountDownLatch(2);
+
+            CompletableFuture<?> readLockFuture0 = runAsync(() -> {
+                currentThread().interrupt();
+
+                try {
+                    checkpointReadLock(startThreadLatch, timeoutLock0);
+                } finally {
+                    interruptedThreadLatch.countDown();
+                }
+            });
+
+            CompletableFuture<?> readLockFuture1 = runAsync(() -> {
+                currentThread().interrupt();
+
+                try {
+                    checkpointReadLock(startThreadLatch, timeoutLock1);
+                } finally {
+                    interruptedThreadLatch.countDown();
+                }
+            });
+
+            assertTrue(startThreadLatch.await(100, MILLISECONDS));
+
+            // For the Windows case, getting a read lock can take up to 100 ms.
+            assertThrows(TimeoutException.class, () -> readLockFuture0.get(100, MILLISECONDS));
+
+            ExecutionException exception = assertThrows(ExecutionException.class, () -> readLockFuture1.get(100, MILLISECONDS));
+
+            assertThat(exception.getCause().getCause(), instanceOf(CheckpointReadLockTimeoutException.class));
+
+            writeUnlock(readWriteLock0);
+            writeUnlock(readWriteLock1);
+
+            assertTrue(interruptedThreadLatch.await(100, MILLISECONDS));
+        } finally {
+            writeUnlock(readWriteLock0);
+            writeUnlock(readWriteLock1);
+
+            closeAll(timeoutLock0::stop, timeoutLock1::stop);
+        }
+    }
+
+    @Test
+    void testScheduleCheckpoint() throws Exception {
+        CompletableFuture<?> lockRealiseFuture = mock(CompletableFuture.class);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        when(lockRealiseFuture.get()).then(a -> {
+            latch.countDown();
+
+            return null;
+        });
+
+        Checkpointer checkpointer = newCheckpointer(currentThread(), lockRealiseFuture);
+
+        AtomicBoolean safeToUpdate = new AtomicBoolean();
+
+        timeoutLock = new CheckpointTimeoutLock(log, newReadWriteLock(), 0, safeToUpdate::get, checkpointer);
+
+        timeoutLock.start();
+
+        CompletableFuture<?> readLockFuture = runAsync(() -> {
+            timeoutLock.checkpointReadLock();
+
+            timeoutLock.checkpointReadUnlock();
+        });
+
+        assertTrue(latch.await(100, MILLISECONDS));
+
+        safeToUpdate.set(true);
+
+        readLockFuture.get(100, MILLISECONDS);
+    }
+
+    @Test
+    void testFailureLockReleasedFuture() {
+        Checkpointer checkpointer = newCheckpointer(currentThread(), failedFuture(new Exception("test")));
+
+        timeoutLock = new CheckpointTimeoutLock(log, newReadWriteLock(), 0, () -> false, checkpointer);
+
+        timeoutLock.start();
+
+        IgniteInternalException exception = assertThrows(IgniteInternalException.class, timeoutLock::checkpointReadLock);
+
+        assertThat(exception.getCause(), instanceOf(Exception.class));
+        assertThat(exception.getCause().getMessage(), equalTo("test"));
+    }
+
+    @Test
+    void testCanceledLockReleasedFuture() {
+        CompletableFuture<?> future = new CompletableFuture<>();
+
+        future.cancel(true);
+
+        Checkpointer checkpointer = newCheckpointer(currentThread(), future);
+
+        PageMemoryDataRegion dataRegion = newPageMemoryDataRegion(true, new AtomicBoolean());
+
+        timeoutLock = new CheckpointTimeoutLock(log, newReadWriteLock(), 0, () -> false, checkpointer);
+
+        timeoutLock.start();
+
+        IgniteInternalException exception = assertThrows(IgniteInternalException.class, timeoutLock::checkpointReadLock);
+
+        assertThat(exception.getCause(), instanceOf(CancellationException.class));
+    }
+
+    private void checkpointReadLock(CountDownLatch latch, CheckpointTimeoutLock lock) {
+        latch.countDown();
+
+        lock.checkpointReadLock();
+
+        lock.checkpointReadUnlock();
+    }
+
+    private void writeUnlock(CheckpointReadWriteLock lock) {
+        if (lock.isWriteLockHeldByCurrentThread()) {
+            lock.writeUnlock();
+        }
+    }
+
+    private void readUnlock(CheckpointTimeoutLock lock) {
+        while (lock.checkpointLockIsHeldByThread()) {
+            lock.checkpointReadUnlock();
+        }
+    }
+
+    private CheckpointReadWriteLock newReadWriteLock() {
+        return new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking(log, 5_000));
+    }
+
+    private CheckpointProgress newCheckpointProgress(CompletableFuture<?> future) {
+        CheckpointProgress progress = mock(CheckpointProgress.class);
+
+        when(progress.futureFor(LOCK_RELEASED)).then(a -> future);
+
+        return progress;
+    }
+
+    private Checkpointer newCheckpointer(Thread runner, CompletableFuture<?> future) {
+        Checkpointer checkpointer = mock(Checkpointer.class);
+
+        when(checkpointer.runner()).thenReturn(runner);
+
+        CheckpointProgress checkpointProgress = newCheckpointProgress(future);
+
+        when(checkpointer.scheduleCheckpoint(0, "too many dirty pages")).thenReturn(checkpointProgress);
+
+        return checkpointer;
+    }
+
+    private PageMemoryDataRegion newPageMemoryDataRegion(boolean persistent, AtomicBoolean safeToUpdate) {
+        PageMemoryDataRegion dataRegion = mock(PageMemoryDataRegion.class);
+
+        when(dataRegion.persistent()).thenReturn(persistent);
+
+        PageMemoryImpl pageMemory = mock(PageMemoryImpl.class);
+
+        when(pageMemory.safeToUpdate()).then(a -> safeToUpdate.get());
+
+        when(dataRegion.pageMemory()).thenReturn(pageMemory);
+
+        return dataRegion;
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTrackingTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTrackingTest.java
new file mode 100644
index 000000000..6089b83a6
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/ReentrantReadWriteLockWithTrackingTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * For {@link ReentrantReadWriteLockWithTracking} testing.
+ */
+public class ReentrantReadWriteLockWithTrackingTest {
+    private final IgniteLogger log = IgniteLogger.forClass(getClass());
+
+    @Test
+    void testIsWriteLockedByCurrentThread() throws Exception {
+        ReentrantReadWriteLockWithTracking lock0 = new ReentrantReadWriteLockWithTracking();
+        ReentrantReadWriteLockWithTracking lock1 = new ReentrantReadWriteLockWithTracking(log, Long.MAX_VALUE);
+
+        assertFalse(lock0.isWriteLockedByCurrentThread());
+        assertFalse(lock1.isWriteLockedByCurrentThread());
+
+        lock0.writeLock().lock();
+        lock1.writeLock().lock();
+
+        assertTrue(lock0.isWriteLockedByCurrentThread());
+        assertTrue(lock1.isWriteLockedByCurrentThread());
+
+        runAsync(() -> {
+            assertFalse(lock0.isWriteLockedByCurrentThread());
+            assertFalse(lock1.isWriteLockedByCurrentThread());
+        }).get(1, TimeUnit.SECONDS);
+
+        lock0.writeLock().unlock();
+        lock1.writeLock().unlock();
+
+        assertFalse(lock0.isWriteLockedByCurrentThread());
+        assertFalse(lock1.isWriteLockedByCurrentThread());
+    }
+
+    @Test
+    void testGetReadHoldCount() throws Exception {
+        ReentrantReadWriteLockWithTracking lock0 = new ReentrantReadWriteLockWithTracking();
+        ReentrantReadWriteLockWithTracking lock1 = new ReentrantReadWriteLockWithTracking(log, Long.MAX_VALUE);
+
+        assertEquals(0, lock0.getReadHoldCount());
+        assertEquals(0, lock1.getReadHoldCount());
+
+        lock0.readLock().lock();
+        lock0.readLock().lock();
+
+        lock1.readLock().lock();
+
+        assertEquals(2, lock0.getReadHoldCount());
+        assertEquals(1, lock1.getReadHoldCount());
+
+        runAsync(() -> {
+            assertEquals(0, lock0.getReadHoldCount());
+            assertEquals(0, lock1.getReadHoldCount());
+
+            lock0.readLock().lock();
+            lock1.readLock().lock();
+
+            assertEquals(1, lock0.getReadHoldCount());
+            assertEquals(1, lock1.getReadHoldCount());
+
+            lock0.readLock().unlock();
+            lock1.readLock().unlock();
+
+            assertEquals(0, lock0.getReadHoldCount());
+            assertEquals(0, lock1.getReadHoldCount());
+        }).get(1, TimeUnit.SECONDS);
+
+        assertEquals(2, lock0.getReadHoldCount());
+        assertEquals(1, lock1.getReadHoldCount());
+
+        lock0.readLock().unlock();
+        lock1.readLock().unlock();
+
+        assertEquals(1, lock0.getReadHoldCount());
+        assertEquals(0, lock1.getReadHoldCount());
+
+        lock0.readLock().unlock();
+
+        assertEquals(0, lock0.getReadHoldCount());
+        assertEquals(0, lock1.getReadHoldCount());
+    }
+
+    @Test
+    void testGetReadLockCount() throws Exception {
+        ReentrantReadWriteLockWithTracking lock0 = new ReentrantReadWriteLockWithTracking();
+        ReentrantReadWriteLockWithTracking lock1 = new ReentrantReadWriteLockWithTracking(log, Long.MAX_VALUE);
+
+        assertEquals(0, lock0.getReadLockCount());
+        assertEquals(0, lock1.getReadLockCount());
+
+        lock0.readLock().lock();
+        lock0.readLock().lock();
+
+        lock1.readLock().lock();
+
+        assertEquals(2, lock0.getReadLockCount());
+        assertEquals(1, lock1.getReadLockCount());
+
+        runAsync(() -> {
+            assertEquals(2, lock0.getReadLockCount());
+            assertEquals(1, lock1.getReadLockCount());
+
+            lock0.readLock().lock();
+            lock1.readLock().lock();
+
+            assertEquals(3, lock0.getReadLockCount());
+            assertEquals(2, lock1.getReadLockCount());
+
+            lock0.readLock().unlock();
+            lock1.readLock().unlock();
+
+            assertEquals(2, lock0.getReadLockCount());
+            assertEquals(1, lock1.getReadLockCount());
+        }).get(1, TimeUnit.SECONDS);
+
+        assertEquals(2, lock0.getReadLockCount());
+        assertEquals(1, lock1.getReadLockCount());
+
+        lock0.readLock().unlock();
+        lock1.readLock().unlock();
+
+        assertEquals(1, lock0.getReadLockCount());
+        assertEquals(0, lock1.getReadLockCount());
+
+        lock0.readLock().unlock();
+
+        assertEquals(0, lock0.getReadLockCount());
+        assertEquals(0, lock1.getReadLockCount());
+    }
+
+    @Test
+    void testPrintLongHoldReadLock() throws Exception {
+        IgniteLogger log = mock(IgniteLogger.class);
+
+        ArgumentCaptor<String> msgArgumentCaptor = ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<Throwable> throwableArgumentCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+        doNothing().when(log).warn(msgArgumentCaptor.capture(), throwableArgumentCaptor.capture());
+
+        ReentrantReadWriteLockWithTracking lock0 = new ReentrantReadWriteLockWithTracking(log, 20);
+        ReentrantReadWriteLockWithTracking lock1 = new ReentrantReadWriteLockWithTracking(log, 200);
+
+        lock0.readLock().lock();
+        lock1.readLock().lock();
+
+        Thread.sleep(50);
+
+        lock0.readLock().unlock();
+        lock1.readLock().unlock();
+
+        assertThat(msgArgumentCaptor.getAllValues(), hasSize(1));
+        assertThat(throwableArgumentCaptor.getAllValues(), hasSize(1));
+
+        assertThat(msgArgumentCaptor.getValue(), Matchers.startsWith("ReadLock held the lock more than"));
+        assertThat(throwableArgumentCaptor.getValue(), instanceOf(IgniteInternalException.class));
+    }
+}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
similarity index 88%
rename from modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java
rename to modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
index d194a3a04..dd0b0e9ca 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryDataRegion.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.pagememory;
 
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
 import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryDataRegionConfiguration;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
 import org.jetbrains.annotations.Nullable;
@@ -27,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
  * Abstract data region for {@link PageMemoryStorageEngine}. Based on a {@link PageMemory}.
  */
 // TODO: IGNITE-16641 Add support for persistent case.
-abstract class PageMemoryDataRegion implements IgniteComponent {
+abstract class AbstractPageMemoryDataRegion implements PageMemoryDataRegion, IgniteComponent {
     protected final PageMemoryDataRegionConfiguration cfg;
 
     protected final PageIoRegistry ioRegistry;
@@ -40,7 +41,7 @@ abstract class PageMemoryDataRegion implements IgniteComponent {
      * @param cfg Data region configuration.
      * @param ioRegistry IO registry.
      */
-    public PageMemoryDataRegion(PageMemoryDataRegionConfiguration cfg, PageIoRegistry ioRegistry) {
+    public AbstractPageMemoryDataRegion(PageMemoryDataRegionConfiguration cfg, PageIoRegistry ioRegistry) {
         this.cfg = cfg;
         this.ioRegistry = ioRegistry;
     }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
index 3d25357b2..3ec2da275 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryPartitionStorage.java
@@ -72,7 +72,7 @@ class PageMemoryPartitionStorage implements PartitionStorage {
     public PageMemoryPartitionStorage(
             int partId,
             TableConfiguration tableCfg,
-            PageMemoryDataRegion dataRegion,
+            AbstractPageMemoryDataRegion dataRegion,
             TableFreeList freeList
     ) throws StorageException {
         assert partId >= 0 && partId < MAX_PARTITION_ID : partId;
@@ -409,6 +409,7 @@ class PageMemoryPartitionStorage implements PartitionStorage {
         }
     }
 
+    /** {@inheritDoc} */
     @Override
     public long rowsCount() {
         try {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
index 7f587de58..a109c5524 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PageMemoryTableStorage.java
@@ -39,7 +39,7 @@ import org.jetbrains.annotations.Nullable;
 // TODO: IGNITE-16641 Add support for persistent case.
 // TODO: IGNITE-16642 Support indexes.
 abstract class PageMemoryTableStorage implements TableStorage {
-    protected final PageMemoryDataRegion dataRegion;
+    protected final AbstractPageMemoryDataRegion dataRegion;
 
     protected final TableConfiguration tableCfg;
 
@@ -56,7 +56,7 @@ abstract class PageMemoryTableStorage implements TableStorage {
      * @param tableCfg – Table configuration.
      * @param dataRegion – Data region for the table.
      */
-    public PageMemoryTableStorage(TableConfiguration tableCfg, PageMemoryDataRegion dataRegion) {
+    public PageMemoryTableStorage(TableConfiguration tableCfg, AbstractPageMemoryDataRegion dataRegion) {
         this.dataRegion = dataRegion;
         this.tableCfg = tableCfg;
     }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
index 01f0b539b..cd44241a7 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryDataRegion.java
@@ -33,9 +33,9 @@ import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 
 /**
- * Implementation of {@link PageMemoryDataRegion} for in-memory case.
+ * Implementation of {@link AbstractPageMemoryDataRegion} for in-memory case.
  */
-class VolatilePageMemoryDataRegion extends PageMemoryDataRegion {
+class VolatilePageMemoryDataRegion extends AbstractPageMemoryDataRegion {
     private TableFreeList freeList;
 
     /**