Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/05/17 09:03:53 UTC

[GitHub] [ignite-3] ibessonov commented on a diff in pull request #806: IGNITE-16935 [Native Persistence 3.0] Porting a checkpoint and related code, part 3

ibessonov commented on code in PR #806:
URL: https://github.com/apache/ignite-3/pull/806#discussion_r873729830


##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointProgressImpl.java:
##########
@@ -262,4 +259,12 @@ private static void completeFuture(@Nullable CompletableFuture<?> future, @Nulla
             }
         }
     }
+
+    /**
+     * Returns delay in nanos before next checkpoint is to be executed. Value is from {@code 0} to {@code 365} days.
+     */
+    @TestOnly
+    long nextCheckpointDelayNanos() {

Review Comment:
   Why do you need this? Every "test only" entity should be closely examined. I don't like them; they're for the most desperate cases only.
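
For context on what the accessor under discussion returns: the Javadoc in the hunk above says the delay lies between 0 and 365 days. Below is a minimal sketch of such a clamp, assuming the progress object stores an absolute `System.nanoTime()`-based deadline. The class `CheckpointDelaySketch` and the method `delayNanos` are hypothetical names for illustration and are not taken from the PR.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of the PR: clamps the time left until a nanoTime-based deadline. */
final class CheckpointDelaySketch {
    /** Upper bound mentioned in the Javadoc above: 365 days, expressed in nanoseconds. */
    static final long MAX_DELAY_NANOS = TimeUnit.DAYS.toNanos(365);

    /**
     * Returns the delay until the deadline, clamped to the range [0, 365 days].
     *
     * @param nextCheckpointNanos Assumed absolute deadline on the System.nanoTime() scale.
     * @param nowNanos Current System.nanoTime() value.
     */
    static long delayNanos(long nextCheckpointNanos, long nowNanos) {
        // Subtract first and only then compare: nanoTime values are only meaningful as differences.
        long delay = nextCheckpointNanos - nowNanos;

        return Math.max(0L, Math.min(delay, MAX_DELAY_NANOS));
    }

    public static void main(String[] args) {
        long now = System.nanoTime();

        // A deadline five seconds ahead yields five seconds expressed in nanoseconds.
        System.out.println(delayNanos(now + TimeUnit.SECONDS.toNanos(5), now));
    }
}
```

Because the helper takes the current time as a parameter, a test can check it with fixed values instead of relying on a test-only accessor.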



##########
modules/core/src/main/java/org/apache/ignite/internal/components/LongJvmPauseDetector.java:
##########
@@ -51,7 +49,7 @@ public class LongJvmPauseDetector implements IgniteComponent {
     private static final int PRECISION = getInteger("IGNITE_JVM_PAUSE_DETECTOR_PRECISION", DFLT_JVM_PAUSE_DETECTOR_PRECISION);
 
     /** Threshold. */
-    private static final int THRESHOLD = getInteger("IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD", DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD);
+    private final int threshold = getInteger("IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD", 500);

Review Comment:
   Why did you inline the default constant?
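
A minimal sketch of the alternative the question hints at: keep the default in a named constant and pass it to the property lookup. It uses the standard `Integer.getInteger(String, int)` as a stand-in for the project's own `getInteger` helper, and the class name `PauseDetectorThresholdSketch` is hypothetical. The value 500 is the one inlined in the hunk above.

```java
/** Hypothetical illustration, not part of the PR: a named default instead of an inlined literal. */
final class PauseDetectorThresholdSketch {
    /** Default threshold; 500 is the literal that the diff above inlines. */
    static final int DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD = 500;

    /** Threshold, read once per instance as in the changed line of the diff. */
    private final int threshold;

    PauseDetectorThresholdSketch() {
        // Integer.getInteger reads a JVM system property and falls back to the given default.
        // Assumption: the project's getInteger helper behaves similarly for this purpose.
        threshold = Integer.getInteger("IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD", DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD);
    }

    int threshold() {
        return threshold;
    }

    public static void main(String[] args) {
        System.out.println(new PauseDetectorThresholdSketch().threshold());
    }
}
```

With a named constant, tests and documentation can refer to the default without repeating the literal.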



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Persistent store of pages.
+ */
+public interface PageStore extends Closeable {
+    /** Type for affinity partitions. */
+    byte TYPE_DATA = 1;
+
+    /** Type for index partition. */
+    byte TYPE_IDX = 2;
+
+    /**
+     * Adds page write listener.
+     *
+     * @param listener Page write listener.
+     */
+    void addWriteListener(PageWriteListener listener);
+
+    /**
+     * Removes page write listener.
+     *
+     * @param listener Page write listener.
+     */
+    void removeWriteListener(PageWriteListener listener);
+
+    /**
+     * Returns {@code true} if the page store exists.
+     */
+    boolean exists();
+
+    /**
+     * Allocates next page index.
+     *
+     * @return Next page index.
+     * @throws IgniteInternalCheckedException If failed to allocate.
+     */
+    long allocatePage() throws IgniteInternalCheckedException;
+
+    /**
+     * Returns number of allocated pages.
+     */
+    long pages();
+
+    /**
+     * Reads a page.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to read into.
+     * @param keepCrc By default, reading zeroes the CRC that was stored in the page store; pass {@code true} to keep it in
+     *      {@code pageBuf}.
+     * @return {@code True} if page has been read successfully, {@code false} if page hasn't been written yet.
+     * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
+     */
+    boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException;
+
+    /**
+     * Reads a page store header.
+     *
+     * @param buf Buffer to write to.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    void readHeader(ByteBuffer buf) throws IgniteInternalCheckedException;
+
+    /**
+     * Writes a page.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to write from.
+     * @param tag Partition page store version, 1-based incrementing counter. For outdated pages {@code tag} has lower value,
+     *      and write does nothing.
+     * @param calculateCrc If {@code false} crc calculation will be forcibly skipped.
+     * @throws IgniteInternalCheckedException If page writing failed (IO error occurred).
+     */
+    void write(long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteInternalCheckedException;
+
+    /**
+     * Returns page offset within the page store.
+     *
+     * @param pageId Page ID.
+     */
+    long pageOffset(long pageId);

Review Comment:
   Why is this a public method? Who uses it?
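
To make the question concrete, here is a rough sketch of what a page offset computation could look like. It rests on assumptions that the diff does not confirm: a fixed-size header followed by fixed-size pages, and a page index held in the low 32 bits of the page ID. The class `PageOffsetSketch` and its fields are hypothetical.

```java
/** Hypothetical illustration, not part of the PR: offset of a page inside a file-backed store. */
final class PageOffsetSketch {
    /** Assumed size of the store header in bytes. */
    private final int headerSize;

    /** Assumed fixed page size in bytes. */
    private final int pageSize;

    PageOffsetSketch(int headerSize, int pageSize) {
        this.headerSize = headerSize;
        this.pageSize = pageSize;
    }

    /** Assumption: the low 32 bits of the page ID hold the page index. */
    static int pageIndex(long pageId) {
        return (int) pageId;
    }

    /** Returns the byte offset of the page: header first, then pages laid out one after another. */
    long pageOffset(long pageId) {
        // Treat the index as unsigned and multiply in long arithmetic to avoid int overflow.
        return headerSize + Integer.toUnsignedLong(pageIndex(pageId)) * pageSize;
    }

    public static void main(String[] args) {
        System.out.println(new PageOffsetSketch(4096, 4096).pageOffset(3));
    }
}
```

Under these assumptions the offset depends only on the store's internal layout, which is why it matters who outside the store needs to call it.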



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -17,12 +17,766 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
+import static java.lang.Math.max;
+import static java.lang.System.nanoTime;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.internal.util.worker.WorkProgressDispatcher;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
 /**
- * Empty.
+ * Checkpointer object is used for notification on checkpoint begin, predicate is {@code nextCheckpointTimestamps - now > 0}.
+ *
+ * <p>Method {@link #scheduleCheckpoint} uses {@link Object#notifyAll()}, {@link #waitCheckpointEvent} uses {@link Object#wait(long)}.
+ *
+ * <p>Checkpointer is single-threaded, which means that only one checkpoint can run at a time.
+ *
+ * <p>Responsibilities:
+ * <ul>
+ * <li>Provide the API to schedule/trigger a checkpoint.</li>
+ * <li>Schedule new checkpoint after current one according to checkpoint frequency.</li>
+ * <li>Failure handling.</li>
+ * <li>Managing of page write threads.</li>
+ * <li>Logging and metrics of checkpoint.</li>
+ * </ul>
+ *
+ * <p>Checkpointer steps:
+ * <ul>
+ * <li>Awaiting checkpoint event.</li>
+ * <li>Collect all dirty pages from page memory under checkpoint write lock.</li>
+ * <li>Start to write dirty pages to page store.</li>
+ * <li>Finish the checkpoint.</li>
+ * </ul>
  */
-// TODO: IGNITE-16935 Continue porting the code
-public abstract class Checkpointer {
-    public abstract Thread runner();
+public class Checkpointer extends IgniteWorker implements IgniteComponent {
+    private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started ["
+            + "checkpointId=%s, "
+            + "checkpointBeforeWriteLockTime=%dms, "
+            + "checkpointWriteLockWait=%dms, "
+            + "checkpointListenersExecuteTime=%dms, "
+            + "checkpointWriteLockHoldTime=%dms, "
+            + "splitAndSortPagesDuration=%dms, "
+            + "%s"
+            + "pages=%d, "
+            + "reason='%s']";
+
+    /** Avoid the start checkpoint if checkpointer was canceled. */
+    // TODO: IGNITE-16984 Move to config
+    private volatile boolean skipCheckpointOnNodeStop = getBoolean("IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP", false);
+
+    /** Pause detector. */
+    @Nullable
+    private final LongJvmPauseDetector pauseDetector;
+
+    /** Supplier of the interval in ms after which the checkpoint is triggered if there are no other events. */
+    private final LongSupplier checkpointFrequencySupplier;
+
+    /** Strategy of where and how to get the pages. */
+    private final CheckpointWorkflow checkpointWorkflow;
+
+    /** Factory for the creation of page-write workers. */
+    private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+    /** Checkpoint runner thread pool. If {@code null} tasks are to be run in single thread. */
+    @Nullable
+    private final ThreadPoolExecutor checkpointWritePagesPool;
+
+    /** Next scheduled checkpoint progress. */
+    private volatile CheckpointProgressImpl scheduledCheckpointProgress;
+
+    /** Current checkpoint progress. This field is updated only by checkpoint thread. */
+    @Nullable
+    private volatile CheckpointProgressImpl currentCheckpointProgress;
+
+    /** Shutdown now. */
+    private volatile boolean shutdownNow;
+
+    /** Last checkpoint timestamp, read/update only in checkpoint thread. */
+    private long lastCheckpointTimestamp;
+
+    @TestOnly
+    @Nullable
+    private volatile CompletableFuture<?> enableChangeAppliedFuture;
+
+    @TestOnly
+    private volatile boolean checkpointsEnabled = true;
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param igniteInstanceName Name of the Ignite instance.
+     * @param workerListener Listener for life-cycle worker events.
+     * @param detector Long JVM pause detector.
+     * @param checkpointWorkFlow Implementation of checkpoint.
+     * @param factory Page writer factory.
+     * @param checkpointFrequencySupplier Supplier interval in ms after which the checkpoint is triggered if there are no other events.
+     * @param checkpointWritePageThreads The number of IO-bound threads which will write pages to disk.
+     */
+    Checkpointer(
+            IgniteLogger log,
+            String igniteInstanceName,
+            @Nullable IgniteWorkerListener workerListener,
+            @Nullable LongJvmPauseDetector detector,
+            CheckpointWorkflow checkpointWorkFlow,
+            CheckpointPagesWriterFactory factory,
+            int checkpointWritePageThreads,
+            LongSupplier checkpointFrequencySupplier
+    ) {
+        super(log, igniteInstanceName, "checkpoint-thread", workerListener);
+
+        this.pauseDetector = detector;
+        // TODO: IGNITE-16984 Move to config: checkpointFrequency * checkpointFrequencyDeviation see 2.0
+        this.checkpointFrequencySupplier = checkpointFrequencySupplier;
+        this.checkpointWorkflow = checkpointWorkFlow;
+        this.checkpointPagesWriterFactory = factory;
+
+        scheduledCheckpointProgress = new CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
+
+        // TODO: IGNITE-16984 Move checkpointWritePageThreads to config
+        if (checkpointWritePageThreads > 1) {
+            checkpointWritePagesPool = new ThreadPoolExecutor(
+                    checkpointWritePageThreads,
+                    checkpointWritePageThreads,
+                    30_000,
+                    MILLISECONDS,
+                    new LinkedBlockingQueue<>(),
+                    new NamedThreadFactory(CHECKPOINT_RUNNER_THREAD_PREFIX + "-IO")
+            );
+        } else {
+            checkpointWritePagesPool = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void body() {
+        try {
+            while (!isCancelled()) {
+                waitCheckpointEvent();
+
+                if (skipCheckpointOnNodeStop && (isCancelled() || shutdownNow)) {
+                    if (log.isInfoEnabled()) {
+                        log.warn("Skipping last checkpoint because node is stopping.");
+                    }
+
+                    return;
+                }
+
+                CompletableFuture<?> enableChangeAppliedFuture = this.enableChangeAppliedFuture;
+
+                if (enableChangeAppliedFuture != null) {
+                    enableChangeAppliedFuture.complete(null);
+
+                    this.enableChangeAppliedFuture = null;
+                }
+
+                if (checkpointsEnabled) {
+                    doCheckpoint();
+                } else {
+                    synchronized (this) {
+                        scheduledCheckpointProgress.nextCheckpointNanos(MILLISECONDS.toNanos(nextCheckpointInterval()));
+                    }
+                }
+            }
+
+            // Final run after the cancellation.
+            if (checkpointsEnabled && !shutdownNow) {
+                doCheckpoint();
+            }
+
+            if (!isCancelled.get()) {
+                throw new IllegalStateException("Thread is terminated unexpectedly: " + name());
+            }
+
+            scheduledCheckpointProgress.fail(new NodeStoppingException("Node is stopping."));
+        } catch (Throwable t) {
+            scheduledCheckpointProgress.fail(t);
+
+            // TODO: IGNITE-16899 By analogy with 2.0, we need to handle the exception (err) by the FailureProcessor
+            // We need to handle OutOfMemoryError and the rest in different ways
+
+            throw new IgniteInternalException(t);
+        }
+    }
+
+    /**
+     * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
+     *
+     * @param delayFromNow Delay from now in milliseconds.
+     * @param reason Wakeup reason.
+     * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
+     */
+    public CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason) {
+        return scheduleCheckpoint(delayFromNow, reason, null);
+    }
+
+    /**
+     * Changes the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do nothing otherwise.
+     *
+     * @param delayFromNow Delay from now in milliseconds.
+     * @param reason Wakeup reason.
+     * @param finishFutureListener Checkpoint finish listener.
+     * @return Nearest scheduled checkpoint which is not started yet (dirty pages weren't collected yet).
+     */
+    public CheckpointProgress scheduleCheckpoint(
+            long delayFromNow,
+            String reason,
+            @Nullable BiConsumer<Void, Throwable> finishFutureListener
+    ) {
+        CheckpointProgressImpl current = currentCheckpointProgress;
+
+        // If the checkpoint hasn't taken the write lock yet, don't trigger a new checkpoint, just return the current one.
+        if (finishFutureListener == null && current != null && !current.greaterOrEqualTo(LOCK_TAKEN)) {
+            return current;
+        }
+
+        if (finishFutureListener != null) {
+            // To be sure finishFutureListener will always be executed in checkpoint thread.
+            synchronized (this) {
+                current = scheduledCheckpointProgress;
+
+                current.futureFor(FINISHED).whenComplete(finishFutureListener);
+            }
+        } else {
+            current = scheduledCheckpointProgress;
+        }
+
+        long nextNanos = nanoTime() + MILLISECONDS.toNanos(delayFromNow);
+
+        if (current.nextCheckpointNanos() - nextNanos <= 0) {
+            return current;
+        }
+
+        synchronized (this) {
+            current = scheduledCheckpointProgress;
+
+            if (current.nextCheckpointNanos() - nextNanos > 0) {
+                current.reason(reason);
+
+                current.nextCheckpointNanos(MILLISECONDS.toNanos(delayFromNow));
+            }
+
+            notifyAll();
+        }
+
+        return current;
+    }
+
+    /**
+     * Executes a checkpoint.
+     *
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    void doCheckpoint() throws IgniteInternalCheckedException {
+        Checkpoint chp = null;
+
+        try {
+            CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
+
+            startCheckpointProgress();
+
+            try {
+                chp = checkpointWorkflow.markCheckpointBegin(lastCheckpointTimestamp, currentCheckpointProgress, tracker);
+            } catch (Exception e) {
+                if (currentCheckpointProgress != null) {
+                    currentCheckpointProgress.fail(e);
+                }
+
+                // TODO: IGNITE-16899 By analogy with 2.0, we need to handle the exception by the FailureProcessor
+                // In case of checkpoint initialization error node should be invalidated and stopped.
+
+                // Re-throw as a checked exception to force stopping the checkpoint thread.
+                throw new IgniteInternalCheckedException(e);
+            }
+
+            updateHeartbeat();
+
+            currentCheckpointProgress.initCounters(chp.dirtyPagesSize);
+
+            if (chp.hasDelta()) {
+                if (log.isInfoEnabled()) {
+                    long possibleJvmPauseDuration = possibleLongJvmPauseDuration(tracker);
+
+                    log.info(String.format(
+                            CHECKPOINT_STARTED_LOG_FORMAT,
+                            chp.progress.id(),
+                            tracker.beforeWriteLockDuration(),
+                            tracker.writeLockWaitDuration(),
+                            tracker.onMarkCheckpointBeginDuration(),
+                            tracker.writeLockHoldDuration(),
+                            tracker.splitAndSortCheckpointPagesDuration(),
+                            possibleJvmPauseDuration > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDuration + "ms, " : "",
+                            chp.dirtyPagesSize,
+                            chp.progress.reason()
+                    ));
+                }
+
+                if (!writePages(tracker, chp.dirtyPages, chp.progress, this, this::isShutdownNow)) {
+                    return;
+                }
+            } else {
+                if (log.isInfoEnabled()) {
+                    log.info(String.format(
+                            "Skipping checkpoint (no pages were modified) ["
+                                    + "checkpointBeforeWriteLockTime=%dms, checkpointWriteLockWait=%dms, "
+                                    + "checkpointListenersExecuteTime=%dms, checkpointWriteLockHoldTime=%dms, reason='%s']",
+                            tracker.beforeWriteLockDuration(),
+                            tracker.writeLockWaitDuration(),
+                            tracker.onMarkCheckpointBeginDuration(),
+                            tracker.writeLockHoldDuration(),
+                            chp.progress.reason()
+                    ));
+                }
+
+                tracker.onPagesWriteStart();
+                tracker.onFsyncStart();
+            }
+
+            // Must mark successful checkpoint only if there are no exceptions or interrupts.
+            checkpointWorkflow.markCheckpointEnd(chp);
+
+            tracker.onCheckpointEnd();
+
+            if (chp.hasDelta()) {
+                if (log.isInfoEnabled()) {
+                    log.info(String.format(
+                            "Checkpoint finished [checkpointId=%s, pages=%d, pagesWriteTime=%dms, fsyncTime=%dms, totalTime=%dms]",
+                            chp.progress.id(),
+                            chp.dirtyPagesSize,
+                            tracker.pagesWriteDuration(),
+                            tracker.fsyncDuration(),
+                            tracker.totalDuration()
+                    ));
+                }
+            }
+        } catch (IgniteInternalCheckedException e) {
+            if (chp != null) {
+                chp.progress.fail(e);
+            }
+
+            // TODO: IGNITE-16899 By analogy with 2.0, we need to handle the exception by the FailureProcessor
+
+            throw e;
+        }
+    }
+
+    /**
+     * Writes dirty pages to the appropriate stores.
+     *
+     * @param tracker Checkpoint metrics tracker.
+     * @param checkpointPages Checkpoint pages to write.
+     * @param currentCheckpointProgress Current checkpoint progress.
+     * @param workProgressDispatcher Work progress dispatcher.
+     * @param shutdownNow Checker of stop operation.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    boolean writePages(
+            CheckpointMetricsTracker tracker,
+            IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> checkpointPages,
+            CheckpointProgressImpl currentCheckpointProgress,
+            WorkProgressDispatcher workProgressDispatcher,
+            BooleanSupplier shutdownNow
+    ) throws IgniteInternalCheckedException {
+        ThreadPoolExecutor pageWritePool = checkpointWritePagesPool;
+
+        int checkpointWritePageThreads = pageWritePool == null ? 1 : pageWritePool.getMaximumPoolSize();
+
+        // Identity stores set.
+        ConcurrentMap<PageStore, LongAdder> updStores = new ConcurrentHashMap<>();
+
+        CompletableFuture<?>[] futures = new CompletableFuture[checkpointWritePageThreads];
+
+        tracker.onPagesWriteStart();
+
+        for (int i = 0; i < checkpointWritePageThreads; i++) {
+            CheckpointPagesWriter write = checkpointPagesWriterFactory.build(
+                    tracker,
+                    checkpointPages,
+                    updStores,
+                    futures[i] = new CompletableFuture<>(),
+                    workProgressDispatcher::updateHeartbeat,
+                    currentCheckpointProgress,
+                    shutdownNow
+            );
+
+            if (pageWritePool == null) {
+                write.run();
+            } else {
+                try {
+                    pageWritePool.execute(write);
+                } catch (RejectedExecutionException ignore) {
+                    // Run the task synchronously.
+                    write.run();
+                }
+            }
+        }
+
+        workProgressDispatcher.updateHeartbeat();
+
+        // Wait and check for errors.
+        CompletableFuture.allOf(futures).join();
+
+        // Must re-check shutdown flag here because threads may have skipped some pages.
+        // If so, we should not put finish checkpoint mark.
+        if (shutdownNow.getAsBoolean()) {
+            currentCheckpointProgress.fail(new NodeStoppingException("Node is stopping."));
+
+            return false;
+        }
+
+        tracker.onFsyncStart();
+
+        syncUpdatedStores(updStores);
+
+        if (shutdownNow.getAsBoolean()) {
+            currentCheckpointProgress.fail(new NodeStoppingException("Node is stopping."));
+
+            return false;
+        }
+
+        return true;
+    }
+
+    private void syncUpdatedStores(
+            ConcurrentMap<PageStore, LongAdder> updatedStoresForSync
+    ) throws IgniteInternalCheckedException {
+        ThreadPoolExecutor pageWritePool = checkpointWritePagesPool;
+
+        if (pageWritePool == null) {
+            for (Map.Entry<PageStore, LongAdder> updStoreEntry : updatedStoresForSync.entrySet()) {
+                if (shutdownNow) {
+                    return;
+                }
+
+                blockingSectionBegin();
+
+                try {
+                    updStoreEntry.getKey().sync();
+                } finally {
+                    blockingSectionEnd();
+                }
+
+                currentCheckpointProgress.syncedPagesCounter().addAndGet(updStoreEntry.getValue().intValue());
+            }
+        } else {
+            int checkpointThreads = pageWritePool.getMaximumPoolSize();
+
+            CompletableFuture<?>[] futures = new CompletableFuture[checkpointThreads];
+
+            for (int i = 0; i < checkpointThreads; i++) {
+                futures[i] = new CompletableFuture<>();
+            }
+
+            BlockingQueue<Entry<PageStore, LongAdder>> queue = new LinkedBlockingQueue<>(updatedStoresForSync.entrySet());
+
+            for (int i = 0; i < checkpointThreads; i++) {
+                int threadIdx = i;
+
+                pageWritePool.execute(() -> {
+                    Map.Entry<PageStore, LongAdder> updStoreEntry = queue.poll();
+
+                    try {
+                        while (updStoreEntry != null) {
+                            if (shutdownNow) {
+                                return;
+                            }
+
+                            blockingSectionBegin();
+
+                            try {
+                                updStoreEntry.getKey().sync();
+                            } finally {
+                                blockingSectionEnd();
+                            }
+
+                            currentCheckpointProgress.syncedPagesCounter().addAndGet(updStoreEntry.getValue().intValue());
+
+                            updStoreEntry = queue.poll();
+                        }
+
+                        futures[threadIdx].complete(null);
+                    } catch (Throwable t) {
+                        futures[threadIdx].completeExceptionally(t);
+                    }
+                });
+            }
+
+            blockingSectionBegin();
+
+            try {
+                CompletableFuture.allOf(futures).join();
+            } finally {
+                blockingSectionEnd();
+            }
+        }
+    }
+
+    /**
+     * Waits until the next checkpoint time.
+     */
+    void waitCheckpointEvent() {
+        try {
+            synchronized (this) {
+                long remaining = NANOSECONDS.toMillis(scheduledCheckpointProgress.nextCheckpointNanos() - nanoTime());
+
+                while (remaining > 0 && !isCancelled()) {
+                    blockingSectionBegin();
+
+                    try {
+                        wait(remaining);
+
+                        remaining = NANOSECONDS.toMillis(scheduledCheckpointProgress.nextCheckpointNanos() - nanoTime());
+                    } finally {
+                        blockingSectionEnd();
+                    }
+                }
+            }
+        } catch (InterruptedException ignored) {
+            Thread.currentThread().interrupt();
+
+            isCancelled.set(true);
+        }
+    }
+
+    /**
+     * Returns duration of possible JVM pause, if it was detected, or {@code -1} otherwise.
+     *
+     * @param tracker Checkpoint metrics tracker.
+     */
+    private long possibleLongJvmPauseDuration(CheckpointMetricsTracker tracker) {
+        if (pauseDetector != null) {
+            if (tracker.writeLockWaitDuration() + tracker.writeLockHoldDuration() > pauseDetector.longJvmPauseThreshold()) {
+                long now = coarseCurrentTimeMillis();
+
+                // We must get the last wake-up time before searching for a possible pause in the events map.
+                long wakeUpTime = pauseDetector.getLastWakeUpTime();
+
+                IgniteBiTuple<Long, Long> lastLongPause = pauseDetector.getLastLongPause();
+
+                if (lastLongPause != null && tracker.checkpointStartTime() < lastLongPause.get1()) {
+                    return lastLongPause.get2();
+                }
+
+                if (now - wakeUpTime > pauseDetector.longJvmPauseThreshold()) {
+                    return now - wakeUpTime;
+                }
+            }
+        }
+
+        return -1L;
+    }
+
+    /**
+     * Update the current checkpoint info from the scheduled one.
+     */
+    void startCheckpointProgress() {
+        long checkpointStartTimestamp = coarseCurrentTimeMillis();
+
+        // This can happen in an unlikely event of two checkpoints happening within a currentTimeMillis() granularity window.
+        if (checkpointStartTimestamp == lastCheckpointTimestamp) {
+            checkpointStartTimestamp++;
+        }
+
+        lastCheckpointTimestamp = checkpointStartTimestamp;
+
+        synchronized (this) {
+            CheckpointProgressImpl curr = scheduledCheckpointProgress;
+
+            if (curr.reason() == null) {
+                curr.reason("timeout");
+            }
+
+            // It is important that we assign a new progress object before checkpoint mark in page memory.
+            scheduledCheckpointProgress = new CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
+
+            currentCheckpointProgress = curr;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void cancel() {
+        if (log.isDebugEnabled()) {
+            log.debug("Cancelling grid runnable: " + this);
+        }
+
+        // Do not interrupt runner thread.
+        isCancelled.set(true);
+
+        synchronized (this) {
+            notifyAll();
+        }
+    }
+
+    /**
+     * Stops all checkpoint activity immediately, even if the current checkpoint is in progress.
+     */
+    public void shutdownNow() {
+        shutdownNow = true;
+
+        if (!isCancelled.get()) {
+            cancel();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        if (runner() != null) {
+            return;
+        }
+
+        assert runner() == null : "Checkpointer is running.";
+
+        new IgniteThread(igniteInstanceName(), name(), this).start();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        // Let's write the data.
+        shutdownCheckpointer(true);
+    }
+
+    /**
+     * Shutdown checkpointer.
+     *
+     * @param cancel Cancel flag.
+     */
+    public void shutdownCheckpointer(boolean cancel) {
+        if (cancel) {
+            shutdownNow();
+        } else {
+            cancel();
+        }
+
+        try {
+            join();
+        } catch (InterruptedException ignore) {
+            log.warn("Was interrupted while waiting for checkpointer shutdown, will not wait for checkpoint to finish.");
+
+            Thread.currentThread().interrupt();
+
+            shutdownNow();
+
+            while (true) {
+                try {
+                    join();
+
+                    scheduledCheckpointProgress.fail(new NodeStoppingException("Checkpointer is stopped during node stop."));
+
+                    break;
+                } catch (InterruptedException ignored) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            Thread.currentThread().interrupt();
+        }
+
+        if (checkpointWritePagesPool != null) {
+            shutdownAndAwaitTermination(checkpointWritePagesPool, 2, MINUTES);
+        }
+    }
+
+    /**
+     * Returns progress of current checkpoint, last finished one or {@code null}, if checkpoint has never started.
+     */
+    public @Nullable CheckpointProgress currentProgress() {
+        return currentCheckpointProgress;
+    }
+
+    /**
+     * Returns progress of scheduled checkpoint.
+     */
+    CheckpointProgress scheduledProgress() {
+        return scheduledCheckpointProgress;
+    }
+
+    /**
+     * Returns {@code true} if checkpoint should be stopped immediately.
+     */
+    boolean isShutdownNow() {
+        return shutdownNow;
+    }
+
+    /**
+     * Skip checkpoint on node stop.
+     *
+     * @param skip If {@code true} skips checkpoint on node stop.
+     */
+    public void skipCheckpointOnNodeStop(boolean skip) {
+        skipCheckpointOnNodeStop = skip;
+    }
+
+    /**
+     * Gets a checkpoint interval with a randomized delay in mills.
+     *
+     * <p>It helps when the cluster makes a checkpoint in the same time in every node.
+     */
+    private long nextCheckpointInterval() {
+        long checkpointFrequency = checkpointFrequencySupplier.getAsLong();
+
+        long startDelay = ThreadLocalRandom.current().nextLong(max(safeAbs(checkpointFrequency) / 100, 1))
+                - max(safeAbs(checkpointFrequency) / 200, 1);
+
+        return safeAbs(checkpointFrequency + startDelay);
+    }
+
+    /**
+     * For test use only.
+     *
+     * @deprecated Should be rewritten to public API.
+     */
+    @TestOnly
+    public CompletableFuture<?> enableCheckpoints(boolean enable) {

Review Comment:
   Can we avoid custom test code that ADDS FIELDS required only for testing? This is bad design and we should fight it. Please add a TODO for removing it.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.TRY_AGAIN_TAG;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.flag;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Implementation of page writer which able to store pages to disk during checkpoint.
+ */
+public class CheckpointPagesWriter implements Runnable {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Checkpoint specific metrics tracker. */
+    private final CheckpointMetricsTracker tracker;
+
+    /** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection. */
+    private final IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> writePageIds;
+
+    /** Page store used to write -> Count of written pages. */
+    private final ConcurrentMap<PageStore, LongAdder> updStores;
+
+    /** Future which should be finished when all pages would be written. */
+    private final CompletableFuture<?> doneFut;
+
+    /** Some action which will be executed every time before page will be written. */
+    private final Runnable beforePageWrite;
+
+    /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */
+    private final ThreadLocal<ByteBuffer> threadBuf;
+
+    /** Current checkpoint. This field is updated only by checkpoint thread. */
+    private final CheckpointProgressImpl checkpointProgress;
+
+    /** Writer which able to write one page. */
+    private final CheckpointPageWriter pageWriter;
+
+    /** Shutdown now. */
+    private final BooleanSupplier shutdownNow;
+
+    /**
+     * Creates task for write pages.
+     *
+     * @param tracker Checkpoint metrics tracker.
+     * @param writePageIds Collection of page IDs to write.
+     * @param updStores Updating storage.
+     * @param doneFut Done future.
+     * @param beforePageWrite Action to be performed before every page write.
+     * @param log Logger.
+     * @param threadBuf Thread local byte buffer.
+     * @param checkpointProgress Checkpoint progress.
+     * @param pageWriter File page store manager.
+     * @param shutdownNow Shutdown supplier.
+     */
+    CheckpointPagesWriter(
+            IgniteLogger log,
+            CheckpointMetricsTracker tracker,
+            IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> writePageIds,
+            ConcurrentMap<PageStore, LongAdder> updStores,
+            CompletableFuture<?> doneFut,
+            Runnable beforePageWrite,
+            ThreadLocal<ByteBuffer> threadBuf,
+            CheckpointProgressImpl checkpointProgress,
+            CheckpointPageWriter pageWriter,
+            BooleanSupplier shutdownNow
+    ) {
+        this.log = log;
+        this.tracker = tracker;
+        this.writePageIds = writePageIds;
+        this.updStores = updStores;
+        this.doneFut = doneFut;
+        this.beforePageWrite = beforePageWrite;
+        this.threadBuf = threadBuf;
+        this.checkpointProgress = checkpointProgress;
+        this.pageWriter = pageWriter;
+        this.shutdownNow = shutdownNow;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void run() {
+        try {
+            IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> pagesToRetry = writePages(writePageIds);
+
+            if (pagesToRetry.isEmpty()) {
+                doneFut.complete(null);
+            } else {
+                if (log.isInfoEnabled()) {
+                    log.info(pagesToRetry.initialSize() + " checkpoint pages were not written yet due to "
+                            + "unsuccessful page write lock acquisition and will be retried");
+                }
+
+                while (!pagesToRetry.isEmpty()) {
+                    pagesToRetry = writePages(pagesToRetry);
+                }
+
+                doneFut.complete(null);
+            }
+        } catch (Throwable e) {
+            doneFut.completeExceptionally(e);
+        }
+    }
+
+    /**
+     * Writes pages.
+     *
+     * @param writePageIds Collections of pages to write.
+     * @return pagesToRetry Pages which should be retried.
+     */
+    private IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> writePages(
+            IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> writePageIds
+    ) throws IgniteInternalCheckedException {
+        Map<PageMemoryImpl, List<FullPageId>> pagesToRetry = new HashMap<>();
+
+        Map<PageMemoryImpl, PageStoreWriter> pageStoreWriters = new HashMap<>();
+
+        ByteBuffer tmpWriteBuf = threadBuf.get();
+
+        IgniteConcurrentMultiPairQueue.Result<PageMemoryImpl, FullPageId> res = new IgniteConcurrentMultiPairQueue.Result<>();
+
+        while (!shutdownNow.getAsBoolean() && writePageIds.next(res)) {
+            beforePageWrite.run();
+
+            FullPageId fullId = res.getValue();
+
+            PageMemoryImpl pageMemory = res.getKey();
+
+            tmpWriteBuf.rewind();
+
+            PageStoreWriter pageStoreWriter = pageStoreWriters.computeIfAbsent(pageMemory, pm -> createPageStoreWriter(pm, pagesToRetry));
+
+            pageMemory.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker);
+        }
+
+        return pagesToRetry.isEmpty() ? EMPTY : new IgniteConcurrentMultiPairQueue<>(pagesToRetry);

Review Comment:
   Just a comment: maybe we don't write the end marker if the node is stopping, and that's enough. I really hope so.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.TRY_AGAIN_TAG;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.flag;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Implementation of page writer which able to store pages to disk during checkpoint.
+ */
+public class CheckpointPagesWriter implements Runnable {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Checkpoint specific metrics tracker. */
+    private final CheckpointMetricsTracker tracker;
+
+    /** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection. */
+    private final IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> writePageIds;
+
+    /** Page store used to write -> Count of written pages. */
+    private final ConcurrentMap<PageStore, LongAdder> updStores;
+
+    /** Future which should be finished when all pages would be written. */
+    private final CompletableFuture<?> doneFut;
+
+    /** Some action which will be executed every time before page will be written. */
+    private final Runnable beforePageWrite;
+
+    /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */
+    private final ThreadLocal<ByteBuffer> threadBuf;
+
+    /** Current checkpoint. This field is updated only by checkpoint thread. */
+    private final CheckpointProgressImpl checkpointProgress;
+
+    /** Writer which able to write one page. */
+    private final CheckpointPageWriter pageWriter;
+
+    /** Shutdown now. */
+    private final BooleanSupplier shutdownNow;

Review Comment:
   I think that in the future we should replace it with a busy lock, what do you think? All these suppliers look ugly.
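
   For illustration, here is a minimal sketch of what a busy lock could look like in place of the `BooleanSupplier`. `BusyLockSketch` and its methods are hypothetical names written for this comment, not an existing Ignite class, and the page write is only a placeholder:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical minimal busy lock, written for illustration only. */
final class BusyLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Set under the write lock once the component starts stopping. */
    private volatile boolean stopped;

    /** Returns true if the operation may proceed; the caller must then call leaveBusy(). */
    boolean enterBusy() {
        if (!lock.readLock().tryLock()) {
            return false; // block() currently holds the write lock.
        }

        if (stopped) {
            lock.readLock().unlock();

            return false;
        }

        return true;
    }

    /** Must be called exactly once after every successful enterBusy(). */
    void leaveBusy() {
        lock.readLock().unlock();
    }

    /** Waits for in-flight operations to leave, then rejects all new ones. */
    void block() {
        lock.writeLock().lock();

        try {
            stopped = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        BusyLockSketch busyLock = new BusyLockSketch();
        int written = 0;

        for (int page = 0; page < 5; page++) {
            if (page == 3) {
                busyLock.block(); // Simulates a shutdown request arriving mid-checkpoint.
            }

            if (!busyLock.enterBusy()) {
                break; // Stop writing as soon as the lock is blocked.
            }

            try {
                written++; // Placeholder for writing one page.
            } finally {
                busyLock.leaveBusy();
            }
        }

        System.out.println("Pages written before shutdown: " + written);
    }
}
```

   With something like this, the writer would take the lock around each page write instead of polling a supplier, and the stopping side would also get a way to wait for in-flight writes.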



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.TRY_AGAIN_TAG;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.flag;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Implementation of page writer which able to store pages to disk during checkpoint.
+ */
+public class CheckpointPagesWriter implements Runnable {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Checkpoint specific metrics tracker. */
+    private final CheckpointMetricsTracker tracker;
+
+    /** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection. */
+    private final IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> writePageIds;
+
+    /** Page store used to write -> Count of written pages. */
+    private final ConcurrentMap<PageStore, LongAdder> updStores;
+
+    /** Future which should be finished when all pages would be written. */
+    private final CompletableFuture<?> doneFut;
+
+    /** Some action which will be executed every time before page will be written. */
+    private final Runnable beforePageWrite;

Review Comment:
   Is this really needed?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointPagesWriter.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.checkpoint;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getType;
+import static org.apache.ignite.internal.pagememory.io.PageIo.getVersion;
+import static org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl.TRY_AGAIN_TAG;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.IgniteConcurrentMultiPairQueue.EMPTY;
+import static org.apache.ignite.internal.pagememory.util.PageIdUtils.flag;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.PageStoreWriter;
+import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * Implementation of page writer which able to store pages to disk during checkpoint.
+ */
+public class CheckpointPagesWriter implements Runnable {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Checkpoint specific metrics tracker. */
+    private final CheckpointMetricsTracker tracker;
+
+    /** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection. */
+    private final IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> writePageIds;
+
+    /** Page store used to write -> Count of written pages. */
+    private final ConcurrentMap<PageStore, LongAdder> updStores;
+
+    /** Future which should be finished when all pages would be written. */
+    private final CompletableFuture<?> doneFut;
+
+    /** Some action which will be executed every time before page will be written. */
+    private final Runnable beforePageWrite;
+
+    /** Thread local with buffers for the checkpoint threads. Each buffer represent one page for durable memory. */
+    private final ThreadLocal<ByteBuffer> threadBuf;
+
+    /** Current checkpoint. This field is updated only by checkpoint thread. */
+    private final CheckpointProgressImpl checkpointProgress;
+
+    /** Writer which able to write one page. */
+    private final CheckpointPageWriter pageWriter;
+
+    /** Shutdown now. */
+    private final BooleanSupplier shutdownNow;
+
+    /**
+     * Creates task for write pages.
+     *
+     * @param tracker Checkpoint metrics tracker.
+     * @param writePageIds Collection of page IDs to write.
+     * @param updStores Updating storage.
+     * @param doneFut Done future.
+     * @param beforePageWrite Action to be performed before every page write.
+     * @param log Logger.
+     * @param threadBuf Thread local byte buffer.
+     * @param checkpointProgress Checkpoint progress.
+     * @param pageWriter File page store manager.
+     * @param shutdownNow Shutdown supplier.
+     */
+    CheckpointPagesWriter(
+            IgniteLogger log,
+            CheckpointMetricsTracker tracker,
+            IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> writePageIds,
+            ConcurrentMap<PageStore, LongAdder> updStores,
+            CompletableFuture<?> doneFut,
+            Runnable beforePageWrite,
+            ThreadLocal<ByteBuffer> threadBuf,
+            CheckpointProgressImpl checkpointProgress,
+            CheckpointPageWriter pageWriter,
+            BooleanSupplier shutdownNow
+    ) {
+        this.log = log;
+        this.tracker = tracker;
+        this.writePageIds = writePageIds;
+        this.updStores = updStores;
+        this.doneFut = doneFut;
+        this.beforePageWrite = beforePageWrite;
+        this.threadBuf = threadBuf;
+        this.checkpointProgress = checkpointProgress;
+        this.pageWriter = pageWriter;
+        this.shutdownNow = shutdownNow;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void run() {
+        try {
+            IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> pagesToRetry = writePages(writePageIds);
+
+            if (pagesToRetry.isEmpty()) {
+                doneFut.complete(null);
+            } else {
+                if (log.isInfoEnabled()) {
+                    log.info(pagesToRetry.initialSize() + " checkpoint pages were not written yet due to "
+                            + "unsuccessful page write lock acquisition and will be retried");
+                }
+
+                while (!pagesToRetry.isEmpty()) {
+                    pagesToRetry = writePages(pagesToRetry);
+                }
+
+                doneFut.complete(null);
+            }
+        } catch (Throwable e) {
+            doneFut.completeExceptionally(e);
+        }
+    }
+
+    /**
+     * Writes pages.
+     *
+     * @param writePageIds Collections of pages to write.
+     * @return pagesToRetry Pages which should be retried.
+     */
+    private IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> writePages(
+            IgniteConcurrentMultiPairQueue<PageMemoryImpl, FullPageId> writePageIds
+    ) throws IgniteInternalCheckedException {
+        Map<PageMemoryImpl, List<FullPageId>> pagesToRetry = new HashMap<>();
+
+        Map<PageMemoryImpl, PageStoreWriter> pageStoreWriters = new HashMap<>();
+
+        ByteBuffer tmpWriteBuf = threadBuf.get();
+
+        IgniteConcurrentMultiPairQueue.Result<PageMemoryImpl, FullPageId> res = new IgniteConcurrentMultiPairQueue.Result<>();
+
+        while (!shutdownNow.getAsBoolean() && writePageIds.next(res)) {
+            beforePageWrite.run();
+
+            FullPageId fullId = res.getValue();
+
+            PageMemoryImpl pageMemory = res.getKey();
+
+            tmpWriteBuf.rewind();
+
+            PageStoreWriter pageStoreWriter = pageStoreWriters.computeIfAbsent(pageMemory, pm -> createPageStoreWriter(pm, pagesToRetry));
+
+            pageMemory.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker);
+        }
+
+        return pagesToRetry.isEmpty() ? EMPTY : new IgniteConcurrentMultiPairQueue<>(pagesToRetry);

Review Comment:
   Shouldn't we throw an exception if the node is stopping? It shouldn't look like the checkpoint is completed when it's not.
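
   Roughly what I have in mind, as a self-contained sketch rather than a patch against this class. `ShutdownAwareWriterSketch` is a hypothetical stand-in that models only the shutdown handling, and `IllegalStateException` is a placeholder (the real code would presumably throw something like `NodeStoppingException`):

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the pages writer; only the shutdown handling is modelled. */
final class ShutdownAwareWriterSketch implements Runnable {
    private final Queue<Long> pageIds;

    private final BooleanSupplier shutdownNow;

    private final CompletableFuture<Void> doneFut;

    ShutdownAwareWriterSketch(List<Long> pageIds, BooleanSupplier shutdownNow, CompletableFuture<Void> doneFut) {
        this.pageIds = new ArrayDeque<>(pageIds);
        this.shutdownNow = shutdownNow;
        this.doneFut = doneFut;
    }

    @Override
    public void run() {
        try {
            writePages();

            // Reached only if every page was written.
            doneFut.complete(null);
        } catch (Throwable e) {
            doneFut.completeExceptionally(e);
        }
    }

    private void writePages() {
        while (!pageIds.isEmpty()) {
            if (shutdownNow.getAsBoolean()) {
                // Placeholder exception type, see the assumption above.
                throw new IllegalStateException("Node is stopping, pages left unwritten: " + pageIds.size());
            }

            pageIds.poll(); // Placeholder for the actual page write.
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> stopping = new CompletableFuture<>();
        new ShutdownAwareWriterSketch(List.of(1L, 2L, 3L), () -> true, stopping).run();
        System.out.println("Stopping node, failed: " + stopping.isCompletedExceptionally());

        CompletableFuture<Void> running = new CompletableFuture<>();
        new ShutdownAwareWriterSketch(List.of(1L, 2L, 3L), () -> false, running).run();
        System.out.println("Running node, failed: " + running.isCompletedExceptionally());
    }
}
```

   The point is that the future completes normally only when the page queue was actually drained, so anyone waiting on it can tell an interrupted checkpoint from a finished one.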



##########
modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/IgniteConcurrentMultiPairQueueTest.java:
##########
@@ -47,16 +50,16 @@ public class IgniteConcurrentMultiPairQueueTest {
 
     private Map<Integer, Collection<Integer>> mapForCheck2;
 
+    private Integer[] arr1 = {1, 3, 5, 7, 9, 11, 13, 15, 17, 19};

Review Comment:
   Interesting change, what was the motivation?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Persistent store of pages.
+ */
+public interface PageStore extends Closeable {
+    /** Type for affinity partitions. */
+    byte TYPE_DATA = 1;
+
+    /** Type for index partition. */
+    byte TYPE_IDX = 2;
+
+    /**
+     * Adds page write listener.
+     *
+     * @param listener Page write listener.
+     */
+    void addWriteListener(PageWriteListener listener);
+
+    /**
+     * Removes page write listener.
+     *
+     * @param listener Page write listener.
+     */
+    void removeWriteListener(PageWriteListener listener);
+
+    /**
+     * Returns {@code true} if the page store exists.
+     */
+    boolean exists();
+
+    /**
+     * Allocates next page index.
+     *
+     * @return Next page index.
+     * @throws IgniteInternalCheckedException If failed to allocate.
+     */
+    long allocatePage() throws IgniteInternalCheckedException;
+
+    /**
+     * Returns number of allocated pages.
+     */
+    long pages();
+
+    /**
+     * Reads a page.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to read into.
+     * @param keepCrc By default, reading zeroes CRC which was on page store, but you can keep it in {@code pageBuf} if set {@code
+     * keepCrc}.
+     * @return {@code True} if page has been read successfully, {@code false} if page hasn't been written yet.
+     * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
+     */
+    boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException;
+
+    /**
+     * Reads a page store header.
+     *
+     * @param buf Buffer to write to.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    void readHeader(ByteBuffer buf) throws IgniteInternalCheckedException;
+
+    /**
+     * Writes a page.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to write from.
+     * @param tag Partition page store version, 1-based incrementing counter. For outdated pages {@code tag} has lower value,
+     *      and write does nothing.
+     * @param calculateCrc If {@code false} crc calculation will be forcibly skipped.
+     * @throws IgniteInternalCheckedException If page writing failed (IO error occurred).
+     */
+    void write(long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteInternalCheckedException;
+
+    /**
+     * Returns page offset within the page store.
+     *
+     * @param pageId Page ID.
+     */
+    long pageOffset(long pageId);
+
+    /**
+     * Sync method used to ensure that the given pages are guaranteed to be written to the page store.
+     *
+     * @throws IgniteInternalCheckedException If sync failed (IO error occurred).
+     */
+    void sync() throws IgniteInternalCheckedException;
+
+    /**
+     * Initializes the page store if it hasn't already.
+     *
+     * @throws IgniteInternalCheckedException If initialization failed (IO error occurred).
+     */
+    void ensure() throws IgniteInternalCheckedException;
+
+    /**
+     * Returns page store version.
+     */
+    int version();
+
+    /**
+     * Stops the page store.
+     *
+     * @param clean {@code True} to clean page store.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    void stop(boolean clean) throws IgniteInternalCheckedException;
+
+    /**
+     * Truncates and deletes page store.
+     *
+     * @param tag New partition tag.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    void truncate(int tag) throws IgniteInternalCheckedException;
+
+    /**
+     * Returns page size in bytes.
+     */
+    int pageSize();
+
+    /**
+     * Returns page store block size or negative value if unknown or not supported.
+     */
+    int blockSize();
+
+    /**
+     * Returns size of the page store in bytes.
+     *
+     * <p>May differ from {@link #pages} * {@link #pageSize} due to delayed writes or due to other implementation specific details.
+     */
+    long size();

Review Comment:
   Is this for logging only?



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/store/PageStore.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.persistence.store;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * Persistent store of pages.
+ */
+public interface PageStore extends Closeable {
+    /** Type for affinity partitions. */
+    byte TYPE_DATA = 1;
+
+    /** Type for index partition. */
+    byte TYPE_IDX = 2;
+
+    /**
+     * Adds page write listener.
+     *
+     * @param listener Page write listener.
+     */
+    void addWriteListener(PageWriteListener listener);
+
+    /**
+     * Removes page write listener.
+     *
+     * @param listener Page write listener.
+     */
+    void removeWriteListener(PageWriteListener listener);
+
+    /**
+     * Returns {@code true} if the page store exists.
+     */
+    boolean exists();
+
+    /**
+     * Allocates next page index.
+     *
+     * @return Next page index.
+     * @throws IgniteInternalCheckedException If failed to allocate.
+     */
+    long allocatePage() throws IgniteInternalCheckedException;
+
+    /**
+     * Returns number of allocated pages.
+     */
+    long pages();
+
+    /**
+     * Reads a page.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to read into.
+     * @param keepCrc By default, reading zeroes the CRC that was stored in the page store; set {@code keepCrc} to keep it in
+     *      {@code pageBuf}.
+     * @return {@code True} if page has been read successfully, {@code false} if page hasn't been written yet.
+     * @throws IgniteInternalCheckedException If reading failed (IO error occurred).
+     */
+    boolean read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteInternalCheckedException;
+
+    /**
+     * Reads a page store header.
+     *
+     * @param buf Buffer to write to.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    void readHeader(ByteBuffer buf) throws IgniteInternalCheckedException;
+
+    /**
+     * Writes a page.
+     *
+     * @param pageId Page ID.
+     * @param pageBuf Page buffer to write from.
+     * @param tag Partition page store version, 1-based incrementing counter. For outdated pages {@code tag} has lower value,
+     *      and write does nothing.
+     * @param calculateCrc If {@code false} crc calculation will be forcibly skipped.
+     * @throws IgniteInternalCheckedException If page writing failed (IO error occurred).
+     */
+    void write(long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteInternalCheckedException;
+
+    /**
+     * Returns page offset within the page store.
+     *
+     * @param pageId Page ID.
+     */
+    long pageOffset(long pageId);
+
+    /**
+     * Sync method used to ensure that the given pages are guaranteed to be written to the page store.
+     *
+     * @throws IgniteInternalCheckedException If sync failed (IO error occurred).
+     */
+    void sync() throws IgniteInternalCheckedException;
+
+    /**
+     * Initializes the page store if it hasn't been initialized already.
+     *
+     * @throws IgniteInternalCheckedException If initialization failed (IO error occurred).
+     */
+    void ensure() throws IgniteInternalCheckedException;
+
+    /**
+     * Returns page store version.
+     */
+    int version();
+
+    /**
+     * Stops the page store.
+     *
+     * @param clean {@code True} to clean page store.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    void stop(boolean clean) throws IgniteInternalCheckedException;
+
+    /**
+     * Truncates and deletes page store.
+     *
+     * @param tag New partition tag.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    void truncate(int tag) throws IgniteInternalCheckedException;
+
+    /**
+     * Returns page size in bytes.
+     */
+    int pageSize();
+
+    /**
+     * Returns page store block size or negative value if unknown or not supported.
+     */
+    int blockSize();

Review Comment:
   I don't think we need the `blockSize()` method.



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -17,12 +17,766 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
+import static java.lang.Math.max;
+import static java.lang.System.nanoTime;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.internal.util.worker.WorkProgressDispatcher;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
 /**
- * Empty.
+ * Checkpointer object is used for notification on checkpoint begin, predicate is {@code nextCheckpointTimestamps - now > 0}.
+ *
+ * <p>Method {@link #scheduleCheckpoint} uses {@link Object#notifyAll()}, {@link #waitCheckpointEvent} uses {@link Object#wait(long)}.
+ *
+ * <p>Checkpointer is single-threaded, which means that only one checkpoint can run at any given moment.
+ *
+ * <p>Responsibilities:
+ * <ul>
+ * <li>Provide the API to schedule/trigger a checkpoint.</li>
+ * <li>Schedule new checkpoint after current one according to checkpoint frequency.</li>
+ * <li>Failure handling.</li>
+ * <li>Managing of page write threads.</li>
+ * <li>Logging and metrics of checkpoint.</li>
+ * </ul>
+ *
+ * <p>Checkpointer steps:
+ * <ul>
+ * <li>Awaiting checkpoint event.</li>
+ * <li>Collect all dirty pages from page memory under checkpoint write lock.</li>
+ * <li>Start to write dirty pages to page store.</li>
+ * <li>Finish the checkpoint.</li>
+ * </ul>
  */
-// TODO: IGNITE-16935 Continue porting the code
-public abstract class Checkpointer {
-    public abstract Thread runner();
+public class Checkpointer extends IgniteWorker implements IgniteComponent {
+    private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started ["
+            + "checkpointId=%s, "
+            + "checkpointBeforeWriteLockTime=%dms, "
+            + "checkpointWriteLockWait=%dms, "
+            + "checkpointListenersExecuteTime=%dms, "
+            + "checkpointWriteLockHoldTime=%dms, "
+            + "splitAndSortPagesDuration=%dms, "
+            + "%s"
+            + "pages=%d, "
+            + "reason='%s']";
+
+    /** Skip starting a checkpoint if the checkpointer was cancelled. */
+    // TODO: IGNITE-16984 Move to config
+    private volatile boolean skipCheckpointOnNodeStop = getBoolean("IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP", false);
+
+    /** Pause detector. */
+    @Nullable
+    private final LongJvmPauseDetector pauseDetector;
+
+    /** Supplier of the interval in ms after which the checkpoint is triggered if there are no other events. */
+    private final LongSupplier checkpointFrequencySupplier;
+
+    /** Strategy of where and how to get the pages. */
+    private final CheckpointWorkflow checkpointWorkflow;
+
+    /** Factory for the creation of page-write workers. */
+    private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+    /** Checkpoint runner thread pool. If {@code null} tasks are to be run in single thread. */
+    @Nullable
+    private final ThreadPoolExecutor checkpointWritePagesPool;
+
+    /** Next scheduled checkpoint progress. */
+    private volatile CheckpointProgressImpl scheduledCheckpointProgress;
+
+    /** Current checkpoint progress. This field is updated only by checkpoint thread. */
+    @Nullable
+    private volatile CheckpointProgressImpl currentCheckpointProgress;
+
+    /** Shutdown now. */
+    private volatile boolean shutdownNow;
+
+    /** Last checkpoint timestamp, read/update only in checkpoint thread. */
+    private long lastCheckpointTimestamp;
+
+    @TestOnly
+    @Nullable
+    private volatile CompletableFuture<?> enableChangeAppliedFuture;
+
+    @TestOnly
+    private volatile boolean checkpointsEnabled = true;
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param igniteInstanceName Name of the Ignite instance.
+     * @param workerListener Listener for life-cycle worker events.
+     * @param detector Long JVM pause detector.
+     * @param checkpointWorkFlow Implementation of checkpoint.
+     * @param factory Page writer factory.
+     * @param checkpointFrequencySupplier Supplier interval in ms after which the checkpoint is triggered if there are no other events.
+     * @param checkpointWritePageThreads The number of IO-bound threads which will write pages to disk.
+     */
+    Checkpointer(
+            IgniteLogger log,
+            String igniteInstanceName,
+            @Nullable IgniteWorkerListener workerListener,
+            @Nullable LongJvmPauseDetector detector,
+            CheckpointWorkflow checkpointWorkFlow,
+            CheckpointPagesWriterFactory factory,
+            int checkpointWritePageThreads,
+            LongSupplier checkpointFrequencySupplier
+    ) {
+        super(log, igniteInstanceName, "checkpoint-thread", workerListener);
+
+        this.pauseDetector = detector;
+        // TODO: IGNITE-16984 Move to config: checkpointFrequency * checkpointFrequencyDeviation see 2.0
+        this.checkpointFrequencySupplier = checkpointFrequencySupplier;
+        this.checkpointWorkflow = checkpointWorkFlow;
+        this.checkpointPagesWriterFactory = factory;
+
+        scheduledCheckpointProgress = new CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
+
+        // TODO: IGNITE-16984 Move checkpointWritePageThreads to config
+        if (checkpointWritePageThreads > 1) {
+            checkpointWritePagesPool = new ThreadPoolExecutor(
+                    checkpointWritePageThreads,
+                    checkpointWritePageThreads,
+                    30_000,
+                    MILLISECONDS,
+                    new LinkedBlockingQueue<>(),
+                    new NamedThreadFactory(CHECKPOINT_RUNNER_THREAD_PREFIX + "-IO")
+            );
+        } else {
+            checkpointWritePagesPool = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    protected void body() {
+        try {
+            while (!isCancelled()) {
+                waitCheckpointEvent();
+
+                if (skipCheckpointOnNodeStop && (isCancelled() || shutdownNow)) {
+                    if (log.isInfoEnabled()) {
+                        log.warn("Skipping last checkpoint because node is stopping.");

Review Comment:
   This logs at warn level inside an `isInfoEnabled()` check; please make the guard match the level.
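
   A hedged sketch of what a matching guard could look like. It uses the JDK's `System.Logger` as a stand-in for `IgniteLogger`, and the class and method names are hypothetical, not taken from the PR: the level that is checked is the same level that is logged.

```java
import java.lang.System.Logger;
import java.lang.System.Logger.Level;

/** Hypothetical helper, not part of the PR: keeps the level check and the log call on the same level. */
final class SkipCheckpointLogging {
    static final String MESSAGE = "Skipping last checkpoint because node is stopping.";

    /**
     * Logs the message at the given level only if that same level is enabled.
     *
     * @return {@code true} if the message was passed to the logger.
     */
    static boolean logIfEnabled(Logger log, Level level) {
        if (log.isLoggable(level)) {
            log.log(level, MESSAGE);

            return true;
        }

        return false;
    }

    public static void main(String[] args) {
        // Assumption: the message is meant to be a warning, as in the diff.
        Logger log = System.getLogger("checkpoint");

        logIfEnabled(log, Level.WARNING);
    }
}
```

   Since the message is a constant string with nothing to format, dropping the guard entirely should also work here.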



##########
modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java:
##########
@@ -17,12 +17,766 @@
 
 package org.apache.ignite.internal.pagememory.persistence.checkpoint;
 
+import static java.lang.Math.max;
+import static java.lang.System.nanoTime;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
+import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_TAKEN;
+import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
+import static org.apache.ignite.internal.util.IgniteUtils.safeAbs;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.lang.IgniteSystemProperties.getBoolean;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiConsumer;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongSupplier;
+import org.apache.ignite.internal.components.LongJvmPauseDetector;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.persistence.PageMemoryImpl;
+import org.apache.ignite.internal.pagememory.persistence.store.PageStore;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.worker.IgniteWorker;
+import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
+import org.apache.ignite.internal.util.worker.WorkProgressDispatcher;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
 /**
- * Empty.
+ * Checkpointer object is used for notification on checkpoint begin, predicate is {@code nextCheckpointTimestamps - now > 0}.
+ *
+ * <p>Method {@link #scheduleCheckpoint} uses {@link Object#notifyAll()}, {@link #waitCheckpointEvent} uses {@link Object#wait(long)}.
+ *
+ * <p>Checkpointer is single-threaded, which means that only one checkpoint can run at any given moment.
+ *
+ * <p>Responsibilities:
+ * <ul>
+ * <li>Provide the API to schedule/trigger a checkpoint.</li>
+ * <li>Schedule new checkpoint after current one according to checkpoint frequency.</li>
+ * <li>Failure handling.</li>
+ * <li>Managing of page write threads.</li>
+ * <li>Logging and metrics of checkpoint.</li>
+ * </ul>
+ *
+ * <p>Checkpointer steps:
+ * <ul>
+ * <li>Awaiting checkpoint event.</li>
+ * <li>Collect all dirty pages from page memory under checkpoint write lock.</li>
+ * <li>Start to write dirty pages to page store.</li>
+ * <li>Finish the checkpoint.</li>
+ * </ul>
  */
-// TODO: IGNITE-16935 Continue porting the code
-public abstract class Checkpointer {
-    public abstract Thread runner();
+public class Checkpointer extends IgniteWorker implements IgniteComponent {
+    private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started ["
+            + "checkpointId=%s, "
+            + "checkpointBeforeWriteLockTime=%dms, "
+            + "checkpointWriteLockWait=%dms, "
+            + "checkpointListenersExecuteTime=%dms, "
+            + "checkpointWriteLockHoldTime=%dms, "
+            + "splitAndSortPagesDuration=%dms, "
+            + "%s"
+            + "pages=%d, "
+            + "reason='%s']";
+
+    /** Skip starting a checkpoint if the checkpointer was cancelled. */
+    // TODO: IGNITE-16984 Move to config
+    private volatile boolean skipCheckpointOnNodeStop = getBoolean("IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP", false);
+
+    /** Pause detector. */
+    @Nullable
+    private final LongJvmPauseDetector pauseDetector;
+
+    /** Supplier of the interval in ms after which the checkpoint is triggered if there are no other events. */
+    private final LongSupplier checkpointFrequencySupplier;
+
+    /** Strategy of where and how to get the pages. */
+    private final CheckpointWorkflow checkpointWorkflow;
+
+    /** Factory for the creation of page-write workers. */
+    private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
+
+    /** Checkpoint runner thread pool. If {@code null} tasks are to be run in single thread. */
+    @Nullable
+    private final ThreadPoolExecutor checkpointWritePagesPool;
+
+    /** Next scheduled checkpoint progress. */
+    private volatile CheckpointProgressImpl scheduledCheckpointProgress;
+
+    /** Current checkpoint progress. This field is updated only by checkpoint thread. */
+    @Nullable
+    private volatile CheckpointProgressImpl currentCheckpointProgress;
+
+    /** Shutdown now. */
+    private volatile boolean shutdownNow;
+
+    /** Last checkpoint timestamp, read/update only in checkpoint thread. */
+    private long lastCheckpointTimestamp;
+
+    @TestOnly
+    @Nullable
+    private volatile CompletableFuture<?> enableChangeAppliedFuture;
+
+    @TestOnly
+    private volatile boolean checkpointsEnabled = true;
+
+    /**
+     * Constructor.
+     *
+     * @param log Logger.
+     * @param igniteInstanceName Name of the Ignite instance.
+     * @param workerListener Listener for life-cycle worker events.
+     * @param detector Long JVM pause detector.
+     * @param checkpointWorkFlow Implementation of checkpoint.
+     * @param factory Page writer factory.
+     * @param checkpointFrequencySupplier Supplier interval in ms after which the checkpoint is triggered if there are no other events.
+     * @param checkpointWritePageThreads The number of IO-bound threads which will write pages to disk.
+     */
+    Checkpointer(
+            IgniteLogger log,
+            String igniteInstanceName,
+            @Nullable IgniteWorkerListener workerListener,
+            @Nullable LongJvmPauseDetector detector,
+            CheckpointWorkflow checkpointWorkFlow,
+            CheckpointPagesWriterFactory factory,
+            int checkpointWritePageThreads,
+            LongSupplier checkpointFrequencySupplier
+    ) {
+        super(log, igniteInstanceName, "checkpoint-thread", workerListener);
+
+        this.pauseDetector = detector;
+        // TODO: IGNITE-16984 Move to config: checkpointFrequency * checkpointFrequencyDeviation see 2.0
+        this.checkpointFrequencySupplier = checkpointFrequencySupplier;
+        this.checkpointWorkflow = checkpointWorkFlow;
+        this.checkpointPagesWriterFactory = factory;
+
+        scheduledCheckpointProgress = new CheckpointProgressImpl(MILLISECONDS.toNanos(nextCheckpointInterval()));
+
+        // TODO: IGNITE-16984 Move checkpointWritePageThreads to config
+        if (checkpointWritePageThreads > 1) {
+            checkpointWritePagesPool = new ThreadPoolExecutor(
+                    checkpointWritePageThreads,
+                    checkpointWritePageThreads,
+                    30_000,
+                    MILLISECONDS,
+                    new LinkedBlockingQueue<>(),
+                    new NamedThreadFactory(CHECKPOINT_RUNNER_THREAD_PREFIX + "-IO")

Review Comment:
   Why is the `-IO` suffix upper case?


