Posted to commits@ignite.apache.org by se...@apache.org on 2021/08/25 07:18:39 UTC

[ignite-3] branch main updated: IGNITE-15298 RAFT snapshots implementation for persistent partitions storage - Fixes #285.

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e73d124  IGNITE-15298 RAFT snapshots implementation for persistent partitions storage - Fixes #285.
e73d124 is described below

commit e73d1241464dc07b506eb2c088c739b516101d1e
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Aug 25 10:17:51 2021 +0300

    IGNITE-15298 RAFT snapshots implementation for persistent partitions storage - Fixes #285.
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../ITMetaStorageServicePersistenceTest.java       | 391 +++------------------
 modules/metastorage-server/pom.xml                 |   4 +-
 .../server/persistence/RocksDBKeyValueStorage.java |  55 +--
 .../server/persistence/RocksStorageUtils.java      |  86 -----
 .../server/persistence/WatchCursor.java            |   5 +-
 .../raft/client/service/RaftGroupListener.java     |   2 +-
 modules/raft/pom.xml                               |   6 +
 .../service/ITAbstractListenerSnapshotTest.java}   | 237 +++++--------
 .../{storage-rocksdb => rocksdb-common}/pom.xml    |  31 +-
 .../ignite/internal/rocksdb}/ColumnFamily.java     |  55 ++-
 .../ignite/internal/rocksdb/RocksBiConsumer.java   |  35 ++
 .../ignite/internal/rocksdb/RocksBiPredicate.java  |  36 ++
 .../apache/ignite/internal/rocksdb/RocksUtils.java | 119 +++++++
 .../ignite/internal/storage/InvokeClosure.java     |   2 +-
 .../apache/ignite/internal/storage/Storage.java    |  19 +
 .../storage/basic/ConcurrentHashMapStorage.java    |  12 +
 modules/storage-rocksdb/pom.xml                    |   5 +-
 .../internal/storage/rocksdb/RocksDbStorage.java   | 178 ++++++++--
 modules/table/pom.xml                              |   7 +
 .../ignite/distributed/ITTablePersistenceTest.java | 156 ++++++++
 .../table/distributed/raft/PartitionListener.java  |  17 +-
 parent/pom.xml                                     |   6 +
 pom.xml                                            |   1 +
 23 files changed, 776 insertions(+), 689 deletions(-)
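
The heart of this change, per the diffstat above, is a new reusable base class, ITAbstractListenerSnapshotTest, which now drives the snapshot/restart scenario for both the meta storage test and the new ITTablePersistenceTest. A minimal, purely illustrative sketch of a subclass follows: the hook signatures are copied from the base class in the patch below, while MyListener, its methods, and the group id are hypothetical placeholders, not part of this commit.

// Hypothetical sketch: hook signatures are copied from ITAbstractListenerSnapshotTest below;
// MyListener (assumed to implement RaftGroupListener), its methods and the group id are
// illustrative placeholders, not part of this commit.
import java.nio.file.Path;
import java.util.function.BooleanSupplier;
import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
import org.apache.ignite.raft.client.service.ITAbstractListenerSnapshotTest;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.apache.ignite.raft.client.service.RaftGroupService;

public class ITMyListenerSnapshotTest extends ITAbstractListenerSnapshotTest<MyListener> {
    /** Writes go through the raft client before a follower is stopped. */
    @Override public void beforeFollowerStop(RaftGroupService service) throws Exception {
        // e.g. put an entry and assert it is visible
    }

    /** State mutated while the follower is down ends up in the leader's snapshot. */
    @Override public void afterFollowerStop(RaftGroupService service) throws Exception {
        // e.g. remove the entry and write it again, bumping the revision
    }

    /** An optional extra write after the snapshot must reach the follower via the raft log. */
    @Override public void afterSnapshot(RaftGroupService service) throws Exception {
        // e.g. put a second entry
    }

    /** Closure polled by the base test until the restarted follower has caught up. */
    @Override public BooleanSupplier snapshotCheckClosure(JRaftServerImpl restarted, boolean interactedAfterSnapshot) {
        MyListener listener = getListener(restarted, raftGroupId());

        return () -> listener.containsExpectedState(interactedAfterSnapshot); // placeholder check
    }

    /** {@inheritDoc} */
    @Override public Path getListenerPersistencePath(MyListener listener) {
        return listener.storagePath(); // placeholder accessor
    }

    /** {@inheritDoc} */
    @Override public RaftGroupListener createListener(Path workDir) {
        return new MyListener(workDir);
    }

    /** {@inheritDoc} */
    @Override public String raftGroupId() {
        return "my-raft-group";
    }
}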

diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
index fc5632b..f1aa1a4 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
@@ -17,276 +17,112 @@
 
 package org.apache.ignite.internal.metastorage.client;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.persistence.RocksDBKeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
-import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl.DelegatingStateMachine;
-import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.MessageSerializationRegistryImpl;
-import org.apache.ignite.network.NetworkAddress;
-import org.apache.ignite.network.StaticNodeFinder;
-import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.ITAbstractListenerSnapshotTest;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
 import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.apache.ignite.utils.ClusterServiceTestUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
 
-import static java.lang.Thread.sleep;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Persistent (rocksdb-based) meta storage client tests.
+ * Persistent (rocksdb-based) meta storage raft group snapshots tests.
  */
 @ExtendWith(WorkDirectoryExtension.class)
-public class ITMetaStorageServicePersistenceTest {
-    /** Starting server port. */
-    private static final int PORT = 5003;
-
-    /** Starting client port. */
-    private static final int CLIENT_PORT = 6003;
-
-    /**
-     * Peers list.
-     */
-    private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
-        .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
-        .map(Peer::new)
-        .collect(Collectors.toUnmodifiableList());
-
+public class ITMetaStorageServicePersistenceTest extends ITAbstractListenerSnapshotTest<MetaStorageListener> {
     /** */
-    private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
-
-    /** Factory. */
-    private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
-
-    /** Network factory. */
-    private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+    private static final ByteArray FIRST_KEY = ByteArray.fromString("first");
 
     /** */
-    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
+    private static final byte[] FIRST_VALUE = "firstValue".getBytes(StandardCharsets.UTF_8);
 
     /** */
-    @WorkDirectory
-    private Path workDir;
-
-    /** Cluster. */
-    private final List<ClusterService> cluster = new ArrayList<>();
-
-    /** Servers. */
-    private final List<JRaftServerImpl> servers = new ArrayList<>();
-
-    /** Clients. */
-    private final List<RaftGroupService> clients = new ArrayList<>();
-
-    /**
-     * Shutdown raft server and stop all cluster nodes.
-     *
-     * @throws Exception If failed to shutdown raft server,
-     */
-    @AfterEach
-    public void afterTest() throws Exception {
-        for (RaftGroupService client : clients)
-            client.shutdown();
-
-        for (JRaftServerImpl server : servers)
-            server.stop();
-
-        for (ClusterService service : cluster)
-            service.stop();
-    }
-
-    /**
-     * Test parameters for {@link #testSnapshot}.
-     */
-    private static class TestData {
-        /** Delete raft group folder. */
-        private final boolean deleteFolder;
-
-        /** Write to meta storage after a snapshot. */
-        private final boolean writeAfterSnapshot;
-
-        /** */
-        private TestData(boolean deleteFolder, boolean writeAfterSnapshot) {
-            this.deleteFolder = deleteFolder;
-            this.writeAfterSnapshot = writeAfterSnapshot;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return String.format("deleteFolder=%s, writeAfterSnapshot=%s", deleteFolder, writeAfterSnapshot);
-        }
-    }
-
-    /**
-     * @return {@link #testSnapshot} parameters.
-     */
-    private static List<TestData> testSnapshotData() {
-        return List.of(
-            new TestData(false, false),
-            new TestData(true, true),
-            new TestData(false, true),
-            new TestData(true, false)
-        );
-    }
+    private static final ByteArray SECOND_KEY = ByteArray.fromString("second");
 
-    /**
-     * Tests that a joining raft node successfully restores a snapshot.
-     *
-     * @param testData Test parameters.
-     * @throws Exception If failed.
-     */
-    @ParameterizedTest
-    @MethodSource("testSnapshotData")
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15298")
-    public void testSnapshot(TestData testData) throws Exception {
-        ByteArray firstKey = ByteArray.fromString("first");
-        byte[] firstValue = "firstValue".getBytes(StandardCharsets.UTF_8);
+    /** */
+    private static final byte[] SECOND_VALUE = "secondValue".getBytes(StandardCharsets.UTF_8);
 
-        // Setup a metastorage raft service
-        RaftGroupService metaStorageSvc = prepareMetaStorage();
+    /** */
+    private MetaStorageServiceImpl metaStorage;
 
-        MetaStorageServiceImpl metaStorage = new MetaStorageServiceImpl(metaStorageSvc, null);
+    /** {@inheritDoc} */
+    @Override public void beforeFollowerStop(RaftGroupService service) throws Exception {
+        metaStorage = new MetaStorageServiceImpl(service, null);
 
         // Put some data in the metastorage
-        metaStorage.put(firstKey, firstValue).get();
+        metaStorage.put(FIRST_KEY, FIRST_VALUE).get();
 
         // Check that data has been written successfully
-        check(metaStorage, new EntryImpl(firstKey, firstValue, 1, 1));;
-
-        // Select any node that is not the leader of the group
-        JRaftServerImpl toStop = servers.stream()
-            .filter(server -> !server.localPeer(METASTORAGE_RAFT_GROUP_NAME).equals(metaStorageSvc.leader()))
-            .findAny()
-            .orElseThrow();
-
-        // Get the path to that node's raft directory
-        Path serverDataPath = toStop.getServerDataPath(METASTORAGE_RAFT_GROUP_NAME);
-
-        // Get the path to that node's RocksDB key-value storage
-        Path dbPath = getStorage(toStop).getDbPath();
-
-        int stopIdx = servers.indexOf(toStop);
-
-        // Remove that node from the list of servers
-        servers.remove(stopIdx);
-
-        // Shutdown that node
-        toStop.stop();
-
-        // Create a raft snapshot of the metastorage service
-        metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
+        check(metaStorage, new EntryImpl(FIRST_KEY, FIRST_VALUE, 1, 1));
+    }
 
+    /** {@inheritDoc} */
+    @Override public void afterFollowerStop(RaftGroupService service) throws Exception {
         // Remove the first key from the metastorage
-        metaStorage.remove(firstKey).get();
+        metaStorage.remove(FIRST_KEY).get();
 
         // Check that data has been removed
-        check(metaStorage, new EntryImpl(firstKey, null, 2, 2));
+        check(metaStorage, new EntryImpl(FIRST_KEY, null, 2, 2));
 
         // Put same data again
-        metaStorage.put(firstKey, firstValue).get();
+        metaStorage.put(FIRST_KEY, FIRST_VALUE).get();
 
         // Check that it has been written
-        check(metaStorage, new EntryImpl(firstKey, firstValue, 3, 3));
-
-        // Create another raft snapshot
-        metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
-
-        byte[] lastKey = firstKey.bytes();
-        byte[] lastValue = firstValue;
-
-        if (testData.deleteFolder) {
-            // Delete stopped node's raft directory and key-value storage directory
-            // to check if snapshot could be restored by the restarted node
-            IgniteUtils.deleteIfExists(dbPath);
-            IgniteUtils.deleteIfExists(serverDataPath);
-        }
-
-        if (testData.writeAfterSnapshot) {
-            // Put new data after the second snapshot to check if after
-            // the snapshot restore operation restarted node will receive it
-            ByteArray secondKey = ByteArray.fromString("second");
-            byte[] secondValue = "secondValue".getBytes(StandardCharsets.UTF_8);
-
-            metaStorage.put(secondKey, secondValue).get();
-
-            lastKey = secondKey.bytes();
-            lastValue = secondValue;
-        }
-
-        // Restart the node
-        JRaftServerImpl restarted = startServer(stopIdx, new RocksDBKeyValueStorage(dbPath));
+        check(metaStorage, new EntryImpl(FIRST_KEY, FIRST_VALUE, 3, 3));
+    }
 
-        assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+    /** {@inheritDoc} */
+    @Override public void afterSnapshot(RaftGroupService service) throws Exception {
+        metaStorage.put(SECOND_KEY, SECOND_VALUE).get();
+    }
 
-        KeyValueStorage storage = getStorage(restarted);
+    /** {@inheritDoc} */
+    @Override public BooleanSupplier snapshotCheckClosure(JRaftServerImpl restarted, boolean interactedAfterSnapshot) {
+        KeyValueStorage storage = getListener(restarted, raftGroupId()).getStorage();
 
-        byte[] finalLastKey = lastKey;
+        byte[] lastKey = interactedAfterSnapshot ? SECOND_KEY.bytes() : FIRST_KEY.bytes();
+        byte[] lastValue = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
 
-        int expectedRevision = testData.writeAfterSnapshot ? 4 : 3;
-        int expectedUpdateCounter = testData.writeAfterSnapshot ? 4 : 3;
+        int expectedRevision = interactedAfterSnapshot ? 4 : 3;
+        int expectedUpdateCounter = interactedAfterSnapshot ? 4 : 3;
 
         EntryImpl expectedLastEntry = new EntryImpl(new ByteArray(lastKey), lastValue, expectedRevision, expectedUpdateCounter);
 
-        // Wait until the snapshot is restored
-        boolean success = waitForCondition(() -> {
-            org.apache.ignite.internal.metastorage.server.Entry e = storage.get(finalLastKey);
+        return () -> {
+            org.apache.ignite.internal.metastorage.server.Entry e = storage.get(lastKey);
             return e.empty() == expectedLastEntry.empty()
                 && e.tombstone() == expectedLastEntry.tombstone()
                 && e.revision() == expectedLastEntry.revision()
                 && e.updateCounter() == expectedLastEntry.revision()
                 && Arrays.equals(e.key(), expectedLastEntry.key().bytes())
                 && Arrays.equals(e.value(), expectedLastEntry.value());
-        }, 3_000);
-
-        assertTrue(success);
-
-        // Check that the last value has been written successfully
-        check(metaStorage, expectedLastEntry);
+        };
     }
 
-    /**
-     * Get the meta store's key-value storage of the jraft server.
-     *
-     * @param server Server.
-     * @return Meta store's key value storage.
-     */
-    private static RocksDBKeyValueStorage getStorage(JRaftServerImpl server) {
-        org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(METASTORAGE_RAFT_GROUP_NAME);
-
-        DelegatingStateMachine fsm = (DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
-
-        MetaStorageListener listener = (MetaStorageListener) fsm.getListener();
+    /** {@inheritDoc} */
+    @Override public Path getListenerPersistencePath(MetaStorageListener listener) {
+        return ((RocksDBKeyValueStorage) listener.getStorage()).getDbPath();
+    }
 
-        KeyValueStorage storage = listener.getStorage();
+    /** {@inheritDoc} */
+    @Override public RaftGroupListener createListener(Path workDir) {
+        return new MetaStorageListener(new RocksDBKeyValueStorage(workDir.resolve(UUID.randomUUID().toString())));
+    }
 
-        return (RocksDBKeyValueStorage) storage;
+    /** {@inheritDoc} */
+    @Override public String raftGroupId() {
+        return "metastorage";
     }
 
     /**
@@ -303,133 +139,4 @@ public class ITMetaStorageServicePersistenceTest {
 
         assertEquals(expected, entry);
     }
-
-    /** */
-    @SuppressWarnings("BusyWait") private static boolean waitForCondition(BooleanSupplier cond, long timeout) {
-        long stop = System.currentTimeMillis() + timeout;
-
-        while (System.currentTimeMillis() < stop) {
-            if (cond.getAsBoolean())
-                return true;
-
-            try {
-                sleep(50);
-            }
-            catch (InterruptedException e) {
-                return false;
-            }
-        }
-
-        return false;
-    }
-
-    /**
-     * @param cluster The cluster.
-     * @param exp Expected count.
-     * @param timeout The timeout in millis.
-     * @return {@code True} if topology size is equal to expected.
-     */
-    private boolean waitForTopology(ClusterService cluster, int exp, int timeout) {
-        return waitForCondition(() -> cluster.topologyService().allMembers().size() >= exp, timeout);
-    }
-
-    /**
-     * @return Local address.
-     */
-    private static String getLocalAddress() {
-        try {
-            return InetAddress.getLocalHost().getHostAddress();
-        }
-        catch (UnknownHostException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Creates a cluster service.
-     */
-    private ClusterService clusterService(String name, int port, NetworkAddress otherPeer) {
-        var nodeFinder = new StaticNodeFinder(List.of(otherPeer));
-
-        var network = ClusterServiceTestUtils.clusterService(
-            name,
-            port,
-            nodeFinder,
-            SERIALIZATION_REGISTRY,
-            NETWORK_FACTORY
-        );
-
-        network.start();
-
-        cluster.add(network);
-
-        return network;
-    }
-
-    /**
-     * Starts a raft server.
-     *
-     * @param idx Server index (affects port of the server).
-     * @param storage KeyValueStorage for the MetaStorage.
-     * @return Server.
-     */
-    private JRaftServerImpl startServer(int idx, KeyValueStorage storage) {
-        var addr = new NetworkAddress(getLocalAddress(), PORT);
-
-        ClusterService service = clusterService("server" + idx, PORT + idx, addr);
-
-        Path jraft = workDir.resolve("jraft" + idx);
-
-        JRaftServerImpl server = new JRaftServerImpl(service, jraft) {
-            @Override public void stop() {
-                super.stop();
-
-                service.stop();
-            }
-        };
-
-        server.start();
-
-        server.startRaftGroup(
-            METASTORAGE_RAFT_GROUP_NAME,
-            new MetaStorageListener(storage),
-            INITIAL_CONF
-        );
-
-        servers.add(server);
-
-        return server;
-    }
-
-    /**
-     * Prepares meta storage by instantiating corresponding raft server with {@link MetaStorageListener} and
-     * a client.
-     *
-     * @return Meta storage raft group service instance.
-     * @throws Exception If failed.
-     */
-    private RaftGroupService prepareMetaStorage() throws Exception {
-        for (int i = 0; i < INITIAL_CONF.size(); i++)
-            startServer(i, new RocksDBKeyValueStorage(workDir.resolve(UUID.randomUUID().toString())));
-
-        assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
-
-        return startClient(METASTORAGE_RAFT_GROUP_NAME, new NetworkAddress(getLocalAddress(), PORT));
-    }
-
-    /**
-     * Starts a client with a specific address.
-     *
-     * @throws Exception If failed.
-     */
-    private RaftGroupService startClient(String groupId, NetworkAddress addr) throws Exception {
-        ClusterService clientNode = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(), addr);
-
-        RaftGroupService client = RaftGroupServiceImpl.start(groupId, clientNode, FACTORY, 10_000,
-            List.of(new Peer(addr)), false, 200).get(3, TimeUnit.SECONDS);
-
-        clients.add(client);
-
-        return client;
-    }
 }
diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
index 07bb1df..d8a5c76 100644
--- a/modules/metastorage-server/pom.xml
+++ b/modules/metastorage-server/pom.xml
@@ -44,8 +44,8 @@
         </dependency>
 
         <dependency>
-            <groupId>org.rocksdb</groupId>
-            <artifactId>rocksdbjni</artifactId>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-rocksdb-common</artifactId>
         </dependency>
 
         <dependency>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
index 507307d..8aea029 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.Operation;
 import org.apache.ignite.internal.metastorage.server.Value;
 import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksBiPredicate;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -53,7 +55,6 @@ import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
-import org.rocksdb.EnvOptions;
 import org.rocksdb.IngestExternalFileOptions;
 import org.rocksdb.Options;
 import org.rocksdb.ReadOptions;
@@ -61,7 +62,6 @@ import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 import org.rocksdb.Snapshot;
-import org.rocksdb.SstFileWriter;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
@@ -69,14 +69,15 @@ import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.appendLong;
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToLong;
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
-import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.find;
-import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.forEach;
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.getAsLongs;
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.keyToRocksKey;
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
 import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
 import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.createSstFile;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.find;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.forEach;
 import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
 
 /**
@@ -187,9 +188,9 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
 
             this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString(), descriptors, handles);
 
-            data = new ColumnFamily(db, handles.get(0), DATA, dataFamilyOptions, dataOptions);
+            data = new ColumnFamily(db, handles.get(0), DATA.name(), dataFamilyOptions, dataOptions);
 
-            index = new ColumnFamily(db, handles.get(1), INDEX, indexFamilyOptions, indexOptions);
+            index = new ColumnFamily(db, handles.get(1), INDEX.name(), indexFamilyOptions, indexOptions);
         }
         catch (Exception e) {
             try {
@@ -246,8 +247,11 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
             }
         }, snapshotExecutor).thenCompose(aVoid ->
             // Create futures for capturing SST snapshots of the column families
-            CompletableFuture.allOf(createSstFile(data, snapshot, tempPath), createSstFile(index, snapshot, tempPath)))
-        .whenComplete((aVoid, throwable) -> {
+            CompletableFuture.allOf(
+                CompletableFuture.runAsync(() -> createSstFile(data, snapshot, tempPath), snapshotExecutor),
+                CompletableFuture.runAsync(() -> createSstFile(index, snapshot, tempPath), snapshotExecutor)
+            )
+        ).whenComplete((aVoid, throwable) -> {
             // Release a snapshot
             db.releaseSnapshot(snapshot);
 
@@ -271,39 +275,6 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
         });
     }
 
-    /**
-     * Create an SST file for the column family.
-     *
-     * @param columnFamily Column family.
-     * @param snapshot Point-in-time snapshot.
-     * @param path Directory to put the SST file in.
-     * @return Future representing pending completion of the operation.
-     */
-    private CompletableFuture<Void> createSstFile(ColumnFamily columnFamily, Snapshot snapshot, Path path) {
-        return CompletableFuture.runAsync(() -> {
-            try (
-                EnvOptions envOptions = new EnvOptions();
-                Options options = new Options();
-                ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
-                RocksIterator it = columnFamily.newIterator(readOptions);
-                SstFileWriter sstFileWriter = new SstFileWriter(envOptions, options)
-            ) {
-                Path sstFile = path.resolve(columnFamily.name());
-
-                sstFileWriter.open(sstFile.toString());
-
-                it.seekToFirst();
-
-                forEach(it, sstFileWriter::put);
-
-                sstFileWriter.finish();
-            }
-            catch (Throwable t) {
-                throw new IgniteInternalException("Failed to write snapshot: " + t.getMessage(), t);
-            }
-        }, snapshotExecutor);
-    }
-
     /** {@inheritDoc} */
     @Override public void restoreSnapshot(Path path) {
         rwLock.writeLock().lock();
@@ -1034,7 +1005,7 @@ public class RocksDBKeyValueStorage implements KeyValueStorage {
         try (RocksIterator iterator = index.newIterator()) {
             iterator.seek(key);
 
-            RocksStorageUtils.RocksBiPredicate predicate = strictlyHigher ?
+            RocksBiPredicate predicate = strictlyHigher ?
                 (k, v) -> CMP.compare(k, key) > 0 :
                 (k, v) -> CMP.compare(k, key) >= 0;
 
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
index 54d13d7..0374ef1 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
@@ -23,11 +23,8 @@ import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.stream.IntStream;
 import org.apache.ignite.internal.metastorage.server.Value;
-import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
 
 import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
 
@@ -181,87 +178,4 @@ class RocksStorageUtils {
 
         return result;
     }
-
-    /**
-     * Iterates over the given iterator passing key-value pairs to the given consumer and
-     * checks the iterator's status afterwards.
-     *
-     * @param iterator Iterator.
-     * @param consumer Consumer of key-value pairs.
-     * @throws RocksDBException If failed.
-     */
-    static void forEach(RocksIterator iterator, RocksBiConsumer consumer) throws RocksDBException {
-        for (; iterator.isValid(); iterator.next())
-            consumer.accept(iterator.key(), iterator.value());
-
-        checkIterator(iterator);
-    }
-
-    /**
-     * Iterates over the given iterator testing key-value pairs with the given predicate and checks
-     * the iterator's status afterwards.
-     *
-     * @param iterator Iterator.
-     * @param consumer Consumer of key-value pairs.
-     * @return {@code true} if a matching key-value pair has been found, {@code false} otherwise.
-     * @throws RocksDBException If failed.
-     */
-    static boolean find(RocksIterator iterator, RocksBiPredicate consumer) throws RocksDBException {
-        for (; iterator.isValid(); iterator.next()) {
-            boolean result = consumer.test(iterator.key(), iterator.value());
-
-            if (result)
-                return true;
-        }
-
-        checkIterator(iterator);
-
-        return false;
-    }
-
-    /**
-     * Checks the status of the iterator and throws an exception if it is not correct.
-     *
-     * @param it RocksDB iterator.
-     * @throws IgniteInternalException if the iterator has an incorrect status.
-     */
-    static void checkIterator(RocksIterator it) {
-        try {
-            it.status();
-        }
-        catch (RocksDBException e) {
-            throw new IgniteInternalException(e);
-        }
-    }
-
-    /**
-     * BiConsumer that can throw {@link RocksDBException}.
-     */
-    @FunctionalInterface
-    interface RocksBiConsumer {
-        /**
-         * Accepts the key and the value of the entry.
-         *
-         * @param key Key.
-         * @param value Value.
-         * @throws RocksDBException If failed to process the key-value pair.
-         */
-        void accept(byte[] key, byte[] value) throws RocksDBException;
-    }
-
-    /**
-     * BiPredicate that can throw {@link RocksDBException}.
-     */
-    @FunctionalInterface
-    interface RocksBiPredicate {
-        /**
-         * Evaluates the predicate on the given key and the given value.
-         *
-         * @param key Key.
-         * @param value Value.
-         * @return {@code true} if the input argument matches the predicate, otherwise {@code false}.
-         * @throws RocksDBException If failed to test the key-value pair.
-         */
-        boolean test(byte[] key, byte[] value) throws RocksDBException;
-    }
 }
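
The helpers deleted above are not lost: per the diffstat they move to the new ignite-rocksdb-common module as RocksUtils, RocksBiConsumer and RocksBiPredicate, so the meta storage and the table storage can share them (the updated static imports in RocksDBKeyValueStorage and WatchCursor point to the new location). Below is a sketch of the relocated iterator helpers, reconstructed from the removed code; the actual RocksUtils in this commit also gains createSstFile, whose previous body appears in the RocksDBKeyValueStorage hunk above, and the real class may differ in details.

// Reconstructed from the code removed above; visibility and package follow the new imports in the patch.
package org.apache.ignite.internal.rocksdb;

import org.apache.ignite.lang.IgniteInternalException;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class RocksUtils {
    /** Iterates over the iterator, passing key-value pairs to the consumer, then checks the iterator's status. */
    public static void forEach(RocksIterator iterator, RocksBiConsumer consumer) throws RocksDBException {
        for (; iterator.isValid(); iterator.next())
            consumer.accept(iterator.key(), iterator.value());

        checkIterator(iterator);
    }

    /** Iterates until the predicate matches a key-value pair; returns {@code true} if a match was found. */
    public static boolean find(RocksIterator iterator, RocksBiPredicate predicate) throws RocksDBException {
        for (; iterator.isValid(); iterator.next()) {
            if (predicate.test(iterator.key(), iterator.value()))
                return true;
        }

        checkIterator(iterator);

        return false;
    }

    /** Checks the status of the iterator and throws an exception if it is not correct. */
    public static void checkIterator(RocksIterator it) {
        try {
            it.status();
        }
        catch (RocksDBException e) {
            throw new IgniteInternalException(e);
        }
    }
}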
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
index 90ff5f6..54a790f 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.metastorage.server.Entry;
 import org.apache.ignite.internal.metastorage.server.EntryEvent;
 import org.apache.ignite.internal.metastorage.server.Value;
 import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -35,9 +36,9 @@ import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
 
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
-import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.checkIterator;
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
 import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.rocksKeyToBytes;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.checkIterator;
 
 /**
  * Subscription on updates of entries corresponding to the given keys range (where the upper bound is unlimited)
@@ -185,7 +186,7 @@ class WatchCursor implements Cursor<WatchEvent> {
                         List<EntryEvent> evts = new ArrayList<>();
 
                         // Iterate over the keys of the current revision and get all matching entries.
-                        RocksStorageUtils.forEach(nativeIterator, (k, v) -> {
+                        RocksUtils.forEach(nativeIterator, (k, v) -> {
                             ref.noItemsInRevision = false;
 
                             byte[] key = rocksKeyToBytes(k);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
index e186fdf..64f8edc 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
@@ -57,7 +57,7 @@ public interface RaftGroupListener {
      * @param doneClo The closure to call on finish. Pass the not null exception if the snapshot has not been created or
      *                null on successful creation.
      */
-    public void onSnapshotSave(Path path, Consumer<Throwable> doneClo);
+    void onSnapshotSave(Path path, Consumer<Throwable> doneClo);
 
     /**
      * The callback to load a snapshot.
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
index 73ad792..1682f5e 100644
--- a/modules/raft/pom.xml
+++ b/modules/raft/pom.xml
@@ -84,6 +84,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-junit-jupiter</artifactId>
             <scope>test</scope>
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
similarity index 55%
copy from modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
copy to modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
index fc5632b..0f55992 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ITAbstractListenerSnapshotTest.java
@@ -15,30 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.client;
+package org.apache.ignite.raft.client.service;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
-import org.apache.ignite.internal.metastorage.server.persistence.RocksDBKeyValueStorage;
-import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
 import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
-import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl.DelegatingStateMachine;
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.ClusterServiceFactory;
 import org.apache.ignite.network.MessageSerializationRegistryImpl;
@@ -48,24 +39,23 @@ import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
-import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import static java.lang.Thread.sleep;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * Persistent (rocksdb-based) meta storage client tests.
+ * Base class for persistent raft group's snapshots tests.
+ *
+ * @param <T> Type of the raft group listener.
  */
 @ExtendWith(WorkDirectoryExtension.class)
-public class ITMetaStorageServicePersistenceTest {
+public abstract class ITAbstractListenerSnapshotTest<T extends RaftGroupListener> {
     /** Starting server port. */
     private static final int PORT = 5003;
 
@@ -80,9 +70,6 @@ public class ITMetaStorageServicePersistenceTest {
         .map(Peer::new)
         .collect(Collectors.toUnmodifiableList());
 
-    /** */
-    private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
-
     /** Factory. */
     private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
 
@@ -126,21 +113,27 @@ public class ITMetaStorageServicePersistenceTest {
      * Test parameters for {@link #testSnapshot}.
      */
     private static class TestData {
-        /** Delete raft group folder. */
+        /**
+         * {@code true} if the raft group's persistence must be cleared before the follower's restart,
+         * {@code false} otherwise.
+         */
         private final boolean deleteFolder;
 
-        /** Write to meta storage after a snapshot. */
-        private final boolean writeAfterSnapshot;
+        /**
+         * {@code true} if test should interact with the raft group after a snapshot has been captured.
+         * In this case, the follower node should catch up with the leader using raft log.
+         */
+        private final boolean interactAfterSnapshot;
 
         /** */
-        private TestData(boolean deleteFolder, boolean writeAfterSnapshot) {
+        private TestData(boolean deleteFolder, boolean interactAfterSnapshot) {
             this.deleteFolder = deleteFolder;
-            this.writeAfterSnapshot = writeAfterSnapshot;
+            this.interactAfterSnapshot = interactAfterSnapshot;
         }
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return String.format("deleteFolder=%s, writeAfterSnapshot=%s", deleteFolder, writeAfterSnapshot);
+            return String.format("deleteFolder=%s, interactAfterSnapshot=%s", deleteFolder, interactAfterSnapshot);
         }
     }
 
@@ -164,33 +157,23 @@ public class ITMetaStorageServicePersistenceTest {
      */
     @ParameterizedTest
     @MethodSource("testSnapshotData")
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-15298")
     public void testSnapshot(TestData testData) throws Exception {
-        ByteArray firstKey = ByteArray.fromString("first");
-        byte[] firstValue = "firstValue".getBytes(StandardCharsets.UTF_8);
-
-        // Setup a metastorage raft service
-        RaftGroupService metaStorageSvc = prepareMetaStorage();
-
-        MetaStorageServiceImpl metaStorage = new MetaStorageServiceImpl(metaStorageSvc, null);
-
-        // Put some data in the metastorage
-        metaStorage.put(firstKey, firstValue).get();
+        // Set up a raft group service
+        RaftGroupService service = prepareRaftGroup();
 
-        // Check that data has been written successfully
-        check(metaStorage, new EntryImpl(firstKey, firstValue, 1, 1));;
+        beforeFollowerStop(service);
 
         // Select any node that is not the leader of the group
         JRaftServerImpl toStop = servers.stream()
-            .filter(server -> !server.localPeer(METASTORAGE_RAFT_GROUP_NAME).equals(metaStorageSvc.leader()))
+            .filter(server -> !server.localPeer(raftGroupId()).equals(service.leader()))
             .findAny()
             .orElseThrow();
 
         // Get the path to that node's raft directory
-        Path serverDataPath = toStop.getServerDataPath(METASTORAGE_RAFT_GROUP_NAME);
+        Path serverDataPath = toStop.getServerDataPath(raftGroupId());
 
         // Get the path to that node's RocksDB key-value storage
-        Path dbPath = getStorage(toStop).getDbPath();
+        Path dbPath = getListenerPersistencePath(getListener(toStop, raftGroupId()));
 
         int stopIdx = servers.indexOf(toStop);
 
@@ -200,127 +183,106 @@ public class ITMetaStorageServicePersistenceTest {
         // Shutdown that node
         toStop.stop();
 
-        // Create a raft snapshot of the metastorage service
-        metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
+        // Create a snapshot of the raft group
+        service.snapshot(service.leader()).get();
 
-        // Remove the first key from the metastorage
-        metaStorage.remove(firstKey).get();
-
-        // Check that data has been removed
-        check(metaStorage, new EntryImpl(firstKey, null, 2, 2));
-
-        // Put same data again
-        metaStorage.put(firstKey, firstValue).get();
-
-        // Check that it has been written
-        check(metaStorage, new EntryImpl(firstKey, firstValue, 3, 3));
+        afterFollowerStop(service);
 
         // Create another raft snapshot
-        metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
-
-        byte[] lastKey = firstKey.bytes();
-        byte[] lastValue = firstValue;
+        service.snapshot(service.leader()).get();
 
         if (testData.deleteFolder) {
-            // Delete stopped node's raft directory and key-value storage directory
+            // Delete a stopped node's raft directory and key-value storage directory
             // to check if snapshot could be restored by the restarted node
             IgniteUtils.deleteIfExists(dbPath);
             IgniteUtils.deleteIfExists(serverDataPath);
         }
 
-        if (testData.writeAfterSnapshot) {
-            // Put new data after the second snapshot to check if after
-            // the snapshot restore operation restarted node will receive it
-            ByteArray secondKey = ByteArray.fromString("second");
-            byte[] secondValue = "secondValue".getBytes(StandardCharsets.UTF_8);
-
-            metaStorage.put(secondKey, secondValue).get();
-
-            lastKey = secondKey.bytes();
-            lastValue = secondValue;
+        if (testData.interactAfterSnapshot) {
+            // Interact with the raft group after the second snapshot to check if the restarted node would see these
+            // interactions after restoring a snapshot and raft logs
+            afterSnapshot(service);
         }
 
         // Restart the node
-        JRaftServerImpl restarted = startServer(stopIdx, new RocksDBKeyValueStorage(dbPath));
+        JRaftServerImpl restarted = startServer(stopIdx);
 
         assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
 
-        KeyValueStorage storage = getStorage(restarted);
+        BooleanSupplier closure = snapshotCheckClosure(restarted, testData.interactAfterSnapshot);
 
-        byte[] finalLastKey = lastKey;
-
-        int expectedRevision = testData.writeAfterSnapshot ? 4 : 3;
-        int expectedUpdateCounter = testData.writeAfterSnapshot ? 4 : 3;
-
-        EntryImpl expectedLastEntry = new EntryImpl(new ByteArray(lastKey), lastValue, expectedRevision, expectedUpdateCounter);
-
-        // Wait until the snapshot is restored
-        boolean success = waitForCondition(() -> {
-            org.apache.ignite.internal.metastorage.server.Entry e = storage.get(finalLastKey);
-            return e.empty() == expectedLastEntry.empty()
-                && e.tombstone() == expectedLastEntry.tombstone()
-                && e.revision() == expectedLastEntry.revision()
-                && e.updateCounter() == expectedLastEntry.revision()
-                && Arrays.equals(e.key(), expectedLastEntry.key().bytes())
-                && Arrays.equals(e.value(), expectedLastEntry.value());
-        }, 3_000);
+        boolean success = waitForCondition(closure, 10_000);
 
         assertTrue(success);
-
-        // Check that the last value has been written successfully
-        check(metaStorage, expectedLastEntry);
     }
 
     /**
-     * Get the meta store's key-value storage of the jraft server.
+     * Interacts with the raft group before a follower is stopped.
      *
-     * @param server Server.
-     * @return Meta store's key value storage.
+     * @param service Raft group service.
+     * @throws Exception If failed.
      */
-    private static RocksDBKeyValueStorage getStorage(JRaftServerImpl server) {
-        org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(METASTORAGE_RAFT_GROUP_NAME);
+    public abstract void beforeFollowerStop(RaftGroupService service) throws Exception;
 
-        DelegatingStateMachine fsm = (DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
+    /**
+     * Interacts with the raft group after a follower is stopped.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void afterFollowerStop(RaftGroupService service) throws Exception;
 
-        MetaStorageListener listener = (MetaStorageListener) fsm.getListener();
+    /**
+     * Interacts with a raft group after the leader has captured a snapshot.
+     *
+     * @param service Raft group service.
+     * @throws Exception If failed.
+     */
+    public abstract void afterSnapshot(RaftGroupService service) throws Exception;
 
-        KeyValueStorage storage = listener.getStorage();
+    /**
+     * Creates a closure that will be executed periodically to check if the snapshot and (conditionally on the
+     * {@link TestData#interactAfterSnapshot}) the raft log was successfully restored by the follower node.
+     *
+     * @param restarted Restarted follower node.
+     * @param interactedAfterSnapshot {@code true} if the raft group was interacted with after the snapshot operation.
+     * @return Closure.
+     */
+    public abstract BooleanSupplier snapshotCheckClosure(JRaftServerImpl restarted, boolean interactedAfterSnapshot);
 
-        return (RocksDBKeyValueStorage) storage;
-    }
+    /**
+     * @param listener Raft group listener.
+     * @return Path to the group's persistence.
+     */
+    public abstract Path getListenerPersistencePath(T listener);
 
     /**
-     * Check meta storage entry.
+     * Creates raft group listener.
      *
-     * @param metaStorage Meta storage service.
-     * @param expected Expected entry.
-     * @throws ExecutionException If failed.
-     * @throws InterruptedException If failed.
+     * @param workDir Work directory.
+     * @return Raft group listener.
      */
-    private void check(MetaStorageServiceImpl metaStorage, EntryImpl expected)
-        throws ExecutionException, InterruptedException {
-        Entry entry = metaStorage.get(expected.key()).get();
-
-        assertEquals(expected, entry);
-    }
+    public abstract RaftGroupListener createListener(Path workDir);
 
-    /** */
-    @SuppressWarnings("BusyWait") private static boolean waitForCondition(BooleanSupplier cond, long timeout) {
-        long stop = System.currentTimeMillis() + timeout;
+    /**
+     * @return Raft group id for tests.
+     */
+    public abstract String raftGroupId();
 
-        while (System.currentTimeMillis() < stop) {
-            if (cond.getAsBoolean())
-                return true;
+    /**
+     * Get the raft group listener from the jraft server.
+     *
+     * @param server Server.
+     * @param grpId Raft group id.
+     * @return Raft group listener.
+     */
+    protected T getListener(JRaftServerImpl server, String grpId) {
+        org.apache.ignite.raft.jraft.RaftGroupService svc = server.raftGroupService(grpId);
 
-            try {
-                sleep(50);
-            }
-            catch (InterruptedException e) {
-                return false;
-            }
-        }
+        JRaftServerImpl.DelegatingStateMachine fsm =
+            (JRaftServerImpl.DelegatingStateMachine) svc.getRaftNode().getOptions().getFsm();
 
-        return false;
+        return (T) fsm.getListener();
     }
 
     /**
@@ -329,7 +291,7 @@ public class ITMetaStorageServicePersistenceTest {
      * @param timeout The timeout in millis.
      * @return {@code True} if topology size is equal to expected.
      */
-    private boolean waitForTopology(ClusterService cluster, int exp, int timeout) {
+    private boolean waitForTopology(ClusterService cluster, int exp, int timeout) throws InterruptedException {
         return waitForCondition(() -> cluster.topologyService().allMembers().size() >= exp, timeout);
     }
 
@@ -370,10 +332,9 @@ public class ITMetaStorageServicePersistenceTest {
      * Starts a raft server.
      *
      * @param idx Server index (affects port of the server).
-     * @param storage KeyValueStorage for the MetaStorage.
      * @return Server.
      */
-    private JRaftServerImpl startServer(int idx, KeyValueStorage storage) {
+    private JRaftServerImpl startServer(int idx) {
         var addr = new NetworkAddress(getLocalAddress(), PORT);
 
         ClusterService service = clusterService("server" + idx, PORT + idx, addr);
@@ -391,8 +352,8 @@ public class ITMetaStorageServicePersistenceTest {
         server.start();
 
         server.startRaftGroup(
-            METASTORAGE_RAFT_GROUP_NAME,
-            new MetaStorageListener(storage),
+            raftGroupId(),
+            createListener(workDir),
             INITIAL_CONF
         );
 
@@ -402,25 +363,21 @@ public class ITMetaStorageServicePersistenceTest {
     }
 
     /**
-     * Prepares meta storage by instantiating corresponding raft server with {@link MetaStorageListener} and
-     * a client.
+     * Prepares raft group service by instantiating raft servers and a client.
      *
-     * @return Meta storage raft group service instance.
-     * @throws Exception If failed.
+     * @return Raft group service instance.
      */
-    private RaftGroupService prepareMetaStorage() throws Exception {
+    private RaftGroupService prepareRaftGroup() throws Exception {
         for (int i = 0; i < INITIAL_CONF.size(); i++)
-            startServer(i, new RocksDBKeyValueStorage(workDir.resolve(UUID.randomUUID().toString())));
+            startServer(i);
 
         assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
 
-        return startClient(METASTORAGE_RAFT_GROUP_NAME, new NetworkAddress(getLocalAddress(), PORT));
+        return startClient(raftGroupId(), new NetworkAddress(getLocalAddress(), PORT));
     }
 
     /**
      * Starts a client with a specific address.
-     *
-     * @throws Exception If failed.
      */
     private RaftGroupService startClient(String groupId, NetworkAddress addr) throws Exception {
         ClusterService clientNode = clusterService("client_" + groupId + "_", CLIENT_PORT + clients.size(), addr);
diff --git a/modules/storage-rocksdb/pom.xml b/modules/rocksdb-common/pom.xml
similarity index 66%
copy from modules/storage-rocksdb/pom.xml
copy to modules/rocksdb-common/pom.xml
index 84ee9ad..a05ce57 100644
--- a/modules/storage-rocksdb/pom.xml
+++ b/modules/rocksdb-common/pom.xml
@@ -29,13 +29,13 @@
         <relativePath>../../parent/pom.xml</relativePath>
     </parent>
 
-    <artifactId>ignite-storage-rocksdb</artifactId>
+    <artifactId>ignite-rocksdb-common</artifactId>
     <version>3.0.0-SNAPSHOT</version>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-storage-api</artifactId>
+            <artifactId>ignite-core</artifactId>
         </dependency>
 
         <!-- 3rd party dependencies -->
@@ -43,32 +43,5 @@
             <groupId>org.rocksdb</groupId>
             <artifactId>rocksdbjni</artifactId>
         </dependency>
-
-        <!-- Test dependencies -->
-        <dependency>
-            <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-library</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-core</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-storage-api</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 </project>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
similarity index 73%
rename from modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java
rename to modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
index 4052aeb..fcdc676 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.metastorage.server.persistence;
+package org.apache.ignite.internal.rocksdb;
 
 import java.util.List;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -34,12 +34,12 @@ import org.rocksdb.WriteBatch;
 /**
  * Wrapper for the column family that encapsulates {@link ColumnFamilyHandle} and RocksDB's operations with it.
  */
-class ColumnFamily implements AutoCloseable {
+public class ColumnFamily implements AutoCloseable {
     /** RocksDB instance. */
     private final RocksDB db;
 
-    /** Column family type. */
-    private final StorageColumnFamilyType cfType;
+    /** Column family name. */
+    private final String cfName;
 
     /** Column family handle. */
     private final ColumnFamilyHandle cfHandle;
@@ -55,20 +55,20 @@ class ColumnFamily implements AutoCloseable {
      *
      * @param db Db.
      * @param handle Column family handle.
-     * @param cfType Column family type.
+     * @param cfName Column family name.
      * @param cfOptions Column family options.
      * @param options Options for the column family options.
      * @throws RocksDBException If failed.
      */
-    ColumnFamily(
+    public ColumnFamily(
         RocksDB db,
         ColumnFamilyHandle handle,
-        StorageColumnFamilyType cfType,
+        String cfName,
         ColumnFamilyOptions cfOptions,
         Options options
     ) {
         this.db = db;
-        this.cfType = cfType;
+        this.cfName = cfName;
         this.cfOptions = cfOptions;
         this.options = options;
         this.cfHandle = handle;
@@ -87,11 +87,23 @@ class ColumnFamily implements AutoCloseable {
      * @throws RocksDBException If failed.
      * @see RocksDB#get(ColumnFamilyHandle, byte[])
      */
-    byte @Nullable [] get(byte @NotNull [] key) throws RocksDBException {
+    public byte @Nullable [] get(byte @NotNull [] key) throws RocksDBException {
         return db.get(cfHandle, key);
     }
 
     /**
+     * Puts a key-value pair into this column family.
+     *
+     * @param key Key.
+     * @param value Value.
+     * @throws RocksDBException If failed.
+     * @see RocksDB#put(ColumnFamilyHandle, byte[], byte[])
+     */
+    public void put(byte @NotNull [] key, byte @NotNull [] value) throws RocksDBException {
+        db.put(cfHandle, key, value);
+    }
+
+    /**
      * Puts a key-value pair into this column family within the write batch.
      *
      * @param batch Write batch.
@@ -100,11 +112,22 @@ class ColumnFamily implements AutoCloseable {
      * @throws RocksDBException If failed.
      * @see WriteBatch#put(ColumnFamilyHandle, byte[], byte[])
      */
-    void put(WriteBatch batch, byte @NotNull [] key, byte @NotNull [] value) throws RocksDBException {
+    public void put(WriteBatch batch, byte @NotNull [] key, byte @NotNull [] value) throws RocksDBException {
         batch.put(cfHandle, key, value);
     }
 
     /**
+     * Deletes the entry mapped by the key and associated with this column family.
+     *
+     * @param key Key.
+     * @throws RocksDBException If failed.
+     * @see RocksDB#delete(ColumnFamilyHandle, byte[])
+     */
+    public void delete(byte @NotNull [] key) throws RocksDBException {
+        db.delete(cfHandle, key);
+    }
+
+    /**
      * Deletes the entry mapped by the key and associated with this column family within the write batch.
      *
      * @param batch Write batch.
@@ -112,7 +135,7 @@ class ColumnFamily implements AutoCloseable {
      * @throws RocksDBException If failed.
      * @see WriteBatch#delete(ColumnFamilyHandle, byte[])
      */
-    void delete(WriteBatch batch, byte @NotNull [] key) throws RocksDBException {
+    public void delete(WriteBatch batch, byte @NotNull [] key) throws RocksDBException {
         batch.delete(cfHandle, key);
     }
 
@@ -122,7 +145,7 @@ class ColumnFamily implements AutoCloseable {
      * @return Iterator.
      * @see RocksDB#newIterator(ColumnFamilyHandle)
      */
-    RocksIterator newIterator() {
+    public RocksIterator newIterator() {
         return db.newIterator(cfHandle);
     }
 
@@ -133,7 +156,7 @@ class ColumnFamily implements AutoCloseable {
      * @return Iterator.
      * @see RocksDB#newIterator(ColumnFamilyHandle, ReadOptions)
      */
-    RocksIterator newIterator(ReadOptions options) {
+    public RocksIterator newIterator(ReadOptions options) {
         return db.newIterator(cfHandle, options);
     }
 
@@ -145,14 +168,14 @@ class ColumnFamily implements AutoCloseable {
      * @throws RocksDBException If failed.
      * @see RocksDB#ingestExternalFile(ColumnFamilyHandle, List, IngestExternalFileOptions)
      */
-    void ingestExternalFile(List<String> paths, IngestExternalFileOptions options) throws RocksDBException {
+    public void ingestExternalFile(List<String> paths, IngestExternalFileOptions options) throws RocksDBException {
         db.ingestExternalFile(cfHandle, paths, options);
     }
 
     /**
      * @return Name of the column family.
      */
-    String name() {
-        return cfType.name();
+    public String name() {
+        return cfName;
     }
 }
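
Not part of the patch above: since ColumnFamily is now public, a minimal usage sketch may help. The class name, database path, column family name and key/value bytes below are illustrative, and resource handling is reduced to a try/finally.

    // Illustrative only: mirrors the way RocksDbStorage (further down) wires the wrapper up.
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.ignite.internal.rocksdb.ColumnFamily;
    import org.rocksdb.ColumnFamilyDescriptor;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;

    class ColumnFamilyUsageSketch {
        static void example(Path dbPath) throws Exception {
            Options opts = new Options().setCreateIfMissing(true);
            ColumnFamilyOptions cfOpts = new ColumnFamilyOptions(opts);

            List<ColumnFamilyDescriptor> descriptors = List.of(
                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOpts)
            );

            var handles = new ArrayList<ColumnFamilyHandle>();

            DBOptions dbOpts = new DBOptions()
                .setCreateIfMissing(true)
                .setCreateMissingColumnFamilies(true);

            RocksDB db = RocksDB.open(dbOpts, dbPath.toAbsolutePath().toString(), descriptors, handles);

            ColumnFamily data = new ColumnFamily(db, handles.get(0), "data", cfOpts, opts);

            try {
                byte[] key = "key".getBytes(StandardCharsets.UTF_8);

                data.put(key, "value".getBytes(StandardCharsets.UTF_8));

                byte[] value = data.get(key); // Reads back "value".

                assert value != null;

                data.delete(key);
            }
            finally {
                // Close the wrapper (and whatever it owns) before the database itself.
                data.close();

                db.close();
                dbOpts.close();
            }
        }
    }
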
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiConsumer.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiConsumer.java
new file mode 100644
index 0000000..fabbdfb
--- /dev/null
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiConsumer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb;
+
+import org.rocksdb.RocksDBException;
+
+/**
+ * BiConsumer that can throw {@link RocksDBException}.
+ */
+@FunctionalInterface
+public interface RocksBiConsumer {
+    /**
+     * Accepts the key and the value of the entry.
+     *
+     * @param key   Key.
+     * @param value Value.
+     * @throws RocksDBException If failed to process the key-value pair.
+     */
+    void accept(byte[] key, byte[] value) throws RocksDBException;
+}
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiPredicate.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiPredicate.java
new file mode 100644
index 0000000..3525b34
--- /dev/null
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksBiPredicate.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb;
+
+import org.rocksdb.RocksDBException;
+
+/**
+ * BiPredicate that can throw {@link RocksDBException}.
+ */
+@FunctionalInterface
+public interface RocksBiPredicate {
+    /**
+     * Evaluates the predicate on the given key and the given value.
+     *
+     * @param key   Key.
+     * @param value Value.
+     * @return {@code true} if the input argument matches the predicate, otherwise {@code false}.
+     * @throws RocksDBException If failed to test the key-value pair.
+     */
+    boolean test(byte[] key, byte[] value) throws RocksDBException;
+}
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
new file mode 100644
index 0000000..f9acbb4
--- /dev/null
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb;
+
+import java.nio.file.Path;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+
+/**
+ * RocksDB utility functions.
+ */
+public class RocksUtils {
+    /**
+     * Creates an SST file for the column family.
+     *
+     * @param columnFamily Column family.
+     * @param snapshot Point-in-time snapshot.
+     * @param path Directory to put the SST file in.
+     */
+    public static void createSstFile(
+        ColumnFamily columnFamily,
+        Snapshot snapshot,
+        Path path
+    ) {
+        try (
+            EnvOptions envOptions = new EnvOptions();
+            Options options = new Options();
+            ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
+            RocksIterator it = columnFamily.newIterator(readOptions);
+            SstFileWriter sstFileWriter = new SstFileWriter(envOptions, options)
+        ) {
+            Path sstFile = path.resolve(columnFamily.name());
+
+            sstFileWriter.open(sstFile.toString());
+
+            it.seekToFirst();
+
+            forEach(it, sstFileWriter::put);
+
+            sstFileWriter.finish();
+        }
+        catch (Throwable t) {
+            throw new IgniteInternalException("Failed to write snapshot: " + t.getMessage(), t);
+        }
+    }
+
+    /**
+     * Iterates over the given iterator passing key-value pairs to the given consumer and
+     * checks the iterator's status afterwards.
+     *
+     * @param iterator Iterator.
+     * @param consumer Consumer of key-value pairs.
+     * @throws RocksDBException If failed.
+     */
+    public static void forEach(RocksIterator iterator, RocksBiConsumer consumer) throws RocksDBException {
+        for (; iterator.isValid(); iterator.next())
+            consumer.accept(iterator.key(), iterator.value());
+
+        checkIterator(iterator);
+    }
+
+    /**
+     * Iterates over the given iterator testing key-value pairs with the given predicate and checks
+     * the iterator's status afterwards.
+     *
+     * @param iterator Iterator.
+     * @param consumer Predicate that tests key-value pairs.
+     * @return {@code true} if a matching key-value pair has been found, {@code false} otherwise.
+     * @throws RocksDBException If failed.
+     */
+    public static boolean find(RocksIterator iterator, RocksBiPredicate consumer) throws RocksDBException {
+        for (; iterator.isValid(); iterator.next()) {
+            boolean result = consumer.test(iterator.key(), iterator.value());
+
+            if (result)
+                return true;
+        }
+
+        checkIterator(iterator);
+
+        return false;
+    }
+
+    /**
+     * Checks the status of the iterator and throws an exception if it is not correct.
+     *
+     * @param it RocksDB iterator.
+     * @throws IgniteInternalException if the iterator has an incorrect status.
+     */
+    public static void checkIterator(RocksIterator it) {
+        try {
+            it.status();
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+    }
+}
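
Not part of the patch above: a short sketch of how forEach and find are meant to be driven. The ColumnFamily instance is assumed to be already open, and the key being searched for is a placeholder.

    // Illustrative only: dumps a column family and then searches it for a given key.
    import java.util.Arrays;
    import org.apache.ignite.internal.rocksdb.ColumnFamily;
    import org.apache.ignite.internal.rocksdb.RocksUtils;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;

    class RocksUtilsUsageSketch {
        static void dumpAndSearch(ColumnFamily data, byte[] soughtKey) throws RocksDBException {
            // Pass every key-value pair to a RocksBiConsumer.
            try (RocksIterator it = data.newIterator()) {
                it.seekToFirst();

                RocksUtils.forEach(it, (key, value) ->
                    System.out.println(Arrays.toString(key) + " -> " + Arrays.toString(value)));
            }

            // Stop as soon as a RocksBiPredicate matches.
            try (RocksIterator it = data.newIterator()) {
                it.seekToFirst();

                boolean found = RocksUtils.find(it, (key, value) -> Arrays.equals(key, soughtKey));

                System.out.println("found = " + found);
            }
        }
    }
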
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
index 67dd309..589a589 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
@@ -51,7 +51,7 @@ public interface InvokeClosure<T> {
     /**
      * @return Operation type for this closure or {@code null} if it is unknown.
      * After method {@link #call(DataRow)} has been called, operation type must
-     * be know and this method can not return {@code null}.
+     * be known and this method can not return {@code null}.
      */
     @Nullable OperationType operationType();
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
index 0778e74..80e4e02 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.internal.storage;
 
+import java.nio.file.Path;
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -117,5 +120,21 @@ public interface Storage extends AutoCloseable {
      * @throws StorageException If failed to read data or storage is already stopped.
      */
     public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException;
+
+    /**
+     * Creates a snapshot of the storage's current state in the specified directory.
+     *
+     * @param snapshotPath Directory to store a snapshot.
+     * @return Future representing pending completion of the operation. Can not be {@code null}.
+     */
+    @NotNull
+    CompletableFuture<Void> snapshot(Path snapshotPath);
+
+    /**
+     * Restores the state of the storage that was previously captured with {@link #snapshot(Path)}.
+     *
+     * @param snapshotPath Path to the snapshot's directory.
+     */
+    void restoreSnapshot(Path snapshotPath);
 }
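
Not part of the patch above: the intended calling pattern for the two new Storage methods, sketched below. The source, target and snapshotDir names are placeholders supplied by the caller.

    // Illustrative only: captures the state of one Storage and replays it into another.
    import java.nio.file.Path;
    import org.apache.ignite.internal.storage.Storage;

    class StorageSnapshotSketch {
        static void transferState(Storage source, Storage target, Path snapshotDir) {
            // The returned future completes once the snapshot directory is fully written.
            source.snapshot(snapshotDir).join();

            // Load the captured state into the other storage.
            target.restoreSnapshot(snapshotDir);
        }
    }
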
 
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
index 9b8d820..3793c35 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.internal.storage.basic;
 
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.Predicate;
@@ -170,6 +172,16 @@ public class ConcurrentHashMapStorage implements Storage {
     }
 
     /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> snapshot(Path snapshotPath) {
+        throw new UnsupportedOperationException("Not implemented!");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void restoreSnapshot(Path snapshotPath) {
+        throw new UnsupportedOperationException("Not implemented!");
+    }
+
+    /** {@inheritDoc} */
     @Override public void close() throws Exception {
         // No-op.
     }
diff --git a/modules/storage-rocksdb/pom.xml b/modules/storage-rocksdb/pom.xml
index 84ee9ad..5c3a01a 100644
--- a/modules/storage-rocksdb/pom.xml
+++ b/modules/storage-rocksdb/pom.xml
@@ -38,10 +38,9 @@
             <artifactId>ignite-storage-api</artifactId>
         </dependency>
 
-        <!-- 3rd party dependencies -->
         <dependency>
-            <groupId>org.rocksdb</groupId>
-            <artifactId>rocksdbjni</artifactId>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-rocksdb-common</artifactId>
         </dependency>
 
         <!-- Test dependencies -->
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
index fbb3652..71d224e 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
@@ -17,17 +17,26 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
 import org.apache.ignite.internal.storage.SearchRow;
@@ -39,19 +48,34 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 import org.rocksdb.AbstractComparator;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.ComparatorOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.IngestExternalFileOptions;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
+import static org.apache.ignite.internal.rocksdb.RocksUtils.createSstFile;
+
 /**
  * Storage implementation based on a single RocksDB instance.
  */
 public class RocksDbStorage implements Storage {
+    /** Suffix for the temporary snapshot folder. */
+    private static final String TMP_SUFFIX = ".tmp";
+
+    /** Name of the data column family; also used as the name of the snapshot's SST file. */
+    private static final String COLUMN_FAMILY_NAME = "data";
+
     static {
         RocksDB.loadLibrary();
     }
@@ -63,11 +87,20 @@ public class RocksDbStorage implements Storage {
     private final AbstractComparator comparator;
 
     /** RockDB options. */
-    private final Options options;
+    private final DBOptions options;
 
     /** RocksDb instance. */
     private final RocksDB db;
 
+    /** Data column family. */
+    private final ColumnFamily data;
+
+    /** DB path. */
+    private final Path dbPath;
+
+    /** Thread-pool for snapshot operations execution. */
+    private final ExecutorService snapshotExecutor = Executors.newSingleThreadExecutor();
+
     /**
      * @param dbPath Path to the folder to store data.
      * @param comparator Keys comparator.
@@ -75,6 +108,8 @@ public class RocksDbStorage implements Storage {
      */
     public RocksDbStorage(Path dbPath, Comparator<ByteBuffer> comparator) throws StorageException {
         try {
+            this.dbPath = dbPath;
+
             comparatorOptions = new ComparatorOptions();
 
             this.comparator = new AbstractComparator(comparatorOptions) {
@@ -89,13 +124,23 @@ public class RocksDbStorage implements Storage {
                 }
             };
 
-            options = new Options();
+            options = new DBOptions()
+                .setCreateMissingColumnFamilies(true)
+                .setCreateIfMissing(true);
 
-            options.setCreateIfMissing(true);
+            Options dataOptions = new Options().setCreateIfMissing(true).setComparator(this.comparator);
 
-            options.setComparator(this.comparator);
+            ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
 
-            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
+            List<ColumnFamilyDescriptor> descriptors = Collections.singletonList(
+                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, dataFamilyOptions)
+            );
+
+            var handles = new ArrayList<ColumnFamilyHandle>();
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString(), descriptors, handles);
+
+            this.data = new ColumnFamily(db, handles.get(0), COLUMN_FAMILY_NAME, dataFamilyOptions, dataOptions);
         }
         catch (RocksDBException e) {
             try {
@@ -114,7 +159,7 @@ public class RocksDbStorage implements Storage {
         try {
             byte[] keyBytes = key.keyBytes();
 
-            return new SimpleDataRow(keyBytes, db.get(keyBytes));
+            return new SimpleDataRow(keyBytes, data.get(keyBytes));
         }
         catch (RocksDBException e) {
             throw new StorageException("Failed to read data from the storage", e);
@@ -148,7 +193,11 @@ public class RocksDbStorage implements Storage {
     /** {@inheritDoc} */
     @Override public void write(DataRow row) throws StorageException {
         try {
-            db.put(row.keyBytes(), row.valueBytes());
+            byte[] value = row.valueBytes();
+
+            assert value != null;
+
+            data.put(row.keyBytes(), value);
         }
         catch (RocksDBException e) {
             throw new StorageException("Filed to write data to the storage", e);
@@ -159,8 +208,13 @@ public class RocksDbStorage implements Storage {
     @Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
         try (WriteBatch batch = new WriteBatch();
              WriteOptions opts = new WriteOptions()) {
-            for (DataRow row : rows)
-                batch.put(row.keyBytes(), row.valueBytes());
+            for (DataRow row : rows) {
+                byte[] value = row.valueBytes();
+
+                assert value != null;
+
+                data.put(batch, row.keyBytes(), value);
+            }
 
             db.write(opts, batch);
         }
@@ -177,9 +231,13 @@ public class RocksDbStorage implements Storage {
              WriteOptions opts = new WriteOptions()) {
 
             for (DataRow row : rows) {
-                if (db.get(row.keyBytes()) == null)
-                    batch.put(row.keyBytes(), row.valueBytes());
-                else
+                if (data.get(row.keyBytes()) == null) {
+                    byte[] value = row.valueBytes();
+
+                    assert value != null;
+
+                    data.put(batch, row.keyBytes(), value);
+                } else
                     cantInsert.add(row);
             }
 
@@ -195,7 +253,7 @@ public class RocksDbStorage implements Storage {
     /** {@inheritDoc} */
     @Override public void remove(SearchRow key) throws StorageException {
         try {
-            db.delete(key.keyBytes());
+            data.delete(key.keyBytes());
         }
         catch (RocksDBException e) {
             throw new StorageException("Failed to remove data from the storage", e);
@@ -212,12 +270,12 @@ public class RocksDbStorage implements Storage {
             for (SearchRow key : keys) {
                 byte[] keyBytes = key.keyBytes();
 
-                byte[] value = db.get(keyBytes);
+                byte[] value = data.get(keyBytes);
 
                 if (value != null) {
                     res.add(new SimpleDataRow(keyBytes, value));
 
-                    batch.delete(keyBytes);
+                    data.delete(batch, keyBytes);
                 }
             }
 
@@ -260,7 +318,7 @@ public class RocksDbStorage implements Storage {
                 if (Arrays.equals(value, expectedValue)) {
                     res.add(new SimpleDataRow(key, value));
 
-                    batch.delete(key);
+                    data.delete(batch, key);
                 }
             }
 
@@ -278,18 +336,26 @@ public class RocksDbStorage implements Storage {
     @Override public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
         try {
             byte[] keyBytes = key.keyBytes();
-            byte[] existingDataBytes = db.get(keyBytes);
+            byte[] existingDataBytes = data.get(keyBytes);
 
             clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
 
             switch (clo.operationType()) {
                 case WRITE:
-                    db.put(keyBytes, clo.newRow().valueBytes());
+                    DataRow newRow = clo.newRow();
+
+                    assert newRow != null;
+
+                    byte[] value = newRow.valueBytes();
+
+                    assert value != null;
+
+                    data.put(keyBytes, value);
 
                     break;
 
                 case REMOVE:
-                    db.delete(keyBytes);
+                    data.delete(keyBytes);
 
                     break;
 
@@ -306,15 +372,75 @@ public class RocksDbStorage implements Storage {
 
     /** {@inheritDoc} */
     @Override public Cursor<DataRow> scan(Predicate<SearchRow> filter) throws StorageException {
-        return new ScanCursor(db.newIterator(), filter);
+        return new ScanCursor(data.newIterator(), filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> snapshot(Path snapshotPath) {
+        Path tempPath = Paths.get(snapshotPath.toString() + TMP_SUFFIX);
+
+        // Create a RocksDB point-in-time snapshot
+        Snapshot snapshot = db.getSnapshot();
+
+        return CompletableFuture.runAsync(() -> {
+            // (Re)create the temporary directory
+            IgniteUtils.deleteIfExists(tempPath);
+
+            try {
+                Files.createDirectories(tempPath);
+            }
+            catch (IOException e) {
+                throw new IgniteInternalException("Failed to create directory: " + tempPath, e);
+            }
+        }, snapshotExecutor)
+            .thenRunAsync(() -> createSstFile(data, snapshot, tempPath), snapshotExecutor)
+            .whenComplete((aVoid, throwable) -> {
+                // Release a snapshot
+                db.releaseSnapshot(snapshot);
+
+                // The snapshot is not actually closed here, because a Snapshot instance doesn't own the pointer;
+                // the database does. close() is called only to maintain the AutoCloseable semantics.
+                snapshot.close();
+
+                if (throwable != null)
+                    return;
+
+                // Delete snapshot directory if it already exists
+                IgniteUtils.deleteIfExists(snapshotPath);
+
+                try {
+                    // Rename the temporary directory
+                    Files.move(tempPath, snapshotPath);
+                }
+                catch (IOException e) {
+                    throw new IgniteInternalException("Failed to rename: " + tempPath + " to " + snapshotPath, e);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void restoreSnapshot(Path path) {
+        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
+            Path snapshotPath = path.resolve(COLUMN_FAMILY_NAME);
+
+            if (!Files.exists(snapshotPath))
+                throw new IgniteInternalException("Snapshot not found: " + snapshotPath);
+
+            this.data.ingestExternalFile(Collections.singletonList(snapshotPath.toString()), ingestOptions);
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException("Fail to ingest sst file at path: " + path, e);
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void close() throws Exception {
-        IgniteUtils.closeAll(comparatorOptions, comparator, options, db);
+        IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);
+
+        IgniteUtils.closeAll(data, db, options, comparator, comparatorOptions);
     }
 
-    /** Cusror wrapper over the RocksIterator object with custom filter. */
+    /** Cursor wrapper over the RocksIterator object with custom filter. */
     private static class ScanCursor implements Cursor<DataRow> {
         /** Iterator from RocksDB. */
         private final RocksIterator iter;
@@ -379,4 +505,12 @@ public class RocksDbStorage implements Storage {
             iter.close();
         }
     }
+
+    /**
+     * @return Path to the database.
+     */
+    @TestOnly
+    public Path getDbPath() {
+        return dbPath;
+    }
 }
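
Not part of the patch above: a round-trip sketch over the new snapshot support in RocksDbStorage. The class name, directory names ("db", "db-restored", "snapshot") and key/value bytes are arbitrary, and the assertion stands in for a real check.

    // Illustrative only: write a row, snapshot it, restore the snapshot into a second storage.
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Path;
    import java.util.Arrays;
    import org.apache.ignite.internal.storage.DataRow;
    import org.apache.ignite.internal.storage.basic.SimpleDataRow;
    import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;

    class RocksDbSnapshotSketch {
        static void roundTrip(Path workDir) throws Exception {
            try (
                RocksDbStorage original = new RocksDbStorage(workDir.resolve("db"), ByteBuffer::compareTo);
                RocksDbStorage restored = new RocksDbStorage(workDir.resolve("db-restored"), ByteBuffer::compareTo)
            ) {
                byte[] key = "key".getBytes(StandardCharsets.UTF_8);
                byte[] value = "value".getBytes(StandardCharsets.UTF_8);

                original.write(new SimpleDataRow(key, value));

                Path snapshotDir = workDir.resolve("snapshot");

                // Waits until the SST file is written and the directory is renamed into place.
                original.snapshot(snapshotDir).join();

                // Ingests the SST file into the second storage's data column family.
                restored.restoreSnapshot(snapshotDir);

                DataRow read = restored.read(new SimpleDataRow(key, null));

                assert Arrays.equals(value, read.valueBytes());
            }
        }
    }
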
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index d005e3b..d63fa39 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -145,6 +145,13 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-raft</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <!-- Benchmarks dependencies -->
         <dependency>
             <groupId>org.openjdk.jmh</groupId>
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
new file mode 100644
index 0000000..1e9dafa
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITTablePersistenceTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.distributed;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.raft.client.service.ITAbstractListenerSnapshotTest;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+
+/**
+ * Tests of RAFT group snapshots for persistent (RocksDB-based) partitions.
+ */
+public class ITTablePersistenceTest extends ITAbstractListenerSnapshotTest<PartitionListener> {
+    /** */
+    private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+        1,
+        new Column[] {new Column("key", NativeTypes.INT64, false)},
+        new Column[] {new Column("value", NativeTypes.INT64, false)}
+    );
+
+    /** */
+    private static final Row FIRST_KEY = createKeyRow(0);
+
+    /** */
+    private static final Row FIRST_VALUE = createKeyValueRow(0, 0);
+
+    /** */
+    private static final Row SECOND_KEY = createKeyRow(1);
+
+    /** */
+    private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+
+    /** {@inheritDoc} */
+    @Override public void beforeFollowerStop(RaftGroupService service) throws Exception {
+        var table = new InternalTableImpl("table", UUID.randomUUID(), Map.of(0, service), 1);
+
+        table.upsert(FIRST_VALUE, null).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void afterFollowerStop(RaftGroupService service) throws Exception {
+        var table = new InternalTableImpl("table", UUID.randomUUID(), Map.of(0, service), 1);
+
+        // Remove the first key
+        table.delete(FIRST_KEY, null).get();
+
+        // Put deleted data again
+        table.upsert(FIRST_VALUE, null).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void afterSnapshot(RaftGroupService service) throws Exception {
+        var table = new InternalTableImpl("table", UUID.randomUUID(), Map.of(0, service), 1);
+
+        table.upsert(SECOND_VALUE, null).get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public BooleanSupplier snapshotCheckClosure(JRaftServerImpl restarted, boolean interactedAfterSnapshot) {
+        RocksDbStorage storage = (RocksDbStorage) getListener(restarted, raftGroupId()).getStorage();
+
+        Row key = interactedAfterSnapshot ? SECOND_KEY : FIRST_KEY;
+        Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
+
+        ByteBuffer buffer = key.keySlice();
+        byte[] keyBytes = new byte[buffer.capacity()];
+        buffer.get(keyBytes);
+
+        SimpleDataRow finalRow = new SimpleDataRow(keyBytes, null);
+        SimpleDataRow finalValue = new SimpleDataRow(keyBytes, value.bytes());
+
+        return () -> {
+            DataRow read = storage.read(finalRow);
+            return Objects.equals(finalValue, read);
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public Path getListenerPersistencePath(PartitionListener listener) {
+        return ((RocksDbStorage) listener.getStorage()).getDbPath();
+    }
+
+    /** {@inheritDoc} */
+    @Override public RaftGroupListener createListener(Path workDir) {
+        return new PartitionListener(
+            new RocksDbStorage(workDir.resolve(UUID.randomUUID().toString()), ByteBuffer::compareTo)
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public String raftGroupId() {
+        return "partitions";
+    }
+
+    /**
+     * Creates a {@link Row} with the supplied key.
+     *
+     * @param id Key.
+     * @return Row.
+     */
+    private static Row createKeyRow(long id) {
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
+
+        rowBuilder.appendLong(id);
+
+        return new Row(SCHEMA, new ByteBufferRow(rowBuilder.toBytes()));
+    }
+
+    /**
+     * Creates a {@link Row} with the supplied key and value.
+     *
+     * @param id Key.
+     * @param value Value.
+     * @return Row.
+     */
+    private static Row createKeyValueRow(long id, long value) {
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 0, 0);
+
+        rowBuilder.appendLong(id);
+        rowBuilder.appendLong(value);
+
+        return new Row(SCHEMA, new ByteBufferRow(rowBuilder.toBytes()));
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index dd37cb9..cd90d9c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -57,6 +57,7 @@ import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.RaftGroupListener;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Partition command handler.
@@ -301,13 +302,15 @@ public class PartitionListener implements RaftGroupListener {
 
     /** {@inheritDoc} */
     @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
-        // Not implemented yet.
+        storage.snapshot(path).whenComplete((unused, throwable) -> {
+            doneClo.accept(throwable);
+        });
     }
 
     /** {@inheritDoc} */
     @Override public boolean onSnapshotLoad(Path path) {
-        // Not implemented yet.
-        return false;
+        storage.restoreSnapshot(path);
+        return true;
     }
 
     /** {@inheritDoc} */
@@ -345,4 +348,12 @@ public class PartitionListener implements RaftGroupListener {
 
         return new SimpleDataRow(key, null);
     }
+
+    /**
+     * @return Underlying storage.
+     */
+    @TestOnly
+    public Storage getStorage() {
+        return storage;
+    }
 }
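
Not part of the patch above: a sketch of how the two snapshot hooks behave when driven directly. The listener instance, the snapshot directory and the class name are placeholders.

    // Illustrative only: onSnapshotSave is asynchronous, onSnapshotLoad is synchronous.
    import java.nio.file.Path;
    import java.util.concurrent.CompletableFuture;
    import org.apache.ignite.internal.table.distributed.raft.PartitionListener;

    class PartitionSnapshotSketch {
        static void saveAndLoad(PartitionListener listener, Path snapshotDir) {
            CompletableFuture<Void> done = new CompletableFuture<>();

            // The closure is invoked with a null Throwable once storage.snapshot(path) completes.
            listener.onSnapshotSave(snapshotDir, throwable -> {
                if (throwable == null)
                    done.complete(null);
                else
                    done.completeExceptionally(throwable);
            });

            done.join();

            // Returns true once the SST file has been ingested back into the storage.
            boolean loaded = listener.onSnapshotLoad(snapshotDir);

            assert loaded;
        }
    }
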
diff --git a/parent/pom.xml b/parent/pom.xml
index 9fe2172..c3ac635 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -246,6 +246,12 @@
 
             <dependency>
                 <groupId>org.apache.ignite</groupId>
+                <artifactId>ignite-rocksdb-common</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.ignite</groupId>
                 <artifactId>ignite-schema</artifactId>
                 <version>${project.version}</version>
             </dependency>
diff --git a/pom.xml b/pom.xml
index 11cdcd5..aa83c61 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,6 +59,7 @@
         <module>modules/raft</module>
         <module>modules/raft-client</module>
         <module>modules/rest</module>
+        <module>modules/rocksdb-common</module>
         <module>modules/runner</module>
         <module>modules/schema</module>
         <module>modules/storage-api</module>