You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/08/10 09:37:17 UTC

[ignite-3] branch main updated: IGNITE-17077 persistedIndex implementation for PDS (#989)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d8e694eaa IGNITE-17077 persistedIndex implementation for PDS (#989)
d8e694eaa is described below

commit d8e694eaa1c4ff2dd709bb9001b5d32c9ebd43de
Author: ibessonov <be...@gmail.com>
AuthorDate: Wed Aug 10 12:37:13 2022 +0300

    IGNITE-17077 persistedIndex implementation for PDS (#989)
---
 .../PageMemoryCheckpointConfigurationSchema.java   |   5 +
 .../pagememory/persistence/PartitionMeta.java      |  38 ++++++-
 .../persistence/checkpoint/CheckpointManager.java  |  11 +++
 .../pagememory/persistence/io/PartitionMetaIo.java |  29 +++++-
 .../persistence/PartitionMetaManagerTest.java      |   2 +-
 .../pagememory/persistence/PartitionMetaTest.java  |   2 +-
 .../persistence/checkpoint/CheckpointerTest.java   |   2 +-
 .../internal/storage/MvPartitionStorage.java       |   8 +-
 .../storage/AbstractMvPartitionStorageTest.java    |  28 ++++++
 .../TestConcurrentHashMapMvPartitionStorage.java   |  13 +++
 .../pagememory/AbstractPageMemoryTableStorage.java |   2 +-
 .../PersistentPageMemoryStorageEngine.java         |   9 +-
 .../PersistentPageMemoryTableStorage.java          |  22 ++++-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |  25 +----
 .../mv/PersistentPageMemoryMvPartitionStorage.java | 110 +++++++++++++++++++--
 .../mv/VolatilePageMemoryMvPartitionStorage.java   |  37 +++++++
 ...PersistentPageMemoryMvPartitionStorageTest.java |   5 +-
 .../RocksDbStorageEngineConfigurationSchema.java   |   3 +
 .../rocksdb/RocksDbMvPartitionStorageTest.java     |   5 +-
 19 files changed, 304 insertions(+), 52 deletions(-)

diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
index 2352b360a..69dbe516a 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/configuration/schema/PageMemoryCheckpointConfigurationSchema.java
@@ -36,6 +36,11 @@ public class PageMemoryCheckpointConfigurationSchema {
     @Value(hasDefault = true)
     public int frequencyDeviation = 40;
 
+    /** Delay before executing a checkpoint triggered by RAFT. */
+    @Range(min = 0)
+    @Value(hasDefault = true)
+    public int checkpointDelayMillis = 200;
+
     /** Number of checkpoint threads. */
     @Range(min = 1)
     @Value(hasDefault = true)
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
index 7720cf23a..1b8c9aad1 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/PartitionMeta.java
@@ -45,6 +45,8 @@ public class PartitionMeta {
         }
     }
 
+    private volatile long lastAppliedIndex;
+
     // TODO: IGNITE-17466 Delete it
     private volatile long treeRootPageId;
 
@@ -72,6 +74,7 @@ public class PartitionMeta {
      * Constructor.
      *
      * @param checkpointId Checkpoint ID.
+     * @param lastAppliedIndex Last applied index value.
      * @param treeRootPageId Tree root page ID.
      * @param reuseListRootPageId Reuse list root page ID.
      * @param versionChainTreeRootPageId Version chain tree root page ID.
@@ -81,6 +84,7 @@ public class PartitionMeta {
      */
     public PartitionMeta(
             @Nullable UUID checkpointId,
+            long lastAppliedIndex,
             long treeRootPageId,
             long reuseListRootPageId,
             long versionChainTreeRootPageId,
@@ -88,6 +92,7 @@ public class PartitionMeta {
             long rowVersionFreeListRootPageId,
             int pageCount
     ) {
+        this.lastAppliedIndex = lastAppliedIndex;
         this.treeRootPageId = treeRootPageId;
         this.reuseListRootPageId = reuseListRootPageId;
         this.versionChainTreeRootPageId = versionChainTreeRootPageId;
@@ -108,6 +113,7 @@ public class PartitionMeta {
     PartitionMeta(@Nullable UUID checkpointId, PartitionMetaIo metaIo, long pageAddr) {
         this(
                 checkpointId,
+                metaIo.getLastAppliedIndex(pageAddr),
                 metaIo.getTreeRootPageId(pageAddr),
                 metaIo.getReuseListRootPageId(pageAddr),
                 metaIo.getVersionChainTreeRootPageId(pageAddr),
@@ -117,6 +123,25 @@ public class PartitionMeta {
         );
     }
 
+    /**
+     * Returns the last applied index value.
+     */
+    public long lastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    /**
+     * Sets the last applied index value.
+     *
+     * @param checkpointId Checkpoint ID.
+     * @param lastAppliedIndex Last applied index value.
+     */
+    public void lastAppliedIndex(@Nullable UUID checkpointId, long lastAppliedIndex) {
+        updateSnapshot(checkpointId);
+
+        this.lastAppliedIndex = lastAppliedIndex;
+    }
+
     /**
      * Returns tree root page ID.
      */
@@ -246,7 +271,7 @@ public class PartitionMeta {
      * @param checkpointId Checkpoint ID.
      */
     private void updateSnapshot(@Nullable UUID checkpointId) {
-        PartitionMetaSnapshot current = this.metaSnapshot;
+        PartitionMetaSnapshot current = metaSnapshot;
 
         if (current.checkpointId != checkpointId) {
             META_SNAPSHOT.compareAndSet(this, current, new PartitionMetaSnapshot(checkpointId, this));
@@ -265,6 +290,8 @@ public class PartitionMeta {
     public static class PartitionMetaSnapshot {
         private final @Nullable UUID checkpointId;
 
+        private final long lastAppliedIndex;
+
         private final long treeRootPageId;
 
         private final long reuseListRootPageId;
@@ -285,6 +312,7 @@ public class PartitionMeta {
          */
         private PartitionMetaSnapshot(@Nullable UUID checkpointId, PartitionMeta partitionMeta) {
             this.checkpointId = checkpointId;
+            this.lastAppliedIndex = partitionMeta.lastAppliedIndex;
             this.treeRootPageId = partitionMeta.treeRootPageId;
             this.reuseListRootPageId = partitionMeta.reuseListRootPageId;
             this.versionChainTreeRootPageId = partitionMeta.versionChainTreeRootPageId;
@@ -293,6 +321,13 @@ public class PartitionMeta {
             this.pageCount = partitionMeta.pageCount;
         }
 
+        /**
+         * Returns the last applied index value.
+         */
+        public long lastAppliedIndex() {
+            return lastAppliedIndex;
+        }
+
         /**
          * Returns tree root page ID.
          */
@@ -342,6 +377,7 @@ public class PartitionMeta {
          * @param pageAddr Address of the page with the partition meta.
          */
         void writeTo(PartitionMetaIo metaIo, long pageAddr) {
+            metaIo.setLastAppliedIndex(pageAddr, lastAppliedIndex);
             metaIo.setTreeRootPageId(pageAddr, treeRootPageId);
             metaIo.setReuseListRootPageId(pageAddr, reuseListRootPageId);
             metaIo.setVersionChainTreeRootPageId(pageAddr, versionChainTreeRootPageId);
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
index 2af3d2956..b23a253a2 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointManager.java
@@ -228,6 +228,17 @@ public class CheckpointManager {
         return checkpointer.scheduleCheckpoint(0, reason);
     }
 
+    /**
+     * Schedules a checkpoint in the future.
+     *
+     * @param delayMillis Delay in milliseconds from the current moment.
+     * @param reason Checkpoint reason.
+     * @return Triggered checkpoint progress.
+     */
+    public CheckpointProgress scheduleCheckpoint(long delayMillis, String reason) {
+        return checkpointer.scheduleCheckpoint(delayMillis, reason);
+    }
+
     /**
      * Returns the progress of the last checkpoint, or the current checkpoint if in progress, {@code null} if no checkpoint has occurred.
      */
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
index c86a7f45f..5e3baa6a2 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/io/PartitionMetaIo.java
@@ -31,7 +31,9 @@ import org.apache.ignite.lang.IgniteStringBuilder;
  * Io for partition metadata pages.
  */
 public class PartitionMetaIo extends PageIo {
-    private static final int TREE_ROOT_PAGE_ID_OFF = COMMON_HEADER_END;
+    private static final int LAST_APPLIED_INDEX_OFF = COMMON_HEADER_END;
+
+    private static final int TREE_ROOT_PAGE_ID_OFF = LAST_APPLIED_INDEX_OFF + Long.BYTES;
 
     private static final int REUSE_LIST_ROOT_PAGE_ID_OFF = TREE_ROOT_PAGE_ID_OFF + Long.BYTES;
 
@@ -63,6 +65,7 @@ public class PartitionMetaIo extends PageIo {
     public void initNewPage(long pageAddr, long pageId, int pageSize) {
         super.initNewPage(pageAddr, pageId, pageSize);
 
+        setLastAppliedIndex(pageAddr, 0);
         setTreeRootPageId(pageAddr, 0);
         setReuseListRootPageId(pageAddr, 0);
         setVersionChainTreeRootPageId(pageAddr, 0);
@@ -71,6 +74,27 @@ public class PartitionMetaIo extends PageIo {
         setPageCount(pageAddr, 0);
     }
 
+    /**
+     * Sets the last applied index value.
+     *
+     * @param pageAddr Page address.
+     * @param lastAppliedIndex Last applied index value.
+     */
+    public void setLastAppliedIndex(long pageAddr, long lastAppliedIndex) {
+        assertPageType(pageAddr);
+
+        putLong(pageAddr, LAST_APPLIED_INDEX_OFF, lastAppliedIndex);
+    }
+
+    /**
+     * Returns the last applied index value.
+     *
+     * @param pageAddr Page address.
+     */
+    public long getLastAppliedIndex(long pageAddr) {
+        return getLong(pageAddr, LAST_APPLIED_INDEX_OFF);
+    }
+
     /**
      * Sets tree root page ID.
      *
@@ -205,7 +229,8 @@ public class PartitionMetaIo extends PageIo {
     @Override
     protected void printPage(long addr, int pageSize, IgniteStringBuilder sb) {
         sb.app("TablePartitionMeta [").nl()
-                .app("treeRootPageId=").appendHex(getTreeRootPageId(addr)).nl()
+                .app("lastAppliedIndex=").app(getLastAppliedIndex(addr)).nl()
+                .app(", treeRootPageId=").appendHex(getTreeRootPageId(addr)).nl()
                 .app(", reuseListRootPageId=").appendHex(getReuseListRootPageId(addr)).nl()
                 .app(", versionChainTreeRootPageId=").appendHex(getVersionChainTreeRootPageId(addr)).nl()
                 .app(", versionChainFreeListRootPageId=").appendHex(getVersionChainFreeListRootPageId(addr)).nl()
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
index 974a65378..4ca28e2fb 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaManagerTest.java
@@ -139,7 +139,7 @@ public class PartitionMetaManagerTest {
             try (FilePageStore filePageStore = createFilePageStore(testFilePath)) {
                 manager.writeMetaToBuffer(
                         partId,
-                        new PartitionMeta(UUID.randomUUID(), 200, 1000, 300, 600, 900, 4).metaSnapshot(null),
+                        new PartitionMeta(UUID.randomUUID(), 0, 200, 1000, 300, 600, 900, 4).metaSnapshot(null),
                         buffer.rewind()
                 );
 
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
index 9b21b7e7f..69e82272b 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/PartitionMetaTest.java
@@ -129,7 +129,7 @@ public class PartitionMetaTest {
     void testSnapshot() {
         UUID checkpointId = null;
 
-        PartitionMeta meta = new PartitionMeta(checkpointId, 0, 0, 0, 0, 0, 0);
+        PartitionMeta meta = new PartitionMeta(checkpointId, 0, 0, 0, 0, 0, 0, 0);
 
         checkSnapshot(meta.metaSnapshot(checkpointId), 0, 0, 0, 0, 0, 0);
         checkSnapshot(meta.metaSnapshot(checkpointId = UUID.randomUUID()), 0, 0, 0, 0, 0, 0);
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
index 6a47e317f..fb4710434 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointerTest.java
@@ -351,7 +351,7 @@ public class CheckpointerTest {
 
         partitionMetaManager.addMeta(
                 new GroupPartitionId(0, 0),
-                new PartitionMeta(null, 0, 0, 0, 0, 0, 3)
+                new PartitionMeta(null, 0, 0, 0, 0, 0, 0, 3)
         );
 
         FilePageStore filePageStore = mock(FilePageStore.class);
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index ed59c62ea..3f540f6d4 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -52,9 +52,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      * @return Closure result.
      * @throws StorageException If failed to write data to the storage.
      */
-    default <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
-        return closure.execute();
-    }
+    <V> V runConsistently(WriteClosure<V> closure) throws StorageException;
 
     /**
      * Flushes current state of the data or <i>the state from the nearest future</i> to the storage. It means that the future can be
@@ -63,9 +61,7 @@ public interface MvPartitionStorage extends AutoCloseable {
      *
      * @return Future that's completed when flushing of the data is completed.
      */
-    default CompletableFuture<Void> flush() {
-        return CompletableFuture.completedFuture(null);
-    }
+    CompletableFuture<Void> flush();
 
     /**
      * Index of the highest write command applied to the storage. {@code 0} if index is unknown.
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 6e8cfc48a..b346b859b 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -18,10 +18,12 @@
 package org.apache.ignite.internal.storage;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -35,6 +37,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 import java.util.stream.StreamSupport;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -679,4 +682,29 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
 
         assertThrows(TxIdMismatchException.class, cursor::next);
     }
+
+    /**
+     * Tests that changed {@link MvPartitionStorage#lastAppliedIndex()} can be successfully read and that it's returned from
+     * {@link MvPartitionStorage#persistedIndex()} after the {@link MvPartitionStorage#flush()}.
+     */
+    @Test
+    void testAppliedIndex() {
+        storage.runConsistently(() -> {
+            assertEquals(0, storage.lastAppliedIndex());
+            assertEquals(0, storage.persistedIndex());
+
+            storage.lastAppliedIndex(1);
+
+            assertEquals(1, storage.lastAppliedIndex());
+            assertThat(storage.persistedIndex(), is(lessThanOrEqualTo(1L)));
+
+            return null;
+        });
+
+        CompletableFuture<Void> flushFuture = storage.flush();
+
+        assertThat(flushFuture, willCompleteSuccessfully());
+
+        assertEquals(1, storage.persistedIndex());
+    }
 }
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
index 5716d8a27..2677c70b2 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
@@ -74,6 +75,18 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora
         }
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
+        return closure.execute();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> flush() {
+        return CompletableFuture.completedFuture(null);
+    }
+
     /** {@inheritDoc} */
     @Override
     public long lastAppliedIndex() {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 36190248a..6210ba4c9 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -58,7 +58,7 @@ public abstract class AbstractPageMemoryTableStorage implements TableStorage, Mv
      *
      * @param tableCfg Table configuration.
      */
-    public AbstractPageMemoryTableStorage(TableConfiguration tableCfg) {
+    protected AbstractPageMemoryTableStorage(TableConfiguration tableCfg) {
         this.tableCfg = tableCfg;
     }
 
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index ee80383e7..0eafd238b 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -101,6 +101,13 @@ public class PersistentPageMemoryStorageEngine implements StorageEngine {
         this.longJvmPauseDetector = longJvmPauseDetector;
     }
 
+    /**
+     * Returns a storage engine configuration.
+     */
+    public PersistentPageMemoryStorageEngineConfiguration configuration() {
+        return engineConfig;
+    }
+
     /** {@inheritDoc} */
     @Override
     public void start() throws StorageException {
@@ -188,7 +195,7 @@ public class PersistentPageMemoryStorageEngine implements StorageEngine {
 
         PersistentPageMemoryDataStorageView dataStorageView = (PersistentPageMemoryDataStorageView) tableView.dataStorage();
 
-        return new PersistentPageMemoryTableStorage(tableCfg, regions.get(dataStorageView.dataRegion()));
+        return new PersistentPageMemoryTableStorage(this, tableCfg, regions.get(dataStorageView.dataRegion()));
     }
 
     /** {@inheritDoc} */
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 5add88f72..f578728bc 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -45,23 +45,37 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
  * Implementation of {@link AbstractPageMemoryTableStorage} for persistent case.
  */
 public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableStorage {
+    /** Storage engine instance. */
+    private final PersistentPageMemoryStorageEngine engine;
+
+    /** Data region instance. */
     private final PersistentPageMemoryDataRegion dataRegion;
 
     /**
      * Constructor.
      *
+     * @param engine Storage engine instance.
      * @param tableCfg Table configuration.
      * @param dataRegion Data region for the table.
      */
     public PersistentPageMemoryTableStorage(
+            PersistentPageMemoryStorageEngine engine,
             TableConfiguration tableCfg,
             PersistentPageMemoryDataRegion dataRegion
     ) {
         super(tableCfg);
 
+        this.engine = engine;
         this.dataRegion = dataRegion;
     }
 
+    /**
+     * Returns a storage engine instance.
+     */
+    public PersistentPageMemoryStorageEngine engine() {
+        return engine;
+    }
+
     /** {@inheritDoc} */
     @Override
     public boolean isVolatile() {
@@ -259,13 +273,15 @@ public class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableSto
             );
 
             return new PersistentPageMemoryMvPartitionStorage(
+                    this,
                     partitionId,
                     tableView,
-                    dataRegion.pageMemory(),
+                    dataRegion,
+                    checkpointManager,
+                    meta,
                     versionChainFreeList,
                     rowVersionFreeList,
-                    versionChainTree,
-                    checkpointTimeoutLock
+                    versionChainTree
             );
         } catch (IgniteInternalCheckedException e) {
             throw new StorageException(
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 904251a3a..b9b1e5773 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -67,11 +67,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
             ScanVersionChainByTimestamp::new
     );
 
-    /**
-     * Last applied index value.
-     */
-    private volatile long lastAppliedIndex = 0;
-
     /**
      * Constructor.
      *
@@ -82,7 +77,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
      * @param rowVersionFreeList Free list for {@link RowVersion}.
      * @param versionChainTree Table tree for {@link VersionChain}.
      */
-    public AbstractPageMemoryMvPartitionStorage(
+    protected AbstractPageMemoryMvPartitionStorage(
             int partId,
             TableView tableView,
             PageMemory pageMemory,
@@ -102,24 +97,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio
         rowVersionDataPageReader = new DataPageReader(pageMemory, groupId, IoStatisticsHolderNoOp.INSTANCE);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public long lastAppliedIndex() {
-        return lastAppliedIndex;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void lastAppliedIndex(long lastAppliedIndex) throws StorageException {
-        this.lastAppliedIndex = lastAppliedIndex;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public long persistedIndex() {
-        return lastAppliedIndex;
-    }
-
     /** {@inheritDoc} */
     @Override
     public @Nullable BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 70ff1fd28..945e1e0a9 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -17,42 +17,85 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.internal.pagememory.DataRegion;
+import org.apache.ignite.internal.pagememory.persistence.PartitionMeta;
 import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState;
 import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTimeoutLock;
 import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryTableStorage;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineView;
 
 /**
  * Implementation of {@link MvPartitionStorage} based on a {@link BplusTree} for persistent case.
  */
 public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMvPartitionStorage {
+    /** Table storage instance. */
+    private final PersistentPageMemoryTableStorage tableStorage;
+
+    /** Checkpoint manager instance. */
+    private final CheckpointManager checkpointManager;
+
+    /** Checkpoint lock instance. */
     private final CheckpointTimeoutLock checkpointTimeoutLock;
 
+    /** Partition meta instance. */
+    private final PartitionMeta meta;
+
+    /** Value of currently persisted last applied index. */
+    private volatile long persistedIndex;
+
+    /** Checkpoint listener that updates the value of {@link #persistedIndex}. */
+    private final CheckpointListener checkpointListener = new CheckpointListener() {
+        /** {@inheritDoc} */
+        @Override
+        public void afterCheckpointEnd(CheckpointProgress progress) {
+            persistedIndex = meta.metaSnapshot(progress.id()).lastAppliedIndex();
+        }
+    };
+
     /**
      * Constructor.
      *
-     * @param partId Partition id.
+     * @param tableStorage Table storage.
+     * @param partitionId Partition id.
      * @param tableView Table configuration.
-     * @param pageMemory Page memory.
+     * @param dataRegion Data region.
+     * @param checkpointManager Checkpoint manager.
+     * @param meta Partition meta.
      * @param versionChainFreeList Free list for {@link VersionChain}.
      * @param rowVersionFreeList Free list for {@link RowVersion}.
      * @param versionChainTree Table tree for {@link VersionChain}.
-     * @param checkpointTimeoutLock Checkpoint timeout lock.
      */
     public PersistentPageMemoryMvPartitionStorage(
-            int partId,
+            PersistentPageMemoryTableStorage tableStorage,
+            int partitionId,
             TableView tableView,
-            PersistentPageMemory pageMemory,
+            DataRegion<PersistentPageMemory> dataRegion,
+            CheckpointManager checkpointManager,
+            PartitionMeta meta,
             VersionChainFreeList versionChainFreeList,
             RowVersionFreeList rowVersionFreeList,
-            VersionChainTree versionChainTree,
-            CheckpointTimeoutLock checkpointTimeoutLock
+            VersionChainTree versionChainTree
     ) {
-        super(partId, tableView, pageMemory, versionChainFreeList, rowVersionFreeList, versionChainTree);
+        super(partitionId, tableView, dataRegion.pageMemory(), versionChainFreeList, rowVersionFreeList, versionChainTree);
+
+        this.tableStorage = tableStorage;
+
+        this.checkpointManager = checkpointManager;
+        checkpointTimeoutLock = checkpointManager.checkpointTimeoutLock();
 
-        this.checkpointTimeoutLock = checkpointTimeoutLock;
+        checkpointManager.addCheckpointListener(checkpointListener, dataRegion);
+
+        this.meta = meta;
     }
 
     /** {@inheritDoc} */
@@ -66,4 +109,53 @@ public class PersistentPageMemoryMvPartitionStorage extends AbstractPageMemoryMv
             checkpointTimeoutLock.checkpointReadUnlock();
         }
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> flush() {
+        CheckpointProgress latest = checkpointManager.lastCheckpointProgress();
+
+        // Reuse the latest checkpoint only when its meta snapshot reports the same applied index as the live meta.
+        boolean reusable = latest != null && meta.metaSnapshot(latest.id()).lastAppliedIndex() == meta.lastAppliedIndex();
+
+        if (reusable) {
+            return latest.futureFor(CheckpointState.FINISHED).thenApply(res -> null);
+        }
+
+        PersistentPageMemoryStorageEngineView engineView = tableStorage.engine().configuration().value();
+        int delayMillis = engineView.checkpoint().checkpointDelayMillis();
+        CheckpointProgress scheduled = checkpointManager.scheduleCheckpoint(delayMillis, "Triggered by replicator");
+
+        return scheduled.futureFor(CheckpointState.FINISHED).thenApply(res -> null);
+    }
+
+    /** {@inheritDoc} Reads the value from the partition meta. */
+    @Override
+    public long lastAppliedIndex() {
+        return meta.lastAppliedIndex();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void lastAppliedIndex(long lastAppliedIndex) throws StorageException {
+        assert checkpointTimeoutLock.checkpointLockIsHeldByThread();
+
+        // The checkpoint manager may report no checkpoint at all; the meta then receives a null checkpoint ID.
+        CheckpointProgress progress = checkpointManager.lastCheckpointProgress();
+        UUID checkpointId = (progress != null) ? progress.id() : null;
+
+        meta.lastAppliedIndex(checkpointId, lastAppliedIndex);
+    }
+
+    /** {@inheritDoc} Returns the cached {@code persistedIndex} field rather than querying the partition meta. */
+    @Override
+    public long persistedIndex() {
+        return persistedIndex;
+    }
+
+    /** {@inheritDoc} Unregisters the checkpoint listener from the checkpoint manager. */
+    @Override
+    public void close() {
+        checkpointManager.removeCheckpointListener(checkpointListener);
+    }
 }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
index c076d1ccd..ef49c7989 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/VolatilePageMemoryMvPartitionStorage.java
@@ -17,15 +17,22 @@
 
 package org.apache.ignite.internal.storage.pagememory.mv;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.internal.pagememory.inmemory.VolatilePageMemory;
 import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
 
 /**
  * Implementation of {@link MvPartitionStorage} based on a {@link BplusTree} for in-memory case.
  */
 public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPartitionStorage {
+    /**
+     * Last applied index value; this in-memory storage also reports it as the persisted index.
+     */
+    private volatile long lastAppliedIndex;
+
     /**
      * Constructor.
      *
@@ -46,4 +53,34 @@ public class VolatilePageMemoryMvPartitionStorage extends AbstractPageMemoryMvPa
     ) {
         super(partId, tableView, pageMemory, versionChainFreeList, rowVersionFreeList, versionChainTree);
     }
+
+    /** {@inheritDoc} Invokes the closure directly and returns its result. */
+    @Override
+    public <V> V runConsistently(WriteClosure<V> closure) throws StorageException {
+        return closure.execute();
+    }
+
+    /** {@inheritDoc} Returns an already completed future. */
+    @Override
+    public CompletableFuture<Void> flush() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /** {@inheritDoc} Reads the volatile {@code lastAppliedIndex} field. */
+    @Override
+    public long lastAppliedIndex() {
+        return lastAppliedIndex;
+    }
+
+    /** {@inheritDoc} Stores the value in the volatile {@code lastAppliedIndex} field. */
+    @Override
+    public void lastAppliedIndex(long lastAppliedIndex) throws StorageException {
+        this.lastAppliedIndex = lastAppliedIndex;
+    }
+
+    /** {@inheritDoc} Returns the same value as {@link #lastAppliedIndex()}. */
+    @Override
+    public long persistedIndex() {
+        return lastAppliedIndex;
+    }
 }
diff --git a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index 23f677003..3058d4670 100644
--- a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++ b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -45,7 +45,10 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 class PersistentPageMemoryMvPartitionStorageTest extends AbstractPageMemoryMvPartitionStorageTest {
-    @InjectConfiguration(polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class)
+    @InjectConfiguration(
+            value = "mock.checkpoint.checkpointDelayMillis = 0",
+            polymorphicExtensions = UnsafeMemoryAllocatorConfigurationSchema.class
+    )
     private PersistentPageMemoryStorageEngineConfiguration engineConfig;
 
     @InjectConfiguration(
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/configuration/schema/RocksDbStorageEngineConfigurationSchema.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/configuration/schema/RocksDbStorageEngineConfigurationSchema.java
index 5e1c75b48..34cc74084 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/configuration/schema/RocksDbStorageEngineConfigurationSchema.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/configuration/schema/RocksDbStorageEngineConfigurationSchema.java
@@ -25,6 +25,7 @@ import org.apache.ignite.configuration.annotation.Name;
 import org.apache.ignite.configuration.annotation.NamedConfigValue;
 import org.apache.ignite.configuration.annotation.Value;
 import org.apache.ignite.configuration.validation.ExceptKeys;
+import org.apache.ignite.configuration.validation.Range;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
 
 /**
@@ -35,6 +36,8 @@ public class RocksDbStorageEngineConfigurationSchema {
     /** Name of the default data region. */
     public static final String DEFAULT_DATA_REGION_NAME = "default";
 
+    /** Delay, in milliseconds, before executing a flush triggered by RAFT. Must not be negative. */
+    @Range(min = 0)
     @Value(hasDefault = true)
     public int flushDelayMillis = 100;
 
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 0291ea553..b1ec8a7bc 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -73,7 +73,10 @@ public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTes
 
         assertThat(((RocksDbDataStorageView) tableCfg.dataStorage().value()).dataRegion(), equalTo(DEFAULT_DATA_REGION_NAME));
 
-        engineConfig.defaultRegion().change(c -> c.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)).get(1, SECONDS);
+        engineConfig.change(cfg -> cfg
+                .changeFlushDelayMillis(0)
+                .changeDefaultRegion(c -> c.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024))
+        ).get(1, SECONDS);
 
         engine = new RocksDbStorageEngine(engineConfig, workDir);