You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/05/13 08:47:46 UTC

[ignite-3] branch main updated: IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2 (#800)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7adac2ab3 IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2 (#800)
7adac2ab3 is described below

commit 7adac2ab324a075f2dd9317c546b1129e88d9cc1
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Fri May 13 11:47:41 2022 +0300

    IGNITE-16898 [Native Persistence 3.0] Porting a checkpoint and related code, part 2 (#800)
---
 .../ignite/internal/util/FastTimestamps.java       |   1 +
 .../apache/ignite/internal/util/IgniteUtils.java   |  12 +-
 .../ignite/internal/util/worker/IgniteWorker.java  | 321 ++++++++++++++
 .../util/worker/IgniteWorkerListener.java}         |  33 +-
 .../util/worker/WorkProgressDispatcher.java}       |  32 +-
 .../internal/util/worker/IgniteWorkerTest.java     | 466 +++++++++++++++++++++
 .../pagememory/persistence/PageMemoryImpl.java     |   2 +-
 .../pagememory/persistence/PageStoreWriter.java    |   2 +-
 .../persistence/checkpoint/Checkpoint.java         |  58 +++
 .../checkpoint/CheckpointDirtyPagesInfoHolder.java |  48 +++
 .../persistence/checkpoint/CheckpointListener.java |  69 +++
 .../checkpoint/CheckpointMarkersStorage.java       | 200 +++++++++
 .../persistence/checkpoint/CheckpointProgress.java |  24 +-
 .../checkpoint/CheckpointProgressImpl.java         | 265 ++++++++++++
 .../checkpoint/CheckpointReadWriteLock.java        |   2 +-
 .../persistence/checkpoint/CheckpointState.java    |   1 -
 .../persistence/checkpoint/CheckpointWorkflow.java | 351 ++++++++++++++++
 ...Checkpointer.java => CheckpointWriteOrder.java} |  33 +-
 .../persistence/checkpoint/Checkpointer.java       |  18 +-
 .../checkpoint/IgniteConcurrentMultiPairQueue.java | 227 ++++++++++
 .../persistence/PageMemoryImplNoLoadTest.java      |   4 +-
 .../checkpoint/CheckpointMarkersStorageTest.java   | 184 ++++++++
 .../checkpoint/CheckpointProgressImplTest.java     | 342 +++++++++++++++
 .../persistence/checkpoint/CheckpointTest.java     |  48 +++
 .../checkpoint/CheckpointTestUtils.java            |  55 +++
 .../checkpoint/CheckpointWorkflowTest.java         | 445 ++++++++++++++++++++
 .../IgniteConcurrentMultiPairQueueTest.java        | 149 +++++++
 27 files changed, 3333 insertions(+), 59 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
index af6f00540..c73cedb2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
@@ -31,6 +31,7 @@ public class FastTimestamps {
 
     private static void startUpdater() {
         Thread updater = new Thread("FastTimestamps updater") {
+            /** {@inheritDoc} */
             @Override
             public void run() {
                 while (true) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 30e0495b8..41a8f3d81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -332,9 +332,17 @@ public class IgniteUtils {
      * @return Absolute value.
      */
     public static int safeAbs(int i) {
-        i = Math.abs(i);
+        return Math.max(Math.abs(i), 0);
+    }
 
-        return i < 0 ? 0 : i;
+    /**
+     * Gets absolute value for long. If long is {@link Long#MIN_VALUE}, then {@code 0} is returned.
+     *
+     * @param i Long value.
+     * @return Absolute value.
+     */
+    public static long safeAbs(long i) {
+        return Math.max(Math.abs(i), 0);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java
new file mode 100644
index 000000000..0cf0a40f6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorker.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extension to standard {@link Runnable} interface.
+ *
+ * <p>Adds proper details to be used with {@link Executor} implementations.
+ */
+public abstract class IgniteWorker implements Runnable, WorkProgressDispatcher {
+    /** Ignite logger. */
+    protected final IgniteLogger log;
+
+    /** Thread name. */
+    private final String name;
+
+    /** Ignite instance name. */
+    private final String igniteInstanceName;
+
+    /** Listener. */
+    private final IgniteWorkerListener listener;
+
+    /** Finish mark. */
+    private volatile boolean finished;
+
+    /** Whether this runnable is cancelled. */
+    protected final AtomicBoolean isCancelled = new AtomicBoolean();
+
+    /** Actual thread runner. */
+    private volatile Thread runner;
+
+    /** Timestamp to be updated by this worker periodically to indicate it's up and running. */
+    private volatile long heartbeatTimestamp;
+
+    /** Atomic field updater to change heartbeat. */
+    private static final AtomicLongFieldUpdater<IgniteWorker> HEARTBEAT_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(IgniteWorker.class, "heartbeatTimestamp");
+
+    /** Mutex for finish awaiting. */
+    private final Object mux = new Object();
+
+    /**
+     * Creates new grid worker with given parameters.
+     *
+     * @param log Logger.
+     * @param igniteInstanceName Name of the Ignite instance this runnable is used in.
+     * @param name Worker name. Note that in general thread name and worker (runnable) name are two different things.
+     *      The same worker can be executed by multiple threads and therefore for logging and debugging purposes we separate the two.
+     * @param listener Listener for life-cycle events.
+     */
+    protected IgniteWorker(
+            IgniteLogger log,
+            String igniteInstanceName,
+            String name,
+            @Nullable IgniteWorkerListener listener
+    ) {
+        this.log = log;
+        this.igniteInstanceName = igniteInstanceName;
+        this.name = name;
+        this.listener = listener;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public final void run() {
+        updateHeartbeat();
+
+        // Runner thread must be recorded first as other operations may depend on it being present.
+        runner = Thread.currentThread();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Grid runnable started: " + name);
+        }
+
+        try {
+            if (isCancelled.get()) {
+                onCancelledBeforeWorkerScheduled();
+            }
+
+            // Listener callback.
+            if (listener != null) {
+                listener.onStarted(this);
+            }
+
+            body();
+        } catch (InterruptedException e) {
+            if (log.isDebugEnabled()) {
+                log.debug("Caught interrupted exception: " + e);
+            }
+
+            Thread.currentThread().interrupt();
+        } catch (Throwable e) {
+            // Catch everything to make sure that it gets logged properly and
+            // not to kill any threads from the underlying thread pool.
+
+            log.error("Runtime error caught during grid runnable execution: " + this, e);
+
+            if (e instanceof Error) {
+                throw e;
+            }
+        } finally {
+            synchronized (mux) {
+                finished = true;
+
+                mux.notifyAll();
+            }
+
+            cleanup();
+
+            if (listener != null) {
+                listener.onStopped(this);
+            }
+
+            if (log.isDebugEnabled()) {
+                if (isCancelled.get()) {
+                    log.debug("Grid runnable finished due to cancellation: " + name);
+                } else if (runner.isInterrupted()) {
+                    log.debug("Grid runnable finished due to interruption without cancellation: " + name);
+                } else {
+                    log.debug("Grid runnable finished normally: " + name);
+                }
+            }
+
+            // Need to set runner to null, to make sure that
+            // further operations on this runnable won't
+            // affect the thread which could have been recycled
+            // by thread pool.
+            runner = null;
+        }
+    }
+
+    /**
+     * The implementation should provide the execution body for this runnable.
+     *
+     * @throws InterruptedException Thrown in case of interruption.
+     */
+    protected abstract void body() throws InterruptedException;
+
+    /**
+     * Optional method that will be called after runnable is finished. Default implementation is no-op.
+     */
+    protected void cleanup() {
+        /* No-op. */
+    }
+
+    /**
+     * Returns runner thread, {@code null} if the worker has not yet started executing.
+     */
+    public @Nullable Thread runner() {
+        return runner;
+    }
+
+    /**
+     * Returns Name of the Ignite instance this runnable belongs to.
+     */
+    public String igniteInstanceName() {
+        return igniteInstanceName;
+    }
+
+    /**
+     * Returns this runnable name.
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Cancels this runnable.
+     */
+    public void cancel() {
+        if (log.isDebugEnabled()) {
+            log.debug("Cancelling grid runnable: " + this);
+        }
+
+        onCancel(isCancelled.compareAndSet(false, true));
+    }
+
+    /**
+     * Joins this runnable.
+     *
+     * @throws InterruptedException Thrown in case of interruption.
+     */
+    public void join() throws InterruptedException {
+        if (log.isDebugEnabled()) {
+            log.debug("Joining grid runnable: " + this);
+        }
+
+        if ((runner == null && isCancelled.get()) || finished) {
+            return;
+        }
+
+        synchronized (mux) {
+            while (!finished) {
+                mux.wait();
+            }
+        }
+    }
+
+    /**
+     * Returns {@code true} if this runnable is cancelled - {@code false} otherwise.
+     *
+     * @see Future#isCancelled()
+     */
+    public boolean isCancelled() {
+        Thread runner = this.runner;
+
+        return isCancelled.get() || (runner != null && runner.isInterrupted());
+    }
+
+    /**
+     * Returns {@code true} if this runnable is finished - {@code false} otherwise.
+     */
+    public boolean isDone() {
+        return finished;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long heartbeat() {
+        return heartbeatTimestamp;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void updateHeartbeat() {
+        long currentTimestamp = coarseCurrentTimeMillis();
+        long heartbeatTimestamp = this.heartbeatTimestamp;
+
+        // Avoid heartbeat update while in the blocking section.
+        while (heartbeatTimestamp < currentTimestamp) {
+            if (HEARTBEAT_UPDATER.compareAndSet(this, heartbeatTimestamp, currentTimestamp)) {
+                return;
+            }
+
+            heartbeatTimestamp = this.heartbeatTimestamp;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void blockingSectionBegin() {
+        heartbeatTimestamp = Long.MAX_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void blockingSectionEnd() {
+        heartbeatTimestamp = coarseCurrentTimeMillis();
+    }
+
+    /**
+     * Can be called from {@link #runner()} thread to perform idleness handling.
+     */
+    protected void onIdle() {
+        if (listener != null) {
+            listener.onIdle(this);
+        }
+    }
+
+    /**
+     * Callback on runner cancellation.
+     *
+     * @param firstCancelRequest Flag indicating that worker cancellation was requested for the first time.
+     */
+    protected void onCancel(boolean firstCancelRequest) {
+        Thread runner = this.runner;
+
+        // Cannot apply Future.cancel() because if we do, then Future.get() would always
+        // throw CancellationException, and we would not be able to wait for task completion.
+        if (runner != null) {
+            runner.interrupt();
+        }
+    }
+
+    /**
+     * Callback on special case, when task is cancelled before is has been scheduled.
+     */
+    protected void onCancelledBeforeWorkerScheduled() {
+        Thread runner = this.runner;
+
+        assert runner != null : this;
+
+        runner.interrupt();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        Thread runner = this.runner;
+
+        return S.toString(IgniteWorker.class, this,
+                "hashCode", hashCode(),
+                "interrupted", (runner != null ? runner.isInterrupted() : "unknown"),
+                "runner", (runner == null ? "null" : runner.getName()));
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorkerListener.java
similarity index 56%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
copy to modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorkerListener.java
index 7c9d5eb89..3ebdb08d5 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/IgniteWorkerListener.java
@@ -15,18 +15,33 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-
-
-import java.util.concurrent.CompletableFuture;
+package org.apache.ignite.internal.util.worker;
 
 /**
- * Represents information of progress of a current checkpoint and allows obtaining future to wait for a particular checkpoint state.
+ * This interface defines worker listener.
  */
-// TODO: IGNITE-16898 Continue porting the code
-public interface CheckpointProgress {
+public interface IgniteWorkerListener {
+    /**
+     * Callback before executing the {@link IgniteWorker#body body of the worker}.
+     *
+     * @param worker Started worker.
+     */
+    default void onStarted(IgniteWorker worker) {
+    }
+
+    /**
+     * Callback after executing the {@link IgniteWorker#body body of the worker}.
+     *
+     * @param worker Stopped worker.
+     */
+    default void onStopped(IgniteWorker worker) {
+    }
+
     /**
-     * Returns future which can be used for detection when current checkpoint reaches the specific state.
+     * Callback on idle worker.
+     *
+     * @param worker Idle worker.
      */
-    CompletableFuture<?> futureFor(CheckpointState state);
+    default void onIdle(IgniteWorker worker) {
+    }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/WorkProgressDispatcher.java
similarity index 51%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
copy to modules/core/src/main/java/org/apache/ignite/internal/util/worker/WorkProgressDispatcher.java
index 94056b669..68516d08c 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/worker/WorkProgressDispatcher.java
@@ -15,26 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.pagememory.persistence.checkpoint;
-
-import org.jetbrains.annotations.Nullable;
+package org.apache.ignite.internal.util.worker;
 
 /**
- * Empty.
+ * Dispatcher of workers' progress which allows us to understand if worker freezes.
  */
-// TODO: IGNITE-16898 Continue porting the code
-public abstract class Checkpointer {
+public interface WorkProgressDispatcher {
+    /**
+     * Last heartbeat timestamp.
+     */
+    long heartbeat();
+
+    /**
+     * Notifying dispatcher that work is in progress and thread didn't freeze.
+     */
+    void updateHeartbeat();
+
     /**
-     * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
-     *
-     * @param delayFromNow Delay from now in milliseconds.
-     * @param reason Wakeup reason.
-     * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
+     * Protects the worker from timeout penalties if subsequent instructions in the calling thread does not update heartbeat timestamp
+     * timely, e.g. due to blocking operations, up to the nearest {@link #blockingSectionEnd()} call. Nested calls are not supported.
      */
-    public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
+    void blockingSectionBegin();
 
     /**
-     * Returns runner thread, {@code null} if the worker has not yet started executing.
+     * Closes the protection section previously opened by {@link #blockingSectionBegin()}.
      */
-    public abstract @Nullable Thread runner();
+    void blockingSectionEnd();
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/worker/IgniteWorkerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/worker/IgniteWorkerTest.java
new file mode 100644
index 000000000..ca484abe5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/worker/IgniteWorkerTest.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.worker;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_IDLE;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_STARTED;
+import static org.apache.ignite.internal.util.worker.IgniteWorkerTest.TestWorkerListener.ON_STOPPED;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link IgniteWorker} testing.
+ */
+public class IgniteWorkerTest {
+    static final String CLEANUP = "cleanup";
+
+    private final IgniteLogger log = IgniteLogger.forClass(IgniteWorkerTest.class);
+
+    @Test
+    void testNewIgniteWorker() {
+        IgniteWorker worker = new NoopWorker(log, null);
+
+        assertEquals("testNode", worker.igniteInstanceName());
+        assertEquals("testWorker", worker.name());
+
+        assertEquals(0, worker.heartbeat());
+
+        assertFalse(worker.isCancelled());
+        assertFalse(worker.isDone());
+
+        assertNull(worker.runner());
+    }
+
+    @Test
+    void testBlockingSelection() {
+        IgniteWorker worker = new NoopWorker(log, null);
+
+        long currentTimeMillis = coarseCurrentTimeMillis();
+
+        worker.blockingSectionBegin();
+
+        assertEquals(Long.MAX_VALUE, worker.heartbeat());
+
+        worker.blockingSectionEnd();
+
+        assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+
+        // Checks update heartbeat after blockingSectionBegin().
+
+        worker.blockingSectionBegin();
+
+        assertEquals(Long.MAX_VALUE, worker.heartbeat());
+
+        worker.updateHeartbeat();
+
+        assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+
+        worker.blockingSectionEnd();
+
+        assertThat(worker.heartbeat(), greaterThanOrEqualTo(currentTimeMillis));
+    }
+
+    @Test
+    void testUpdateHeartbeat() throws Exception {
+        IgniteWorker worker = new NoopWorker(log, null);
+
+        long currentTimeMillis = coarseCurrentTimeMillis();
+
+        worker.updateHeartbeat();
+
+        long heartbeat = worker.heartbeat();
+
+        assertThat(heartbeat, greaterThanOrEqualTo(currentTimeMillis));
+
+        Thread.sleep(10);
+
+        assertEquals(heartbeat, worker.heartbeat());
+
+        worker.updateHeartbeat();
+
+        assertThat(worker.heartbeat(), greaterThan(heartbeat));
+    }
+
+    @Test
+    void testIdle() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events);
+
+        IgniteWorker worker = new NoopWorker(log, listener);
+
+        worker.onIdle();
+
+        assertThat(events, equalTo(List.of(ON_IDLE)));
+    }
+
+    @Test
+    void testRun() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events) {
+            /** {@inheritDoc} */
+            @Override
+            public void onStarted(IgniteWorker worker) {
+                super.onStarted(worker);
+
+                assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+                assertSame(Thread.currentThread(), worker.runner());
+                assertFalse(worker.isCancelled());
+                assertFalse(worker.isDone());
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void onStopped(IgniteWorker worker) {
+                super.onStopped(worker);
+
+                assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+                assertSame(Thread.currentThread(), worker.runner());
+                assertFalse(worker.isCancelled());
+                assertTrue(worker.isDone());
+            }
+        };
+
+        IgniteWorker worker = new NoopWorker(log, listener) {
+            /** {@inheritDoc} */
+            @Override
+            protected void cleanup() {
+                events.add(CLEANUP);
+            }
+        };
+
+        worker.run();
+
+        assertThat(events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+        assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+        assertNull(worker.runner());
+        assertFalse(worker.isCancelled());
+        assertTrue(worker.isDone());
+    }
+
+    @Test
+    void testInterruptFromBody() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events);
+
+        IgniteWorker worker = new NoopWorker(log, listener) {
+            /** {@inheritDoc} */
+            @Override
+            protected void body() throws InterruptedException {
+                Thread.currentThread().interrupt();
+
+                throw new InterruptedException();
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            protected void cleanup() {
+                events.add(CLEANUP);
+            }
+        };
+
+        worker.run();
+
+        assertThat(listener.events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+        assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+        assertNull(worker.runner());
+        assertFalse(worker.isCancelled());
+        assertTrue(worker.isDone());
+        assertTrue(Thread.interrupted());
+    }
+
+    @Test
+    void testExceptionFromBody() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events);
+
+        IgniteWorker worker = new NoopWorker(log, listener) {
+            /** {@inheritDoc} */
+            @Override
+            protected void body() {
+                throw new RuntimeException();
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            protected void cleanup() {
+                events.add(CLEANUP);
+            }
+        };
+
+        worker.run();
+
+        assertThat(listener.events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+        assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+        assertNull(worker.runner());
+        assertFalse(worker.isCancelled());
+        assertTrue(worker.isDone());
+    }
+
+    @Test
+    void testErrorFromBody() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events);
+
+        IgniteWorker worker = new NoopWorker(log, listener) {
+            /** {@inheritDoc} */
+            @Override
+            protected void body() {
+                throw new Error();
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            protected void cleanup() {
+                events.add(CLEANUP);
+            }
+        };
+
+        assertThrows(Error.class, worker::run);
+
+        assertThat(listener.events, equalTo(List.of(ON_STARTED, CLEANUP, ON_STOPPED)));
+
+        assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+        assertNull(worker.runner());
+        assertFalse(worker.isCancelled());
+        assertTrue(worker.isDone());
+    }
+
+    @Test
+    void testCancelWorker() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events);
+
+        IgniteWorker worker = new NoopWorker(log, listener) {
+            /** {@inheritDoc} */
+            @Override
+            protected void body() {
+                cancel();
+
+                cancel();
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            protected void cleanup() {
+                events.add(CLEANUP);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            protected void onCancel(boolean firstCancelRequest) {
+                super.onCancel(firstCancelRequest);
+
+                events.add("firstCancel=" + firstCancelRequest);
+
+                assertTrue(runner().isInterrupted());
+                assertTrue(isCancelled());
+                assertFalse(isDone());
+            }
+        };
+
+        worker.run();
+
+        assertThat(listener.events, equalTo(List.of(ON_STARTED, "firstCancel=true", "firstCancel=false", CLEANUP, ON_STOPPED)));
+
+        assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+        assertNull(worker.runner());
+        assertTrue(worker.isCancelled());
+        assertTrue(worker.isDone());
+        assertTrue(Thread.interrupted());
+    }
+
+    @Test
+    void testCancelBeforeStartWorker() {
+        List<String> events = new ArrayList<>();
+
+        TestWorkerListener listener = new TestWorkerListener(events);
+
+        IgniteWorker worker = new NoopWorker(log, listener) {
+            /** {@inheritDoc} */
+            @Override
+            protected void cleanup() {
+                events.add(CLEANUP);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            protected void onCancel(boolean firstCancelRequest) {
+                super.onCancel(firstCancelRequest);
+
+                events.add("firstCancel=" + firstCancelRequest);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            protected void onCancelledBeforeWorkerScheduled() {
+                super.onCancelledBeforeWorkerScheduled();
+
+                events.add("onCancelledBeforeWorkerScheduled");
+
+                assertTrue(runner().isInterrupted());
+                assertTrue(isCancelled());
+                assertFalse(isDone());
+            }
+        };
+
+        worker.cancel();
+
+        worker.run();
+
+        assertThat(
+                listener.events,
+                equalTo(List.of("firstCancel=true", "onCancelledBeforeWorkerScheduled", ON_STARTED, CLEANUP, ON_STOPPED))
+        );
+
+        assertThat(worker.heartbeat(), lessThanOrEqualTo(coarseCurrentTimeMillis()));
+        assertNull(worker.runner());
+        assertTrue(worker.isCancelled());
+        assertTrue(worker.isDone());
+        assertTrue(Thread.interrupted());
+    }
+
+    @Test
+    void testJoin() throws Exception {
+        CountDownLatch finishBodyLatch = new CountDownLatch(1);
+
+        CountDownLatch startLatch = new CountDownLatch(2);
+
+        try {
+            IgniteWorker worker = new NoopWorker(log, null) {
+                /** {@inheritDoc} */
+                @Override
+                protected void body() throws InterruptedException {
+                    finishBodyLatch.await(1, TimeUnit.SECONDS);
+                }
+            };
+
+            CompletableFuture<?> runWorkerFuture = runAsync(() -> {
+                startLatch.countDown();
+
+                worker.run();
+            });
+
+            CompletableFuture<?> joinWorkerFuture = runAsync(() -> {
+                startLatch.countDown();
+
+                worker.join();
+            });
+
+            startLatch.await(100, TimeUnit.MILLISECONDS);
+
+            assertThrows(TimeoutException.class, () -> joinWorkerFuture.get(100, TimeUnit.MILLISECONDS));
+
+            finishBodyLatch.countDown();
+
+            runWorkerFuture.get(100, TimeUnit.MILLISECONDS);
+            joinWorkerFuture.get(100, TimeUnit.MILLISECONDS);
+        } finally {
+            finishBodyLatch.countDown();
+        }
+    }
+
+    /**
+     * A worker implementation that does nothing.
+     */
+    private static class NoopWorker extends IgniteWorker {
+        /**
+         * Constructor.
+         *
+         * @param log Logger.
+         * @param listener Listener for life-cycle events.
+         */
+        protected NoopWorker(IgniteLogger log, @Nullable IgniteWorkerListener listener) {
+            super(log, "testNode", "testWorker", listener);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        protected void body() throws InterruptedException {
+        }
+    }
+
+    /**
+     * Test listener implementation that simply collects events.
+     */
+    static class TestWorkerListener implements IgniteWorkerListener {
+        static final String ON_STARTED = "onStarted";
+
+        static final String ON_STOPPED = "onStopped";
+
+        static final String ON_IDLE = "onIdle";
+
+        final List<String> events;
+
+        /**
+         * Constructor.
+         *
+         * @param events For recording events.
+         */
+        private TestWorkerListener(List<String> events) {
+            this.events = events;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onStarted(IgniteWorker worker) {
+            events.add(ON_STARTED);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onStopped(IgniteWorker worker) {
+            events.add(ON_STOPPED);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onIdle(IgniteWorker worker) {
+            events.add(ON_IDLE);
+        }
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
index cd3c6d2ba..fe24c2e06 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImpl.java
@@ -1186,7 +1186,7 @@ public class PageMemoryImpl implements PageMemoryEx {
         boolean wasDirty = dirty(absPtr, dirty);
 
         if (dirty) {
-            // TODO: IGNITE-16898 Don't forget add assertion for checkpoint lock held by this thread
+            // TODO: IGNITE-16935 Don't forget add assertion for checkpoint lock held by this thread
 
             if (!wasDirty || forceAdd) {
                 Segment seg = segment(pageId.groupId(), pageId.pageId());
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
index 99921e9dd..29f7d257f 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PageStoreWriter.java
@@ -33,7 +33,7 @@ public interface PageStoreWriter {
      * @param fullPageId Page ID to get byte buffer for. The page ID must be present in the collection returned by the {@link
      * PageMemoryImpl#beginCheckpoint(CompletableFuture)} method call.
      * @param buf Temporary buffer to write changes into.
-     * @param tag {@code Partition generation} if data was read, {@code null} otherwise (data already saved to storage).
+     * @param tag {@code Partition generation} if data was read.
      * @throws IgniteInternalCheckedException If write page failed.
      */
     void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteInternalCheckedException;
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
new file mode 100644
index 000000000..54904cc38
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpoint.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+
+/**
+ * Data class of checkpoint information.
+ */
+class Checkpoint {
+    /** Checkpoint pages. */
+    final IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> dirtyPages;
+
+    /** Checkpoint progress status. */
+    final CheckpointProgressImpl progress;
+
+    /** Number of dirty pages. */
+    final int dirtyPagesSize;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Pages to write to the page store.
+     * @param progress Checkpoint progress status.
+     */
+    Checkpoint(
+            IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> dirtyPages,
+            CheckpointProgressImpl progress
+    ) {
+        this.dirtyPages = dirtyPages;
+        this.progress = progress;
+
+        dirtyPagesSize = dirtyPages.initialSize();
+    }
+
+    /**
+     * Returns {@code true} if this checkpoint contains at least one dirty page.
+     */
+    public boolean hasDelta() {
+        return dirtyPagesSize != 0;
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java
new file mode 100644
index 000000000..823a116b7
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointDirtyPagesInfoHolder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import java.util.Collection;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Holder of information about dirty pages by {@link PageMemoryImpl} for checkpoint.
+ */
+class CheckpointDirtyPagesInfoHolder {
+    /** Total number of dirty pages. */
+    final int dirtyPageCount;
+
+    /** Collection of dirty pages per {@link PageMemoryImpl} distribution. */
+    final Collection<IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>>> dirtyPages;
+
+    /**
+     * Constructor.
+     *
+     * @param dirtyPages Collection of dirty pages per {@link PageMemoryImpl} distribution.
+     * @param dirtyPageCount Total number of dirty pages.
+     */
+    public CheckpointDirtyPagesInfoHolder(
+            Collection<IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>>> dirtyPages,
+            int dirtyPageCount
+    ) {
+        this.dirtyPages = dirtyPages;
+        this.dirtyPageCount = dirtyPageCount;
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointListener.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointListener.java
new file mode 100644
index 000000000..345b95b2c
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Listener which methods will be called in a corresponded checkpoint life cycle period.
+ */
+public interface CheckpointListener {
+    /**
+     * Callback at the checkpoint start mark.
+     *
+     * <p>Holds checkpoint write lock.
+     *
+     * @param progress Progress of the current checkpoint.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    default void onMarkCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+    }
+
+    /**
+     * Callback at the beginning of the checkpoint.
+     *
+     * <p>After release checkpoint write lock.
+     *
+     * @param progress Progress of the current checkpoint.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    default void onCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+    }
+
+    /**
+     * Callback before the start of the checkpoint.
+     *
+     * <p>Holds checkpoint read lock.
+     *
+     * @param progress Progress of the current checkpoint.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    default void beforeCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+    }
+
+    /**
+     * Callback after checkpoint ends.
+     *
+     * <p>Does not hold checkpoint read and write locks.
+     *
+     * @param progress Progress of the current checkpoint.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    default void afterCheckpointEnd(CheckpointProgress progress) throws IgniteInternalCheckedException {
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java
new file mode 100644
index 000000000..a1f858bf5
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorage.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.exists;
+import static java.nio.file.Files.isDirectory;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toList;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstraction responsible for managing checkpoint markers storage.
+ */
+// TODO: IGNITE-15818 At the moment, a simple implementation has been made, it may need to be redone, you need to check it
+public class CheckpointMarkersStorage {
+    /** Checkpoint start marker. */
+    private static final String CHECKPOINT_START_MARKER = "START";
+
+    /** Checkpoint end marker. */
+    private static final String CHECKPOINT_END_MARKER = "END";
+
+    /** Checkpoint marker file name pattern. */
+    private static final Pattern CHECKPOINT_MARKER_FILE_NAME_PATTERN = Pattern.compile("(.*)-(START|END)\\.bin");
+
+    /** Checkpoint metadata directory ("cp"), contains files with checkpoint start and end markers. */
+    private final Path checkpointDir;
+
+    /** Checkpoint IDs. */
+    private final Set<UUID> checkpointIds;
+
+    /**
+     * Constructor.
+     *
+     * @param storagePath Storage path.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public CheckpointMarkersStorage(
+            Path storagePath
+    ) throws IgniteInternalCheckedException {
+        checkpointDir = storagePath.resolve("cp");
+
+        try {
+            createDirectories(checkpointDir);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create directory for checkpoint metadata: " + checkpointDir, e);
+        }
+
+        checkCheckpointDir(checkpointDir);
+
+        try {
+            checkpointIds = list(checkpointDir)
+                    .map(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile)
+                    .collect(toCollection(ConcurrentHashMap::newKeySet));
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not read checkpoint markers: " + checkpointDir, e);
+        }
+    }
+
+    /**
+     * Callback at the start of the checkpoint.
+     *
+     * <p>Creates a start marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointBegin(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert !checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointStartMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+
+        try {
+            createFile(checkpointStartMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create start checkpoint marker: " + checkpointStartMarker, e);
+        }
+
+        checkpointIds.add(checkpointId);
+    }
+
+    /**
+     * Callback at the end of the checkpoint.
+     *
+     * <p>Creates an end marker for a checkpoint.
+     *
+     * @param checkpointId Checkpoint id.
+     */
+    public void onCheckpointEnd(UUID checkpointId) throws IgniteInternalCheckedException {
+        assert checkpointIds.contains(checkpointId) : checkpointId;
+
+        Path checkpointEndMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        try {
+            createFile(checkpointEndMarker);
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not create end checkpoint marker: " + checkpointEndMarker, e);
+        }
+
+        for (Iterator<UUID> it = checkpointIds.iterator(); it.hasNext(); ) {
+            UUID id = it.next();
+
+            if (!id.equals(checkpointId)) {
+                removeCheckpointMarkers(id);
+
+                it.remove();
+            }
+        }
+    }
+
+    private void removeCheckpointMarkers(UUID checkpointId) {
+        Path startMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_START_MARKER));
+        Path endMarker = checkpointDir.resolve(checkpointMarkerFileName(checkpointId, CHECKPOINT_END_MARKER));
+
+        if (exists(startMarker)) {
+            startMarker.toFile().delete();
+        }
+
+        if (exists(endMarker)) {
+            endMarker.toFile().delete();
+        }
+    }
+
+    /**
+     * Checks that the directory contains only paired (start and end) checkpoint markers.
+     */
+    private static void checkCheckpointDir(Path checkpointDir) throws IgniteInternalCheckedException {
+        assert isDirectory(checkpointDir) : checkpointDir;
+
+        try {
+            Map<Boolean, List<Path>> files = list(checkpointDir)
+                    .collect(partitioningBy(path -> parseCheckpointIdFromMarkerFile(path) != null));
+
+            if (!files.get(false).isEmpty()) {
+                throw new IgniteInternalCheckedException(
+                        "Not checkpoint markers found, they need to be removed manually: " + files.get(false)
+                );
+            }
+
+            Map<UUID, List<Path>> checkpointMarkers = files.get(true).stream()
+                    .collect(groupingBy(CheckpointMarkersStorage::parseCheckpointIdFromMarkerFile));
+
+            List<UUID> checkpointsWithoutEndMarker = checkpointMarkers.entrySet().stream()
+                    .filter(e -> e.getValue().stream().noneMatch(path -> path.getFileName().toString().contains(CHECKPOINT_END_MARKER)))
+                    .map(Entry::getKey)
+                    .collect(toList());
+
+            if (!checkpointsWithoutEndMarker.isEmpty()) {
+                throw new IgniteInternalCheckedException("Found incomplete checkpoints: " + checkpointsWithoutEndMarker);
+            }
+        } catch (IOException e) {
+            throw new IgniteInternalCheckedException("Could not reads checkpoint markers: " + checkpointDir, e);
+        }
+    }
+
+    private static String checkpointMarkerFileName(UUID id, String checkpointMarker) {
+        return id + "-" + checkpointMarker + ".bin";
+    }
+
+    private static @Nullable UUID parseCheckpointIdFromMarkerFile(Path checkpointMarkerPath) {
+        Matcher matcher = CHECKPOINT_MARKER_FILE_NAME_PATTERN.matcher(checkpointMarkerPath.getFileName().toString());
+
+        if (!matcher.matches()) {
+            return null;
+        }
+
+        return UUID.fromString(matcher.group(1));
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
index 7c9d5eb89..00f37f8a8 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgress.java
@@ -17,16 +17,36 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Represents information of progress of a current checkpoint and allows obtaining future to wait for a particular checkpoint state.
  */
-// TODO: IGNITE-16898 Continue porting the code
 public interface CheckpointProgress {
+    /**
+     * Returns checkpoint ID.
+     */
+    UUID id();
+
+    /**
+     * Returns description of the reason of the current checkpoint.
+     */
+    @Nullable String reason();
+
+    /**
+     * Return {@code true} If checkpoint already started but have not finished yet.
+     */
+    boolean inProgress();
+
     /**
      * Returns future which can be used for detection when current checkpoint reaches the specific state.
      */
     CompletableFuture<?> futureFor(CheckpointState state);
+
+    /**
+     * Returns number of dirty pages in current checkpoint. If checkpoint is not running, returns {@code 0}.
+     */
+    int currentCheckpointPagesCount();
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
new file mode 100644
index 000000000..f067ca3c8
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class representing the state of running/scheduled checkpoint.
+ */
+// TODO: IGNITE-16950 Check for a race between futureFor, transitTo and fail
+class CheckpointProgressImpl implements CheckpointProgress {
+    /** Checkpoint id. */
+    private final UUID id = UUID.randomUUID();
+
+    /** Scheduled time of checkpoint. */
+    private volatile long nextCheckpointNanos;
+
+    /** Current checkpoint state. */
+    private volatile AtomicReference<CheckpointState> state = new AtomicReference<>(SCHEDULED);
+
+    /** Future which would be finished when corresponds state is set. */
+    private final Map<CheckpointState, CompletableFuture<Void>> stateFutures = new ConcurrentHashMap<>();
+
+    /** Wakeup reason. */
+    private volatile String reason;
+
+    /** Number of dirty pages in current checkpoint at the beginning of checkpoint. */
+    private volatile int currCheckpointPagesCnt;
+
+    /** Cause of fail, which has happened during the checkpoint or {@code null} if checkpoint was successful. */
+    @Nullable
+    private volatile Throwable failCause;
+
+    /** Counter for written checkpoint pages. Not {@link null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger writtenPagesCntr;
+
+    /** Counter for fsynced checkpoint pages. Not {@link null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger syncedPagesCntr;
+
+    /** Counter for evicted checkpoint pages. Not {@link null} only if checkpoint is running. */
+    @Nullable
+    private volatile AtomicInteger evictedPagesCntr;
+
+    /**
+     * Constructor.
+     *
+     * @param delay Delay in nanos before next checkpoint is to be executed. Value is from {@code 0} to {@code 365} days.
+     */
+    CheckpointProgressImpl(long delay) {
+        nextCheckpointNanos(delay);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public @Nullable String reason() {
+        return reason;
+    }
+
+    /**
+     * Sets description of the reason of the current checkpoint.
+     *
+     * @param reason New wakeup reason.
+     */
+    public void reason(String reason) {
+        this.reason = reason;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean inProgress() {
+        return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> futureFor(CheckpointState state) {
+        CompletableFuture<Void> stateFut = stateFutures.computeIfAbsent(state, (k) -> new CompletableFuture<>());
+
+        if (greaterOrEqualTo(state)) {
+            completeFuture(stateFut, failCause);
+        }
+
+        return stateFut;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int currentCheckpointPagesCount() {
+        return currCheckpointPagesCnt;
+    }
+
+    /**
+     * Sets current checkpoint pages num to store.
+     *
+     * @param num Pages to store.
+     */
+    public void currentCheckpointPagesCount(int num) {
+        currCheckpointPagesCnt = num;
+    }
+
+    /**
+     * Returns counter for written checkpoint pages. Not {@code null} only if checkpoint is running.
+     */
+    public @Nullable AtomicInteger writtenPagesCounter() {
+        return writtenPagesCntr;
+    }
+
+    /**
+     * Returns counter for fsynced checkpoint pages. Not {@code null} only if checkpoint is running.
+     */
+    public @Nullable AtomicInteger syncedPagesCounter() {
+        return syncedPagesCntr;
+    }
+
+    /**
+     * Returns Counter for evicted pages during current checkpoint. Not {@code null} only if checkpoint is running.
+     */
+    public @Nullable AtomicInteger evictedPagesCounter() {
+        return evictedPagesCntr;
+    }
+
+    /**
+     * Returns scheduled time of checkpoint in nanos.
+     */
+    public long nextCheckpointNanos() {
+        return nextCheckpointNanos;
+    }
+
+    /**
+     * Sets new scheduled time of checkpoint in nanos.
+     *
+     * @param delay Delay in nanos before next checkpoint is to be executed. Value is from {@code 0} to {@code 365} days.
+     */
+    public void nextCheckpointNanos(long delay) {
+        assert delay >= 0 : delay;
+        assert delay <= TimeUnit.DAYS.toNanos(365) : delay;
+
+        this.nextCheckpointNanos = System.nanoTime() + delay;
+    }
+
+    /**
+     * Clear checkpoint progress counters.
+     */
+    public void clearCounters() {
+        currCheckpointPagesCnt = 0;
+
+        writtenPagesCntr = null;
+        syncedPagesCntr = null;
+        evictedPagesCntr = null;
+    }
+
+    /**
+     * Initialize all counters before checkpoint.
+     *
+     * @param pagesSize Number of dirty pages in current checkpoint at the beginning of checkpoint.
+     */
+    public void initCounters(int pagesSize) {
+        currCheckpointPagesCnt = pagesSize;
+
+        writtenPagesCntr = new AtomicInteger();
+        syncedPagesCntr = new AtomicInteger();
+        evictedPagesCntr = new AtomicInteger();
+    }
+
+    /**
+     * Changing checkpoint state if order of state is correct.
+     *
+     * @param newState New checkpoint state.
+     */
+    public void transitTo(CheckpointState newState) {
+        CheckpointState state = this.state.get();
+
+        if (state.ordinal() < newState.ordinal()) {
+            this.state.compareAndSet(state, newState);
+
+            doFinishFuturesWhichLessOrEqualTo(newState);
+        }
+    }
+
+    /**
+     * Mark this checkpoint execution as failed.
+     *
+     * @param error Causal error of fail.
+     */
+    public void fail(Throwable error) {
+        failCause = error;
+
+        transitTo(FINISHED);
+    }
+
+    /**
+     * Returns {@code true} if current state equal or greater to given state.
+     *
+     * @param expectedState Expected state.
+     */
+    public boolean greaterOrEqualTo(CheckpointState expectedState) {
+        return state.get().ordinal() >= expectedState.ordinal();
+    }
+
+    /**
+     * Returns current state.
+     */
+    CheckpointState state() {
+        return state.get();
+    }
+
+    /**
+     * Finishing futures with correct result in direct state order until lastState(included).
+     *
+     * @param lastState State until which futures should be done.
+     */
+    private void doFinishFuturesWhichLessOrEqualTo(CheckpointState lastState) {
+        for (CheckpointState old : CheckpointState.values()) {
+            completeFuture(stateFutures.get(old), failCause);
+
+            if (old == lastState) {
+                return;
+            }
+        }
+    }
+
+    private static void completeFuture(@Nullable CompletableFuture<?> future, @Nullable Throwable throwable) {
+        if (future != null && !future.isDone()) {
+            if (throwable != null) {
+                future.completeExceptionally(throwable);
+            } else {
+                future.complete(null);
+            }
+        }
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
index 339f9a28f..e83a542b5 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointReadWriteLock.java
@@ -30,7 +30,7 @@ public class CheckpointReadWriteLock {
      * Any thread with a such prefix is managed by the checkpoint. So some conditions can rely on it(ex. we don't need a checkpoint lock
      * there because checkpoint is already held write lock).
      */
-    // TODO: IGNITE-16898 I think it needs to be redone or relocated
+    // TODO: IGNITE-16935 I think it needs to be redone or relocated
     static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
 
     /** Checkpoint lock. */
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
index b246e7736..17b33fe75 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointState.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 /**
  * Possible checkpoint states. Ordinal is important. Every next state follows the previous one.
  */
-// TODO: IGNITE-16898 Review states
 public enum CheckpointState {
     /** Checkpoint is waiting to execution. **/
     SCHEDULED,
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
new file mode 100644
index 000000000..8e94086b2
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflow.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.lang.IgniteSystemProperties.getInteger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinWorkerThread;
+import java.util.concurrent.Future;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class responsibility is to complement {@link Checkpointer} class with side logic of checkpointing like checkpoint listeners
+ * notifications, collect dirt pages etc.
+ *
+ * <p>It allows {@link Checkpointer} class is to focus on its main responsibility: synchronizing memory with disk.
+ *
+ * <p>Additional actions needed during checkpoint are implemented in this class.
+ *
+ * <p>Two main blocks of logic this class is responsible for:
+ *
+ * <p>{@link CheckpointWorkflow#markCheckpointBegin} - Initialization of next checkpoint. It collects all required info.
+ *
+ * <p>{@link CheckpointWorkflow#markCheckpointEnd} - Finalization of last checkpoint.
+ */
+class CheckpointWorkflow implements IgniteComponent {
+    /**
+     * Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])} in case
+     * of {@link CheckpointWriteOrder#SEQUENTIAL}.
+     */
+    public static final String CHECKPOINT_PARALLEL_SORT_THRESHOLD = "CHECKPOINT_PARALLEL_SORT_THRESHOLD";
+
+    /**
+     * Starting from this number of dirty pages in checkpoint, array will be sorted with {@link Arrays#parallelSort(Comparable[])} in case
+     * of {@link CheckpointWriteOrder#SEQUENTIAL}.
+     */
+    // TODO: IGNITE-16935 Move to configuration
+    private final int parallelSortThreshold = getInteger(CHECKPOINT_PARALLEL_SORT_THRESHOLD, 512 * 1024);
+
+    /** This number of threads will be created and used for parallel sorting. */
+    private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
+
+    /** Checkpoint marker storage. */
+    private final CheckpointMarkersStorage checkpointMarkersStorage;
+
+    /** Checkpoint lock. */
+    private final CheckpointReadWriteLock checkpointReadWriteLock;
+
+    /** Persistent data regions for the checkpointing. */
+    private final Collection<PageMemoryDataRegion> dataRegions;
+
+    /** Checkpoint write order configuration. */
+    private final CheckpointWriteOrder checkpointWriteOrder;
+
+    /** Collections of checkpoint listeners. */
+    private final List<IgniteBiTuple<CheckpointListener, PageMemoryDataRegion>> listeners = new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param checkpointMarkersStorage Checkpoint marker storage.
+     * @param checkpointReadWriteLock Checkpoint read write lock.
+     * @param checkpointWriteOrder Checkpoint write order.
+     * @param dataRegions Persistent data regions for the checkpointing, doesn't copy.
+     */
+    public CheckpointWorkflow(
+            CheckpointMarkersStorage checkpointMarkersStorage,
+            CheckpointReadWriteLock checkpointReadWriteLock,
+            CheckpointWriteOrder checkpointWriteOrder,
+            Collection<PageMemoryDataRegion> dataRegions
+    ) {
+        this.checkpointMarkersStorage = checkpointMarkersStorage;
+        this.checkpointReadWriteLock = checkpointReadWriteLock;
+        this.checkpointWriteOrder = checkpointWriteOrder;
+        this.dataRegions = dataRegions;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() {
+        listeners.clear();
+    }
+
+    /**
+     * First stage of checkpoint which collects demanded information (dirty pages mostly).
+     *
+     * @param startCheckpointTimestamp Checkpoint start timestamp.
+     * @param curr Current checkpoint event info.
+     * @return Checkpoint collected info.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public Checkpoint markCheckpointBegin(
+            long startCheckpointTimestamp,
+            CheckpointProgressImpl curr
+    ) throws IgniteInternalCheckedException {
+        List<CheckpointListener> listeners = collectCheckpointListeners(dataRegions);
+
+        checkpointReadWriteLock.readLock();
+
+        try {
+            for (CheckpointListener listener : listeners) {
+                listener.beforeCheckpointBegin(curr);
+            }
+        } finally {
+            checkpointReadWriteLock.readUnlock();
+        }
+
+        checkpointReadWriteLock.writeLock();
+
+        CheckpointDirtyPagesInfoHolder dirtyPages;
+
+        try {
+            curr.transitTo(LOCK_TAKEN);
+
+            for (CheckpointListener listener : listeners) {
+                listener.onMarkCheckpointBegin(curr);
+            }
+
+            // There are allowable to replace pages only after checkpoint entry was stored to disk.
+            dirtyPages = beginCheckpoint(dataRegions, curr.futureFor(MARKER_STORED_TO_DISK));
+
+            curr.currentCheckpointPagesCount(dirtyPages.dirtyPageCount);
+
+            curr.transitTo(PAGE_SNAPSHOT_TAKEN);
+        } finally {
+            checkpointReadWriteLock.writeUnlock();
+        }
+
+        curr.transitTo(LOCK_RELEASED);
+
+        for (CheckpointListener listener : listeners) {
+            listener.onCheckpointBegin(curr);
+        }
+
+        if (dirtyPages.dirtyPageCount > 0) {
+            checkpointMarkersStorage.onCheckpointBegin(curr.id());
+
+            curr.transitTo(MARKER_STORED_TO_DISK);
+
+            return new Checkpoint(splitAndSortCpPagesIfNeeded(dirtyPages), curr);
+        }
+
+        return new Checkpoint(EMPTY, curr);
+    }
+
+    /**
+     * Do some actions on checkpoint finish (After all pages were written to disk).
+     *
+     * @param chp Checkpoint snapshot.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public void markCheckpointEnd(Checkpoint chp) throws IgniteInternalCheckedException {
+        synchronized (this) {
+            chp.progress.clearCounters();
+
+            for (PageMemoryDataRegion dataRegion : dataRegions) {
+                assert dataRegion.persistent() : dataRegion;
+
+                ((PageMemoryImpl) dataRegion.pageMemory()).finishCheckpoint();
+            }
+        }
+
+        checkpointMarkersStorage.onCheckpointEnd(chp.progress.id());
+
+        for (CheckpointListener listener : collectCheckpointListeners(dataRegions)) {
+            listener.afterCheckpointEnd(chp.progress);
+        }
+
+        chp.progress.transitTo(FINISHED);
+    }
+
+    /**
+     * Adds a listener to be called for the corresponding persistent data region.
+     *
+     * @param listener Listener.
+     * @param dataRegion Persistent data region for which listener is corresponded to, {@code null} for all regions.
+     */
+    public void addCheckpointListener(CheckpointListener listener, @Nullable PageMemoryDataRegion dataRegion) {
+        assert dataRegion == null || (dataRegion.persistent() && dataRegions.contains(dataRegion)) : dataRegion;
+
+        listeners.add(new IgniteBiTuple<>(listener, dataRegion));
+    }
+
+    /**
+     * Removes the listener.
+     *
+     * @param listener Listener.
+     */
+    public void removeCheckpointListener(CheckpointListener listener) {
+        listeners.remove(new IgniteBiTuple<CheckpointListener, PageMemoryDataRegion>() {
+            /** {@inheritDoc} */
+            @Override
+            public boolean equals(Object o) {
+                return listener == ((IgniteBiTuple<?, ?>) o).getKey();
+            }
+        });
+    }
+
+    /**
+     * Returns the checkpoint listeners for the data regions.
+     *
+     * @param dataRegions Data regions.
+     */
+    public List<CheckpointListener> collectCheckpointListeners(Collection<PageMemoryDataRegion> dataRegions) {
+        return listeners.stream()
+                .filter(tuple -> tuple.getValue() == null || dataRegions.contains(tuple.getValue()))
+                .map(IgniteBiTuple::getKey)
+                .collect(toUnmodifiableList());
+    }
+
+    private CheckpointDirtyPagesInfoHolder beginCheckpoint(
+            Collection<PageMemoryDataRegion> dataRegions,
+            CompletableFuture<?> allowToReplace
+    ) {
+        Collection<IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>>> pages = new ArrayList<>(dataRegions.size());
+
+        int pageCount = 0;
+
+        for (PageMemoryDataRegion dataRegion : dataRegions) {
+            assert dataRegion.persistent() : dataRegion;
+
+            Collection<FullPageId> dirtyPages = ((PageMemoryImpl) dataRegion.pageMemory()).beginCheckpoint(allowToReplace);
+
+            pageCount += dirtyPages.size();
+
+            pages.add(new IgniteBiTuple<>((PageMemoryImpl) dataRegion.pageMemory(), dirtyPages));
+        }
+
+        return new CheckpointDirtyPagesInfoHolder(pages, pageCount);
+    }
+
+    private static ForkJoinPool parallelSortInIsolatedPool(
+            FullPageId[] pagesArr,
+            Comparator<FullPageId> cmp,
+            @Nullable ForkJoinPool pool
+    ) throws IgniteInternalCheckedException {
+        ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool1 -> {
+            ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool1);
+
+            worker.setName("checkpoint-pages-sorter-" + worker.getPoolIndex());
+
+            return worker;
+        };
+
+        ForkJoinPool execPool = pool == null ? new ForkJoinPool(PARALLEL_SORT_THREADS + 1, factory, null, false) : pool;
+
+        Future<?> sortTask = execPool.submit(() -> Arrays.parallelSort(pagesArr, cmp));
+
+        try {
+            sortTask.get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new IgniteInternalCheckedException(
+                    "Failed to perform pages array parallel sort",
+                    e instanceof ExecutionException ? e.getCause() : e
+            );
+        }
+
+        return execPool;
+    }
+
+    private IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> splitAndSortCpPagesIfNeeded(
+            CheckpointDirtyPagesInfoHolder dirtyPages
+    ) throws IgniteInternalCheckedException {
+        Set<IgniteBiTuple<PageMemoryImpl, FullPageId[]>> cpPagesPerRegion = new HashSet<>();
+
+        int realPagesArrSize = 0;
+
+        for (IgniteBiTuple<PageMemoryImpl, Collection<FullPageId>> regPages : dirtyPages.dirtyPages) {
+            FullPageId[] pages = new FullPageId[regPages.getValue().size()];
+
+            int pagePos = 0;
+
+            for (FullPageId dirtyPage : regPages.getValue()) {
+                assert realPagesArrSize++ != dirtyPages.dirtyPageCount :
+                        "Incorrect estimated dirty pages number: " + dirtyPages.dirtyPageCount;
+
+                pages[pagePos++] = dirtyPage;
+            }
+
+            // Some pages may have been already replaced.
+            if (pagePos != pages.length) {
+                cpPagesPerRegion.add(new IgniteBiTuple<>(regPages.getKey(), Arrays.copyOf(pages, pagePos)));
+            } else {
+                cpPagesPerRegion.add(new IgniteBiTuple<>(regPages.getKey(), pages));
+            }
+        }
+
+        if (checkpointWriteOrder == CheckpointWriteOrder.SEQUENTIAL) {
+            Comparator<FullPageId> cmp = Comparator.comparingInt(FullPageId::groupId).thenComparingLong(FullPageId::effectivePageId);
+
+            ForkJoinPool pool = null;
+
+            for (IgniteBiTuple<PageMemoryImpl, FullPageId[]> pagesPerReg : cpPagesPerRegion) {
+                if (pagesPerReg.getValue().length >= parallelSortThreshold) {
+                    pool = parallelSortInIsolatedPool(pagesPerReg.get2(), cmp, pool);
+                } else {
+                    Arrays.sort(pagesPerReg.get2(), cmp);
+                }
+            }
+
+            if (pool != null) {
+                pool.shutdown();
+            }
+        }
+
+        return new IgniteConcurrentMultiPairQueue<>(cpPagesPerRegion);
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
similarity index 52%
copy from modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
copy to modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
index 94056b669..4b48aa2e4 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWriteOrder.java
@@ -20,21 +20,32 @@ package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Empty.
+ * This enum defines order of writing pages to disk storage during checkpoint.
  */
-// TODO: IGNITE-16898 Continue porting the code
-public abstract class Checkpointer {
+public enum CheckpointWriteOrder {
     /**
-     * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
-     *
-     * @param delayFromNow Delay from now in milliseconds.
-     * @param reason Wakeup reason.
-     * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
+     * Pages are written in order provided by checkpoint pages collection iterator (which is basically a hashtable).
+     */
+    RANDOM,
+
+    /**
+     * All checkpoint pages are collected into single list and sorted by page index. Provides almost sequential disk writes, which can be
+     * much faster on some SSD models.
      */
-    public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
+    SEQUENTIAL;
 
     /**
-     * Returns runner thread, {@code null} if the worker has not yet started executing.
+     * Enumerated values.
+     */
+    private static final CheckpointWriteOrder[] VALS = values();
+
+    /**
+     * Efficiently gets enumerated value from its ordinal.
+     *
+     * @param ord Ordinal value.
+     * @return Enumerated value or {@code null} if ordinal out of range.
      */
-    public abstract @Nullable Thread runner();
+    public static @Nullable CheckpointWriteOrder fromOrdinal(int ord) {
+        return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
+    }
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
index 94056b669..422feb5f4 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java
@@ -17,24 +17,12 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
-import org.jetbrains.annotations.Nullable;
-
 /**
  * Empty.
  */
-// TODO: IGNITE-16898 Continue porting the code
+// TODO: IGNITE-16935 Continue porting the code
 public abstract class Checkpointer {
-    /**
-     * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
-     *
-     * @param delayFromNow Delay from now in milliseconds.
-     * @param reason Wakeup reason.
-     * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
-     */
-    public abstract CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason);
+    public abstract Thread runner();
 
-    /**
-     * Returns runner thread, {@code null} if the worker has not yet started executing.
-     */
-    public abstract @Nullable Thread runner();
+    public abstract CheckpointProgress scheduleCheckpoint(long l, String s);
 }
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java
new file mode 100644
index 000000000..534c8ec07
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueue.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.function.Predicate.not;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Concurrent queue that wraps collection of {@code Pair<K, V[]>}.
+ *
+ * <p>The guarantee {@link #next} provided is sequentially emptify values per key array. i.e. input like:
+ *
+ * <br> p1 = new Pair<1, [1, 3, 5, 7]>
+ * <br> p2 = new Pair<2, [2, 3]>
+ * <br> p3 = new Pair<3, [200, 100]>
+ * <br> and further sequence of {@code poll} or {@code forEach} calls may produce output like:
+ * <br> [1, 1], [1, 3], [1, 5], [1, 7], [2, 2], [2, 3], [3, 200], [3, 100]
+ *
+ * @param <K> The type of key in input pair collection.
+ * @param <V> The type of value array.
+ */
+public class IgniteConcurrentMultiPairQueue<K, V> {
+    /** Empty pair queue. */
+    public static final IgniteConcurrentMultiPairQueue EMPTY = new IgniteConcurrentMultiPairQueue<>(Map.of());
+
+    /** Inner holder. */
+    private final V[][] vals;
+
+    /** Storage for every array length. */
+    private final int[] lenSeq;
+
+    /** Current absolute position. */
+    private final AtomicInteger pos = new AtomicInteger();
+
+    /** Precalculated max position. */
+    private final int maxPos;
+
+    /** Keys array. */
+    private final K[] keysArr;
+
+    /**
+     * Constructor.
+     *
+     * @param items Items.
+     */
+    public IgniteConcurrentMultiPairQueue(Map<K, ? extends Collection<V>> items) {
+        int pairCnt = (int) items.entrySet().stream().map(Map.Entry::getValue).filter(not(Collection::isEmpty)).count();
+
+        vals = (V[][]) new Object[pairCnt][];
+
+        keysArr = (K[]) new Object[pairCnt];
+
+        lenSeq = new int[pairCnt];
+
+        int keyPos = 0;
+
+        int size = -1;
+
+        for (Map.Entry<K, ? extends Collection<V>> p : items.entrySet()) {
+            if (p.getValue().isEmpty()) {
+                continue;
+            }
+
+            keysArr[keyPos] = p.getKey();
+
+            lenSeq[keyPos] = size += p.getValue().size();
+
+            vals[keyPos++] = (V[]) p.getValue().toArray();
+        }
+
+        maxPos = size + 1;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param items Items.
+     */
+    public IgniteConcurrentMultiPairQueue(Collection<IgniteBiTuple<K, V[]>> items) {
+        int pairCnt = (int) items.stream().map(Map.Entry::getValue).filter(k -> k.length > 0).count();
+
+        vals = (V[][]) new Object[pairCnt][];
+
+        keysArr = (K[]) new Object[pairCnt];
+
+        lenSeq = new int[pairCnt];
+
+        int keyPos = 0;
+
+        int size = -1;
+
+        for (Map.Entry<K, V[]> p : items) {
+            if (p.getValue().length == 0) {
+                continue;
+            }
+
+            keysArr[keyPos] = p.getKey();
+
+            lenSeq[keyPos] = size += p.getValue().length;
+
+            vals[keyPos++] = p.getValue();
+        }
+
+        maxPos = size + 1;
+    }
+
+    /**
+     * Retrieves and removes the head of this queue, or returns {@code false} if this queue is empty.
+     *
+     * @param res State holder.
+     * @return {@code true} if not empty result, or {@code false} if this queue is empty.
+     */
+    public boolean next(Result<K, V> res) {
+        int absPos = pos.getAndIncrement();
+
+        if (absPos >= maxPos) {
+            res.set(null, null, 0);
+
+            return false;
+        }
+
+        int segment = res.getSegment();
+
+        if (absPos > lenSeq[segment]) {
+            segment = Arrays.binarySearch(lenSeq, segment, lenSeq.length - 1, absPos);
+
+            segment = segment < 0 ? -segment - 1 : segment;
+        }
+
+        int relPos = segment == 0 ? absPos : (absPos - lenSeq[segment - 1] - 1);
+
+        K key = keysArr[segment];
+
+        res.set(key, vals[segment][relPos], segment);
+
+        return true;
+    }
+
+    /**
+     * Returns {@code true} if empty.
+     */
+    public boolean isEmpty() {
+        return pos.get() >= maxPos;
+    }
+
+    /**
+     * Returns Constant initialisation size.
+     */
+    public int initialSize() {
+        return maxPos;
+    }
+
+    /**
+     * State holder.
+     */
+    public static class Result<K, V> {
+        /** Current segment. */
+        private int segment;
+
+        /** Key holder. */
+        private K key;
+
+        /** Value holder. */
+        private V val;
+
+        /**
+         * Current state setter.
+         *
+         * @param k Key.
+         * @param v Value.
+         * @param seg Segment.
+         */
+        public void set(K k, V v, int seg) {
+            key = k;
+            val = v;
+            segment = seg;
+        }
+
+        /**
+         * Returns current segment.
+         */
+        private int getSegment() {
+            return segment;
+        }
+
+        /**
+         * Returns current key.
+         */
+        public K getKey() {
+            return key;
+        }
+
+        /**
+         * Returns current value.
+         */
+        public V getValue() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public String toString() {
+            return S.toString(Result.class, this);
+        }
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
index 35ee3233d..cab9e42ae 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PageMemoryImplNoLoadTest.java
@@ -81,7 +81,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
 
             assertThat(memory.dirtyPages(), equalTo(dirtyPages));
 
-            // TODO: IGNITE-16898 After the checkpoint check that there are no dirty pages
+            // TODO: IGNITE-16935 After the checkpoint check that there are no dirty pages
         } finally {
             memory.stop(true);
         }
@@ -118,7 +118,7 @@ public class PageMemoryImplNoLoadTest extends PageMemoryNoLoadSelfTest {
                 assertFalse(memory.safeToUpdate(), "i=" + i);
             }
 
-            // TODO: IGNITE-16898 After the checkpoint check assertTrue(memory.safeToUpdate())
+            // TODO: IGNITE-16935 After the checkpoint check assertTrue(memory.safeToUpdate())
         } finally {
             memory.stop(true);
         }
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorageTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorageTest.java
new file mode 100644
index 000000000..1d4d8788a
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointMarkersStorageTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.nio.file.Files.createDirectories;
+import static java.nio.file.Files.createFile;
+import static java.nio.file.Files.list;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.util.IgniteUtils.deleteIfExists;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.FileWriter;
+import java.nio.file.Path;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * For {@link CheckpointMarkersStorage} testing.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(SystemPropertiesExtension.class)
+public class CheckpointMarkersStorageTest {
+    @WorkDirectory
+    private Path workDir;
+
+    @Test
+    void testFailCreateCheckpointDir() throws Exception {
+        Path testFile = createFile(workDir.resolve("testFile"));
+
+        try (FileWriter fileWriter = new FileWriter(testFile.toFile())) {
+            fileWriter.write("testString");
+
+            fileWriter.flush();
+        }
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> new CheckpointMarkersStorage(testFile)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Could not create directory for checkpoint metadata"));
+    }
+
+    @Test
+    void testNotOnlyMarkersFiles() throws Exception {
+        createFile(createDirectories(cpDir()).resolve("test"));
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> new CheckpointMarkersStorage(workDir)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Not checkpoint markers found, they need to be removed manually"));
+    }
+
+    @Test
+    void testTmpMarkersFiles() throws Exception {
+        createDirectories(cpDir());
+
+        UUID id = UUID.randomUUID();
+
+        createFile(cpDir().resolve(startMarkerFileName(id) + ".tmp"));
+        createFile(cpDir().resolve(endMarkerFileName(id) + ".tmp"));
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> new CheckpointMarkersStorage(workDir)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Not checkpoint markers found, they need to be removed manually"));
+    }
+
+    @Test
+    void testCheckpointWithoutEndMarker() throws Exception {
+        createDirectories(cpDir());
+
+        createFile(startMarkerFilePath(UUID.randomUUID()));
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> new CheckpointMarkersStorage(workDir)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Found incomplete checkpoints"));
+    }
+
+    @Test
+    void testCreateMarkers() throws Exception {
+        UUID id0 = UUID.randomUUID();
+
+        CheckpointMarkersStorage markersStorage = new CheckpointMarkersStorage(workDir);
+
+        markersStorage.onCheckpointBegin(id0);
+        markersStorage.onCheckpointEnd(id0);
+
+        assertThat(
+                list(cpDir()).collect(toSet()),
+                equalTo(Set.of(startMarkerFilePath(id0), endMarkerFilePath(id0)))
+        );
+
+        deleteIfExists(cpDir());
+
+        IgniteInternalCheckedException exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> markersStorage.onCheckpointBegin(UUID.randomUUID())
+        );
+
+        assertThat(exception.getMessage(), startsWith("Could not create start checkpoint marker"));
+
+        exception = assertThrows(
+                IgniteInternalCheckedException.class,
+                () -> markersStorage.onCheckpointEnd(id0)
+        );
+
+        assertThat(exception.getMessage(), startsWith("Could not create end checkpoint marker"));
+    }
+
+    @Test
+    void testCleanupMarkers() throws Exception {
+        CheckpointMarkersStorage markersStorage = new CheckpointMarkersStorage(workDir);
+
+        UUID id0 = UUID.randomUUID();
+        UUID id1 = UUID.randomUUID();
+        UUID id2 = UUID.randomUUID();
+
+        markersStorage.onCheckpointBegin(id0);
+        markersStorage.onCheckpointEnd(id0);
+
+        markersStorage.onCheckpointBegin(id1);
+        markersStorage.onCheckpointEnd(id1);
+
+        markersStorage.onCheckpointBegin(id2);
+        markersStorage.onCheckpointEnd(id2);
+
+        assertThat(
+                list(cpDir()).collect(toSet()),
+                equalTo(Set.of(startMarkerFilePath(id2), endMarkerFilePath(id2)))
+        );
+    }
+
+    private Path cpDir() {
+        return workDir.resolve("cp");
+    }
+
+    private Path startMarkerFilePath(UUID id) {
+        return cpDir().resolve(startMarkerFileName(id));
+    }
+
+    private Path endMarkerFilePath(UUID id) {
+        return cpDir().resolve(endMarkerFileName(id));
+    }
+
+    private static String startMarkerFileName(UUID id) {
+        return id + "-START.bin";
+    }
+
+    private static String endMarkerFileName(UUID id) {
+        return id + "-END.bin";
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
new file mode 100644
index 000000000..5a457f715
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImplTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.lang.System.nanoTime;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.SCHEDULED;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link CheckpointProgressImpl} testing.
+ */
+public class CheckpointProgressImplTest {
+    @Test
+    void testId() {
+        assertNotNull(new CheckpointProgressImpl(0).id());
+    }
+
+    @Test
+    void testReason() {
+        CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
+
+        assertNull(progressImpl.reason());
+
+        String reason = "test";
+
+        progressImpl.reason(reason);
+
+        assertEquals(reason, progressImpl.reason());
+    }
+
+    @Test
+    void testNextCheckpointNanos() {
+        long startNanos = nanoTime();
+
+        CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(10);
+
+        long endNanos = nanoTime();
+
+        assertThat(
+                progressImpl.nextCheckpointNanos() - startNanos,
+                greaterThanOrEqualTo(0L)
+        );
+
+        assertThat(
+                endNanos + 10 - progressImpl.nextCheckpointNanos(),
+                greaterThanOrEqualTo(0L)
+        );
+    }
+
+    @Test
+    void testCounters() {
+        CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
+
+        assertEquals(0, progressImpl.currentCheckpointPagesCount());
+
+        assertNull(progressImpl.writtenPagesCounter());
+        assertNull(progressImpl.syncedPagesCounter());
+        assertNull(progressImpl.evictedPagesCounter());
+
+        progressImpl.initCounters(100500);
+
+        assertEquals(100500, progressImpl.currentCheckpointPagesCount());
+
+        assertNotNull(progressImpl.writtenPagesCounter());
+        assertNotNull(progressImpl.syncedPagesCounter());
+        assertNotNull(progressImpl.evictedPagesCounter());
+
+        progressImpl.clearCounters();
+
+        assertEquals(0, progressImpl.currentCheckpointPagesCount());
+
+        assertNull(progressImpl.writtenPagesCounter());
+        assertNull(progressImpl.syncedPagesCounter());
+        assertNull(progressImpl.evictedPagesCounter());
+
+        progressImpl.currentCheckpointPagesCount(42);
+
+        assertEquals(42, progressImpl.currentCheckpointPagesCount());
+    }
+
+    @Test
+    void testGreaterOrEqualTo() {
+        CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
+
+        assertTrue(progressImpl.greaterOrEqualTo(SCHEDULED));
+        assertFalse(progressImpl.greaterOrEqualTo(LOCK_TAKEN));
+        assertFalse(progressImpl.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+        assertFalse(progressImpl.greaterOrEqualTo(LOCK_RELEASED));
+        assertFalse(progressImpl.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+        assertFalse(progressImpl.greaterOrEqualTo(FINISHED));
+
+        progressImpl.transitTo(LOCK_TAKEN);
+
+        assertTrue(progressImpl.greaterOrEqualTo(SCHEDULED));
+        assertTrue(progressImpl.greaterOrEqualTo(LOCK_TAKEN));
+        assertFalse(progressImpl.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+        assertFalse(progressImpl.greaterOrEqualTo(LOCK_RELEASED));
+        assertFalse(progressImpl.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+        assertFalse(progressImpl.greaterOrEqualTo(FINISHED));
+
+        progressImpl.transitTo(PAGE_SNAPSHOT_TAKEN);
+
+        assertTrue(progressImpl.greaterOrEqualTo(SCHEDULED));
+        assertTrue(progressImpl.greaterOrEqualTo(LOCK_TAKEN));
+        assertTrue(progressImpl.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+        assertFalse(progressImpl.greaterOrEqualTo(LOCK_RELEASED));
+        assertFalse(progressImpl.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+        assertFalse(progressImpl.greaterOrEqualTo(FINISHED));
+
+        progressImpl.transitTo(LOCK_RELEASED);
+
+        assertTrue(progressImpl.greaterOrEqualTo(SCHEDULED));
+        assertTrue(progressImpl.greaterOrEqualTo(LOCK_TAKEN));
+        assertTrue(progressImpl.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+        assertTrue(progressImpl.greaterOrEqualTo(LOCK_RELEASED));
+        assertFalse(progressImpl.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+        assertFalse(progressImpl.greaterOrEqualTo(FINISHED));
+
+        progressImpl.transitTo(MARKER_STORED_TO_DISK);
+
+        assertTrue(progressImpl.greaterOrEqualTo(SCHEDULED));
+        assertTrue(progressImpl.greaterOrEqualTo(LOCK_TAKEN));
+        assertTrue(progressImpl.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+        assertTrue(progressImpl.greaterOrEqualTo(LOCK_RELEASED));
+        assertTrue(progressImpl.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+        assertFalse(progressImpl.greaterOrEqualTo(FINISHED));
+
+        progressImpl.transitTo(FINISHED);
+
+        assertTrue(progressImpl.greaterOrEqualTo(SCHEDULED));
+        assertTrue(progressImpl.greaterOrEqualTo(LOCK_TAKEN));
+        assertTrue(progressImpl.greaterOrEqualTo(PAGE_SNAPSHOT_TAKEN));
+        assertTrue(progressImpl.greaterOrEqualTo(LOCK_RELEASED));
+        assertTrue(progressImpl.greaterOrEqualTo(MARKER_STORED_TO_DISK));
+        assertTrue(progressImpl.greaterOrEqualTo(FINISHED));
+    }
+
+    @Test
+    void testInProgress() {
+        CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
+
+        assertFalse(progressImpl.inProgress());
+
+        progressImpl.transitTo(LOCK_TAKEN);
+        assertFalse(progressImpl.inProgress());
+
+        progressImpl.transitTo(LOCK_RELEASED);
+        assertTrue(progressImpl.inProgress());
+
+        progressImpl.transitTo(MARKER_STORED_TO_DISK);
+        assertTrue(progressImpl.inProgress());
+
+        progressImpl.transitTo(FINISHED);
+        assertFalse(progressImpl.inProgress());
+    }
+
+    @Test
+    void testFail() {
+        CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
+
+        CompletableFuture<?> future = progressImpl.futureFor(PAGE_SNAPSHOT_TAKEN);
+
+        Exception failReason = new Exception("test");
+
+        progressImpl.fail(failReason);
+
+        assertEquals(FINISHED, progressImpl.state());
+
+        ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get(100, TimeUnit.MILLISECONDS));
+
+        assertSame(failReason, exception.getCause());
+    }
+
+    @Test
+    void testFutureFor() {
+        CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
+
+        CompletableFuture<?> future0 = progressImpl.futureFor(SCHEDULED);
+
+        assertNotNull(future0);
+        assertTrue(future0.isDone());
+        assertSame(future0, progressImpl.futureFor(SCHEDULED));
+
+        CompletableFuture<?> future1 = progressImpl.futureFor(LOCK_TAKEN);
+
+        assertNotNull(future1);
+        assertFalse(future1.isDone());
+        assertSame(future1, progressImpl.futureFor(LOCK_TAKEN));
+
+        CompletableFuture<?> future2 = progressImpl.futureFor(PAGE_SNAPSHOT_TAKEN);
+
+        assertNotNull(future2);
+        assertFalse(future2.isDone());
+        assertSame(future2, progressImpl.futureFor(PAGE_SNAPSHOT_TAKEN));
+
+        CompletableFuture<?> future3 = progressImpl.futureFor(LOCK_RELEASED);
+
+        assertNotNull(future3);
+        assertFalse(future3.isDone());
+        assertSame(future3, progressImpl.futureFor(LOCK_RELEASED));
+
+        CompletableFuture<?> future4 = progressImpl.futureFor(MARKER_STORED_TO_DISK);
+
+        assertNotNull(future4);
+        assertFalse(future4.isDone());
+        assertSame(future4, progressImpl.futureFor(MARKER_STORED_TO_DISK));
+
+        CompletableFuture<?> future5 = progressImpl.futureFor(FINISHED);
+
+        assertNotNull(future5);
+        assertFalse(future5.isDone());
+        assertSame(future5, progressImpl.futureFor(FINISHED));
+
+        progressImpl.transitTo(LOCK_RELEASED);
+
+        assertDoesNotThrow(() -> future0.get(100, TimeUnit.MILLISECONDS));
+        assertDoesNotThrow(() -> future1.get(100, TimeUnit.MILLISECONDS));
+        assertDoesNotThrow(() -> future2.get(100, TimeUnit.MILLISECONDS));
+        assertDoesNotThrow(() -> future3.get(100, TimeUnit.MILLISECONDS));
+
+        assertFalse(future4.isDone());
+        assertFalse(future5.isDone());
+
+        assertSame(future0, progressImpl.futureFor(SCHEDULED));
+        assertSame(future1, progressImpl.futureFor(LOCK_TAKEN));
+        assertSame(future2, progressImpl.futureFor(PAGE_SNAPSHOT_TAKEN));
+        assertSame(future3, progressImpl.futureFor(LOCK_RELEASED));
+        assertSame(future4, progressImpl.futureFor(MARKER_STORED_TO_DISK));
+        assertSame(future5, progressImpl.futureFor(FINISHED));
+    }
+
+    @Test
+    void testTransitTo() {
+        CheckpointProgressImpl progressImpl = new CheckpointProgressImpl(0);
+
+        assertEquals(SCHEDULED, progressImpl.state());
+
+        // Transit to LOCK_TAKEN.
+
+        progressImpl.transitTo(LOCK_TAKEN);
+        assertEquals(LOCK_TAKEN, progressImpl.state());
+
+        progressImpl.transitTo(SCHEDULED);
+        assertEquals(LOCK_TAKEN, progressImpl.state());
+
+        // Transit to PAGE_SNAPSHOT_TAKEN.
+
+        progressImpl.transitTo(PAGE_SNAPSHOT_TAKEN);
+        assertEquals(PAGE_SNAPSHOT_TAKEN, progressImpl.state());
+
+        progressImpl.transitTo(LOCK_TAKEN);
+        assertEquals(PAGE_SNAPSHOT_TAKEN, progressImpl.state());
+
+        progressImpl.transitTo(SCHEDULED);
+        assertEquals(PAGE_SNAPSHOT_TAKEN, progressImpl.state());
+
+        // Transit to LOCK_RELEASED.
+
+        progressImpl.transitTo(LOCK_RELEASED);
+        assertEquals(LOCK_RELEASED, progressImpl.state());
+
+        progressImpl.transitTo(PAGE_SNAPSHOT_TAKEN);
+        assertEquals(LOCK_RELEASED, progressImpl.state());
+
+        progressImpl.transitTo(LOCK_TAKEN);
+        assertEquals(LOCK_RELEASED, progressImpl.state());
+
+        progressImpl.transitTo(SCHEDULED);
+        assertEquals(LOCK_RELEASED, progressImpl.state());
+
+        // Transit to LOCK_RELEASED.
+
+        progressImpl.transitTo(MARKER_STORED_TO_DISK);
+        assertEquals(MARKER_STORED_TO_DISK, progressImpl.state());
+
+        progressImpl.transitTo(LOCK_RELEASED);
+        assertEquals(MARKER_STORED_TO_DISK, progressImpl.state());
+
+        progressImpl.transitTo(PAGE_SNAPSHOT_TAKEN);
+        assertEquals(MARKER_STORED_TO_DISK, progressImpl.state());
+
+        progressImpl.transitTo(LOCK_TAKEN);
+        assertEquals(MARKER_STORED_TO_DISK, progressImpl.state());
+
+        progressImpl.transitTo(SCHEDULED);
+        assertEquals(MARKER_STORED_TO_DISK, progressImpl.state());
+
+        // Transit to FINISHED.
+
+        progressImpl.transitTo(FINISHED);
+        assertEquals(FINISHED, progressImpl.state());
+
+        progressImpl.transitTo(MARKER_STORED_TO_DISK);
+        assertEquals(FINISHED, progressImpl.state());
+
+        progressImpl.transitTo(LOCK_RELEASED);
+        assertEquals(FINISHED, progressImpl.state());
+
+        progressImpl.transitTo(PAGE_SNAPSHOT_TAKEN);
+        assertEquals(FINISHED, progressImpl.state());
+
+        progressImpl.transitTo(LOCK_TAKEN);
+        assertEquals(FINISHED, progressImpl.state());
+
+        progressImpl.transitTo(SCHEDULED);
+        assertEquals(FINISHED, progressImpl.state());
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
new file mode 100644
index 000000000..df5ba882a
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link Checkpoint} testing.
+ */
+public class CheckpointTest {
+    @Test
+    void testHasDelta() {
+        CheckpointProgressImpl progress = mock(CheckpointProgressImpl.class);
+
+        assertFalse(new Checkpoint(EMPTY, progress).hasDelta());
+
+        IgniteBiTuple<PageMemoryImpl, FullPageId[]> biTuple = new IgniteBiTuple<>(
+                mock(PageMemoryImpl.class),
+                new FullPageId[]{new FullPageId(0, 1)}
+        );
+
+        assertTrue(new Checkpoint(new IgniteConcurrentMultiPairQueue<>(List.of(biTuple)), progress).hasDelta());
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
new file mode 100644
index 000000000..1459eb2ea
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTestUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Useful class for testing a checkpoint.
+ */
+class CheckpointTestUtils {
+    /**
+     * Returns new instance of {@link CheckpointReadWriteLock}.
+     *
+     * @param log Logger.
+     */
+    static CheckpointReadWriteLock newReadWriteLock(IgniteLogger log) {
+        return new CheckpointReadWriteLock(new ReentrantReadWriteLockWithTracking(log, 5_000));
+    }
+
+    /**
+     * Returns mocked instance of {@link PageMemoryDataRegion}.
+     *
+     * @param persistent Persistent data region.
+     * @param pageMemory Persistent page memory.
+     */
+    static PageMemoryDataRegion newDataRegion(boolean persistent, PageMemoryImpl pageMemory) {
+        PageMemoryDataRegion mock = mock(PageMemoryDataRegion.class);
+
+        when(mock.persistent()).thenReturn(persistent);
+
+        when(mock.pageMemory()).thenReturn(pageMemory);
+
+        return mock;
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
new file mode 100644
index 000000000..a28b4c6a3
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointWorkflowTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Comparator.comparing;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.MARKER_STORED_TO_DISK;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGE_SNAPSHOT_TAKEN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.newDataRegion;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.newReadWriteLock;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.AFTER_CHECKPOINT_END;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.BEFORE_CHECKPOINT_BEGIN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_CHECKPOINT_BEGIN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWorkflowTest.TestCheckpointListener.ON_MARK_CHECKPOINT_BEGIN;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWriteOrder.RANDOM;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointWriteOrder.SEQUENTIAL;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageMemoryDataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.Result;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * For {@link CheckpointWorkflow} testing.
+ */
+public class CheckpointWorkflowTest {
+    private final IgniteLogger log = IgniteLogger.forClass(CheckpointWorkflowTest.class);
+
+    @Nullable
+    private CheckpointWorkflow workflow;
+
+    @AfterEach
+    void tearDown() {
+        if (workflow != null) {
+            workflow.stop();
+        }
+    }
+
+    @Test
+    void testListeners() {
+        PageMemoryDataRegion dataRegion0 = newDataRegion(true, mock(PageMemoryImpl.class));
+        PageMemoryDataRegion dataRegion1 = newDataRegion(true, mock(PageMemoryImpl.class));
+        PageMemoryDataRegion dataRegion2 = newDataRegion(true, mock(PageMemoryImpl.class));
+
+        workflow = new CheckpointWorkflow(
+                mock(CheckpointMarkersStorage.class),
+                newReadWriteLock(log),
+                RANDOM,
+                List.of(dataRegion0, dataRegion1)
+        );
+
+        workflow.start();
+
+        CheckpointListener listener0 = mock(CheckpointListener.class);
+        CheckpointListener listener1 = mock(CheckpointListener.class);
+        CheckpointListener listener2 = mock(CheckpointListener.class);
+        CheckpointListener listener3 = mock(CheckpointListener.class);
+        CheckpointListener listener4 = mock(CheckpointListener.class);
+
+        workflow.addCheckpointListener(listener0, dataRegion0);
+        workflow.addCheckpointListener(listener1, dataRegion0);
+        workflow.addCheckpointListener(listener2, dataRegion0);
+        workflow.addCheckpointListener(listener3, dataRegion1);
+        workflow.addCheckpointListener(listener4, null);
+
+        assertThat(
+                Set.copyOf(workflow.collectCheckpointListeners(List.of(dataRegion0))),
+                equalTo(Set.of(listener0, listener1, listener2, listener4))
+        );
+
+        assertThat(
+                Set.copyOf(workflow.collectCheckpointListeners(List.of(dataRegion1))),
+                equalTo(Set.of(listener3, listener4))
+        );
+
+        assertThat(
+                Set.copyOf(workflow.collectCheckpointListeners(List.of(dataRegion2))),
+                equalTo(Set.of(listener4))
+        );
+
+        assertThat(
+                Set.copyOf(workflow.collectCheckpointListeners(List.of(dataRegion0, dataRegion1))),
+                equalTo(Set.of(listener0, listener1, listener2, listener3, listener4))
+        );
+
+        // Checks remove listener.
+
+        workflow.removeCheckpointListener(listener0);
+        workflow.removeCheckpointListener(listener1);
+        workflow.removeCheckpointListener(listener3);
+
+        assertThat(
+                Set.copyOf(workflow.collectCheckpointListeners(List.of(dataRegion0))),
+                equalTo(Set.of(listener2, listener4))
+        );
+
+        assertThat(
+                Set.copyOf(workflow.collectCheckpointListeners(List.of(dataRegion1))),
+                equalTo(Set.of(listener4))
+        );
+
+        assertThat(
+                Set.copyOf(workflow.collectCheckpointListeners(List.of(dataRegion2))),
+                equalTo(Set.of(listener4))
+        );
+
+        // Checks empty listeners after stop.
+
+        workflow.stop();
+
+        assertThat(workflow.collectCheckpointListeners(List.of(dataRegion0)), empty());
+        assertThat(workflow.collectCheckpointListeners(List.of(dataRegion1)), empty());
+        assertThat(workflow.collectCheckpointListeners(List.of(dataRegion2)), empty());
+    }
+
+    @Test
+    void testMarkCheckpointBegin() throws Exception {
+        CheckpointMarkersStorage markersStorage = mock(CheckpointMarkersStorage.class);
+
+        CheckpointReadWriteLock readWriteLock = newReadWriteLock(log);
+
+        List<FullPageId> dirtyPages = List.of(new FullPageId(0, 0), new FullPageId(1, 0), new FullPageId(2, 0));
+
+        PageMemoryDataRegion dataRegion = newDataRegion(true, newPageMemoryImpl(dirtyPages));
+
+        workflow = new CheckpointWorkflow(markersStorage, readWriteLock, RANDOM, List.of(dataRegion));
+
+        workflow.start();
+
+        CheckpointProgressImpl progressImpl = mock(CheckpointProgressImpl.class);
+
+        ArgumentCaptor<CheckpointState> checkpointStateArgumentCaptor = ArgumentCaptor.forClass(CheckpointState.class);
+
+        ArgumentCaptor<Integer> pagesCountArgumentCaptor = ArgumentCaptor.forClass(Integer.class);
+
+        doNothing().when(progressImpl).transitTo(checkpointStateArgumentCaptor.capture());
+
+        doNothing().when(progressImpl).currentCheckpointPagesCount(pagesCountArgumentCaptor.capture());
+
+        when(progressImpl.futureFor(MARKER_STORED_TO_DISK)).thenReturn(completedFuture(null));
+
+        UUID checkpointId = UUID.randomUUID();
+
+        when(progressImpl.id()).thenReturn(checkpointId);
+
+        List<String> events = new ArrayList<>();
+
+        workflow.addCheckpointListener(new TestCheckpointListener(events) {
+            /** {@inheritDoc} */
+            @Override
+            public void beforeCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+                super.beforeCheckpointBegin(progress);
+
+                assertSame(progressImpl, progress);
+
+                assertEquals(readWriteLock.getReadHoldCount(), 1);
+
+                assertThat(checkpointStateArgumentCaptor.getAllValues(), empty());
+
+                verify(markersStorage, times(0)).onCheckpointBegin(checkpointId);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void onMarkCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+                super.onMarkCheckpointBegin(progress);
+
+                assertSame(progressImpl, progress);
+
+                assertTrue(readWriteLock.isWriteLockHeldByCurrentThread());
+
+                assertThat(checkpointStateArgumentCaptor.getAllValues(), equalTo(List.of(LOCK_TAKEN)));
+
+                verify(markersStorage, times(0)).onCheckpointBegin(checkpointId);
+            }
+
+            /** {@inheritDoc} */
+            @Override
+            public void onCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+                super.onCheckpointBegin(progress);
+
+                assertSame(progressImpl, progress);
+
+                assertEquals(0, readWriteLock.getReadHoldCount());
+
+                assertFalse(readWriteLock.isWriteLockHeldByCurrentThread());
+
+                assertThat(checkpointStateArgumentCaptor.getAllValues(), equalTo(List.of(LOCK_TAKEN, PAGE_SNAPSHOT_TAKEN, LOCK_RELEASED)));
+
+                assertThat(pagesCountArgumentCaptor.getAllValues(), equalTo(List.of(3)));
+
+                verify(markersStorage, times(0)).onCheckpointBegin(checkpointId);
+            }
+        }, dataRegion);
+
+        Checkpoint checkpoint = workflow.markCheckpointBegin(coarseCurrentTimeMillis(), progressImpl);
+
+        List<IgniteBiTuple<PageMemoryImpl, FullPageId>> pairs = collect(checkpoint.dirtyPages);
+
+        assertThat(
+                pairs.stream().map(IgniteBiTuple::getKey).collect(toSet()),
+                equalTo(Set.of(dataRegion.pageMemory()))
+        );
+
+        assertThat(
+                pairs.stream().map(IgniteBiTuple::getValue).collect(toList()),
+                equalTo(dirtyPages)
+        );
+
+        assertThat(
+                events,
+                equalTo(List.of(BEFORE_CHECKPOINT_BEGIN, ON_MARK_CHECKPOINT_BEGIN, ON_CHECKPOINT_BEGIN))
+        );
+
+        assertThat(
+                checkpointStateArgumentCaptor.getAllValues(),
+                equalTo(List.of(LOCK_TAKEN, PAGE_SNAPSHOT_TAKEN, LOCK_RELEASED, MARKER_STORED_TO_DISK))
+        );
+
+        verify(markersStorage, times(1)).onCheckpointBegin(checkpointId);
+    }
+
+    @Test
+    void testMarkCheckpointBeginRandom() throws Exception {
+        List<FullPageId> dirtyPages = List.of(new FullPageId(1, 0), new FullPageId(0, 0), new FullPageId(2, 0));
+
+        PageMemoryDataRegion dataRegion = newDataRegion(true, newPageMemoryImpl(dirtyPages));
+
+        workflow = new CheckpointWorkflow(
+                mock(CheckpointMarkersStorage.class),
+                newReadWriteLock(log),
+                RANDOM,
+                List.of(dataRegion)
+        );
+
+        workflow.start();
+
+        CheckpointProgressImpl progressImpl = mock(CheckpointProgressImpl.class);
+
+        when(progressImpl.futureFor(MARKER_STORED_TO_DISK)).thenReturn(completedFuture(null));
+
+        Checkpoint checkpoint = workflow.markCheckpointBegin(coarseCurrentTimeMillis(), progressImpl);
+
+        assertThat(
+                collect(checkpoint.dirtyPages).stream().map(IgniteBiTuple::getValue).collect(toList()),
+                equalTo(dirtyPages)
+        );
+    }
+
+    @Test
+    void testMarkCheckpointBeginSequential() throws Exception {
+        List<FullPageId> dirtyPages = List.of(new FullPageId(1, 0), new FullPageId(0, 0), new FullPageId(2, 0));
+
+        PageMemoryDataRegion dataRegion = newDataRegion(true, newPageMemoryImpl(dirtyPages));
+
+        workflow = new CheckpointWorkflow(
+                mock(CheckpointMarkersStorage.class),
+                newReadWriteLock(log),
+                SEQUENTIAL,
+                List.of(dataRegion)
+        );
+
+        CheckpointProgressImpl progressImpl = mock(CheckpointProgressImpl.class);
+
+        when(progressImpl.futureFor(MARKER_STORED_TO_DISK)).thenReturn(completedFuture(null));
+
+        Checkpoint checkpoint = workflow.markCheckpointBegin(coarseCurrentTimeMillis(), progressImpl);
+
+        assertThat(
+                collect(checkpoint.dirtyPages).stream().map(IgniteBiTuple::getValue).collect(toList()),
+                equalTo(dirtyPages.stream().sorted(comparing(FullPageId::effectivePageId)).collect(toList()))
+        );
+    }
+
+    @Test
+    void testMarkCheckpointEnd() throws Exception {
+        CheckpointMarkersStorage markersStorage = mock(CheckpointMarkersStorage.class);
+
+        CheckpointReadWriteLock readWriteLock = newReadWriteLock(log);
+
+        PageMemoryImpl pageMemory = mock(PageMemoryImpl.class);
+
+        PageMemoryDataRegion dataRegion = newDataRegion(true, pageMemory);
+
+        workflow = new CheckpointWorkflow(markersStorage, readWriteLock, RANDOM, List.of(dataRegion));
+
+        workflow.start();
+
+        List<String> events = new ArrayList<>();
+
+        ArgumentCaptor<CheckpointState> checkpointStateArgumentCaptor = ArgumentCaptor.forClass(CheckpointState.class);
+
+        CheckpointProgressImpl progressImpl = mock(CheckpointProgressImpl.class);
+
+        doNothing().when(progressImpl).transitTo(checkpointStateArgumentCaptor.capture());
+
+        UUID checkpointId = UUID.randomUUID();
+
+        when(progressImpl.id()).thenReturn(checkpointId);
+
+        workflow.addCheckpointListener(new TestCheckpointListener(events) {
+            /** {@inheritDoc} */
+            @Override
+            public void afterCheckpointEnd(CheckpointProgress progress) throws IgniteInternalCheckedException {
+                super.afterCheckpointEnd(progress);
+
+                assertSame(progressImpl, progress);
+
+                assertFalse(readWriteLock.isWriteLockHeldByCurrentThread());
+
+                assertEquals(0, readWriteLock.getReadHoldCount());
+
+                assertThat(checkpointStateArgumentCaptor.getAllValues(), empty());
+
+                verify(markersStorage, times(1)).onCheckpointEnd(checkpointId);
+            }
+        }, dataRegion);
+
+        workflow.markCheckpointEnd(new Checkpoint(EMPTY, progressImpl));
+
+        assertThat(checkpointStateArgumentCaptor.getAllValues(), equalTo(List.of(FINISHED)));
+
+        assertThat(events, equalTo(List.of(AFTER_CHECKPOINT_END)));
+
+        verify(progressImpl, times(1)).clearCounters();
+
+        verify(pageMemory, times(1)).finishCheckpoint();
+    }
+
+    private List<IgniteBiTuple<PageMemoryImpl, FullPageId>> collect(IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> queue) {
+        List<IgniteBiTuple<PageMemoryImpl, FullPageId>> res = new ArrayList<>();
+
+        Result<PageMemoryImpl, FullPageId> result = new Result<>();
+
+        while (queue.next(result)) {
+            res.add(new IgniteBiTuple<>(result.getKey(), result.getValue()));
+        }
+
+        return res;
+    }
+
+    private PageMemoryImpl newPageMemoryImpl(Collection<FullPageId> dirtyPages) {
+        PageMemoryImpl mock = mock(PageMemoryImpl.class);
+
+        when(mock.beginCheckpoint(any(CompletableFuture.class))).thenReturn(dirtyPages);
+
+        return mock;
+    }
+
+    /**
+     * Test listener implementation that simply collects events.
+     */
+    static class TestCheckpointListener implements CheckpointListener {
+        static final String ON_MARK_CHECKPOINT_BEGIN = "onMarkCheckpointBegin";
+
+        static final String ON_CHECKPOINT_BEGIN = "onCheckpointBegin";
+
+        static final String BEFORE_CHECKPOINT_BEGIN = "beforeCheckpointBegin";
+
+        static final String AFTER_CHECKPOINT_END = "afterCheckpointEnd";
+
+        final List<String> events;
+
+        /**
+         * Constructor.
+         *
+         * @param events For recording events.
+         */
+        TestCheckpointListener(List<String> events) {
+            this.events = events;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onMarkCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+            events.add(ON_MARK_CHECKPOINT_BEGIN);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void onCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+            events.add(ON_CHECKPOINT_BEGIN);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void beforeCheckpointBegin(CheckpointProgress progress) throws IgniteInternalCheckedException {
+            events.add(BEFORE_CHECKPOINT_BEGIN);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void afterCheckpointEnd(CheckpointProgress progress) throws IgniteInternalCheckedException {
+            events.add(AFTER_CHECKPOINT_END);
+        }
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java
new file mode 100644
index 000000000..d1c817d67
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static java.util.Collections.synchronizedCollection;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.Result;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * For {@link IgniteConcurrentMultiPairQueue} testing.
+ */
+public class IgniteConcurrentMultiPairQueueTest {
+    private IgniteConcurrentMultiPairQueue<Integer, Integer> queue;
+
+    private IgniteConcurrentMultiPairQueue<Integer, Integer> queue2;
+
+    private Map<Integer, Collection<Integer>> mapForCheck;
+
+    private Map<Integer, Collection<Integer>> mapForCheck2;
+
+    private Integer[] arr2 = {2, 4};
+
+    private Integer[] arr1 = {1, 3, 5, 7, 9, 11, 13, 15, 17, 19};
+
+    private Integer[] arr4 = {};
+
+    private Integer[] arr5 = {};
+
+    private Integer[] arr3 = {100, 200, 300, 400, 500, 600, 600, 700};
+
+    private Integer[] arr6 = {};
+
+    @BeforeEach
+    void setUp() {
+        Collection<IgniteBiTuple<Integer, Integer[]>> keyWithArr = new HashSet<>();
+
+        mapForCheck = new ConcurrentHashMap<>();
+
+        mapForCheck2 = new ConcurrentHashMap<>();
+
+        keyWithArr.add(new IgniteBiTuple<>(10, arr2));
+        keyWithArr.add(new IgniteBiTuple<>(20, arr1));
+        keyWithArr.add(new IgniteBiTuple<>(30, arr4));
+        keyWithArr.add(new IgniteBiTuple<>(40, arr5));
+        keyWithArr.add(new IgniteBiTuple<>(50, arr3));
+        keyWithArr.add(new IgniteBiTuple<>(60, arr6));
+
+        mapForCheck.put(10, synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+        mapForCheck.put(20, synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+        mapForCheck.put(50, synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+
+        mapForCheck2.put(10, synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+        mapForCheck2.put(20, synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+        mapForCheck2.put(50, synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+
+        queue = new IgniteConcurrentMultiPairQueue<>(keyWithArr);
+
+        Map<Integer, Collection<Integer>> keyWithColl = new HashMap<>();
+
+        keyWithColl.put(10, synchronizedCollection(new ArrayList<>(Arrays.asList(arr2))));
+        keyWithColl.put(20, synchronizedCollection(new ArrayList<>(Arrays.asList(arr1))));
+        keyWithColl.put(30, synchronizedCollection(new ArrayList<>(Arrays.asList(arr4))));
+        keyWithColl.put(40, synchronizedCollection(new ArrayList<>(Arrays.asList(arr5))));
+        keyWithColl.put(50, synchronizedCollection(new ArrayList<>(Arrays.asList(arr3))));
+        keyWithColl.put(60, synchronizedCollection(new ArrayList<>(Arrays.asList(arr6))));
+
+        queue2 = new IgniteConcurrentMultiPairQueue<>(keyWithColl);
+    }
+
+    @Test
+    public void testGridConcurrentMultiPairQueueCorrectness() throws Exception {
+        runMultiThreaded(() -> {
+            Result<Integer, Integer> res = new Result<>();
+
+            while (queue.next(res)) {
+                assertTrue(mapForCheck.containsKey(res.getKey()));
+
+                assertTrue(mapForCheck.get(res.getKey()).remove(res.getValue()));
+
+                Collection<Integer> coll = mapForCheck.get(res.getKey());
+
+                if (coll != null && coll.isEmpty()) {
+                    mapForCheck.remove(res.getKey(), coll);
+                }
+            }
+
+            return null;
+        }, current().nextInt(1, 20), "GridConcurrentMultiPairQueue arr test");
+
+        assertTrue(mapForCheck.isEmpty());
+
+        assertTrue(queue.isEmpty());
+
+        assertEquals(queue.initialSize(), arr1.length + arr2.length + arr3.length + arr4.length);
+
+        runMultiThreaded(() -> {
+            Result<Integer, Integer> res = new Result<>();
+
+            while (queue2.next(res)) {
+                assertTrue(mapForCheck2.containsKey(res.getKey()));
+
+                assertTrue(mapForCheck2.get(res.getKey()).remove(res.getValue()));
+
+                Collection<Integer> coll = mapForCheck2.get(res.getKey());
+
+                if (coll != null && coll.isEmpty()) {
+                    mapForCheck2.remove(res.getKey(), coll);
+                }
+            }
+
+            return null;
+        }, current().nextInt(1, 20), "GridConcurrentMultiPairQueue coll test");
+
+        assertTrue(mapForCheck2.isEmpty());
+
+        assertTrue(queue2.isEmpty());
+
+        assertEquals(queue2.initialSize(), arr1.length + arr2.length + arr3.length + arr4.length);
+    }
+}