You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/07/22 10:10:08 UTC

[ignite-3] branch main updated: IGNITE-14982 RocksDB-based persistent key-value storage for meta store (#203)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 179d24d  IGNITE-14982 RocksDB-based persistent key-value storage for meta store (#203)
179d24d is described below

commit 179d24de331d6b2d88846daedb779672b4667627
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Jul 22 13:10:02 2021 +0300

    IGNITE-14982 RocksDB-based persistent key-value storage for meta store (#203)
---
 .../apache/ignite/internal/util/ArrayUtils.java    |    2 +-
 .../ignite/internal/util/CollectionUtils.java      |    2 +-
 .../apache/ignite/internal/util/IgniteUtils.java   |   22 +-
 .../testframework/WorkDirectoryExtension.java      |    9 +-
 modules/metastorage-client/pom.xml                 |    7 +
 .../ITMetaStorageServicePersistenceTest.java       |  426 ++++
 .../client/ITMetaStorageServiceTest.java           |   29 +-
 modules/metastorage-server/pom.xml                 |   12 +
 .../metastorage/server/KeyValueStorage.java        |   23 +-
 .../server/SimpleInMemoryKeyValueStorage.java      |   21 +-
 .../ignite/internal/metastorage/server/Value.java  |    2 +-
 .../server/persistence/ColumnFamily.java           |  158 ++
 .../server/persistence/RangeCursor.java            |  184 ++
 .../server/persistence/RocksDBKeyValueStorage.java | 1092 +++++++++++
 .../server/persistence/RocksStorageUtils.java      |  267 +++
 .../persistence/StorageColumnFamilyType.java       |   53 +
 .../server/persistence/WatchCursor.java            |  233 +++
 .../server/raft/MetaStorageListener.java           |   32 +-
 ...eTest.java => AbstractKeyValueStorageTest.java} |  164 +-
 .../server/RocksDbKeyValueStorageTest.java         |   39 +
 .../server/SimpleInMemoryKeyValueStorageTest.java  | 2032 +-------------------
 .../raft/client/service/RaftGroupListener.java     |   10 +-
 .../apache/ignite/raft/server/CounterListener.java |   13 +-
 .../raft/server/ITJRaftCounterServerTest.java      |    4 +-
 .../internal/raft/server/impl/JRaftServerImpl.java |   10 +-
 .../table/distributed/raft/PartitionListener.java  |   10 +-
 26 files changed, 2724 insertions(+), 2132 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
index 1f2e9f3..a33cf5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.util;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-
 import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.Nullable;
 
@@ -272,6 +271,7 @@ public final class ArrayUtils {
      *
      * @param arr Array.
      * @param obj One or more elements.
+     * @param <T> Type of the elements of the array.
      * @return Concatenated array.
      */
     @SafeVarargs
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 4273ba4..21a7899 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.util;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -61,6 +60,7 @@ public final class CollectionUtils {
      * Gets first element from given list or returns {@code null} if list is empty.
      *
      * @param list List to retrieve the first element.
+     * @param <T> Type of the elements of the list.
      * @return The first element of the given list or {@code null} in case the list is empty.
      */
     public static <T> T first(List<? extends T> list) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 8e8af1a..f5ea3c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -24,6 +24,7 @@ import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -371,6 +372,9 @@ public class IgniteUtils {
         try {
             Files.walkFileTree(path, new SimpleFileVisitor<>() {
                 @Override public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+                    if (exc != null)
+                        throw exc;
+
                     Files.delete(dir);
 
                     return FileVisitResult.CONTINUE;
@@ -449,14 +453,16 @@ public class IgniteUtils {
      * thrown exception will be propagated to the caller, after all other objects are closed, similar to
      * the try-with-resources block.
      *
-     * @param closeables collection of objects to close
+     * @param closeables Collection of objects to close.
+     * @throws Exception If failed to close.
      */
     public static void closeAll(Collection<? extends AutoCloseable> closeables) throws Exception {
         Exception ex = null;
 
         for (AutoCloseable closeable : closeables) {
             try {
-                closeable.close();
+                if (closeable != null)
+                    closeable.close();
             }
             catch (Exception e) {
                 if (ex == null)
@@ -469,4 +475,16 @@ public class IgniteUtils {
         if (ex != null)
             throw ex;
     }
+
+    /**
+     * Closes all provided objects by delegating to {@link #closeAll(Collection)}.
+     *
+     * @param closeables Array of closeable objects to close; {@code null} elements are skipped.
+     * @throws Exception If failed to close.
+     *
+     * @see #closeAll(Collection)
+     */
+    public static void closeAll(AutoCloseable... closeables) throws Exception {
+        closeAll(Arrays.asList(closeables));
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
index 43b44b8..54791a9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
@@ -41,7 +41,8 @@ import org.junit.platform.commons.support.HierarchyTraversalMode;
  * {@link WorkDirectory} annotation.
  * <p>
  * A new temporary folder is created for every test method and will be located relative to the module,
- * where the tests are being run, by the following path: "target/work/{@literal <name-of-the-test-method>}".
+ * where the tests are being run, by the following path:
+ * "target/work/{@literal <name-of-the-test-class>/<name-of-the-test-method>_<current_time_millis>}".
  * It is removed after a test has finished running, but this behaviour can be controlled by setting the
  * {@link WorkDirectoryExtension#KEEP_WORK_DIR_PROPERTY} property to {@code true}, in which case the created folder can
  * be kept intact for debugging purposes.
@@ -110,7 +111,11 @@ public class WorkDirectoryExtension implements BeforeEachCallback, AfterEachCall
         if (shouldRemoveDir())
             IgniteUtils.deleteIfExists(BASE_PATH);
 
-        Path workDir = BASE_PATH.resolve(extensionContext.getRequiredTestMethod().getName());
+        String testClassDir = extensionContext.getRequiredTestClass().getSimpleName();
+
+        String testMethodDir = extensionContext.getRequiredTestMethod().getName() + '_' + System.currentTimeMillis();
+
+        Path workDir = BASE_PATH.resolve(testClassDir).resolve(testMethodDir);
 
         Files.createDirectories(workDir);
 
diff --git a/modules/metastorage-client/pom.xml b/modules/metastorage-client/pom.xml
index ffb6ef1..0041b99 100644
--- a/modules/metastorage-client/pom.xml
+++ b/modules/metastorage-client/pom.xml
@@ -86,5 +86,12 @@
             <artifactId>ignite-metastorage-server</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
new file mode 100644
index 0000000..3783ccc
--- /dev/null
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServicePersistenceTest.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.client;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BooleanSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.persistence.RocksDBKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
+import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl.DelegatingStateMachine;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static java.lang.Thread.sleep;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Persistent (rocksdb-based) meta storage client tests.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ITMetaStorageServicePersistenceTest {
+    /** Port of the first server; the server with index {@code i} uses {@code PORT + i}. */
+    private static final int PORT = 5003;
+
+    /** Port of the first client; every next client uses the next port number. */
+    private static final int CLIENT_PORT = 6003;
+
+    /**
+     * Initial raft group configuration: three peers on the local address with ports {@code PORT..PORT + 2}.
+     */
+    private static final List<Peer> INITIAL_CONF = IntStream.rangeClosed(0, 2)
+        .mapToObj(i -> new NetworkAddress(getLocalAddress(), PORT + i))
+        .map(Peer::new)
+        .collect(Collectors.toUnmodifiableList());
+
+    /** Name of the meta storage raft group. */
+    private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
+
+    /** Raft client messages factory. */
+    private static final RaftClientMessagesFactory FACTORY = new RaftClientMessagesFactory();
+
+    /** Factory of cluster services. */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+
+    /** Message serialization registry passed to every created cluster service. */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
+
+    /** Work directory of the test, marked with {@link WorkDirectory} for the {@link WorkDirectoryExtension}. */
+    @WorkDirectory
+    private Path workDir;
+
+    /** All cluster services started by the test (both server and client nodes). */
+    private final List<ClusterService> cluster = new ArrayList<>();
+
+    /** Raft servers to shut down after the test. */
+    private final List<JRaftServerImpl> servers = new ArrayList<>();
+
+    /** Raft clients to shut down after the test. */
+    private final List<RaftGroupService> clients = new ArrayList<>();
+
+    /**
+     * Shuts down all raft clients and then all raft servers that are still registered in the test.
+     *
+     * @throws Exception If failed to shutdown a raft server.
+     */
+    @AfterEach
+    public void afterTest() throws Exception {
+        for (RaftGroupService client : clients)
+            client.shutdown();
+
+        for (JRaftServerImpl server : servers)
+            server.shutdown();
+    }
+
+    /**
+     * Parameters of a single {@link #testSnapshot} run.
+     */
+    private static class TestData {
+        /** Whether to delete the stopped node's raft and storage folders before the restart. */
+        private final boolean deleteFolder;
+
+        /** Whether to write to the meta storage after the last snapshot. */
+        private final boolean writeAfterSnapshot;
+
+        /** Creates test parameters from the two flags. */
+        private TestData(boolean deleteFolder, boolean writeAfterSnapshot) {
+            this.deleteFolder = deleteFolder;
+            this.writeAfterSnapshot = writeAfterSnapshot;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "deleteFolder=" + deleteFolder + ", writeAfterSnapshot=" + writeAfterSnapshot;
+        }
+    }
+
+    /**
+     * @return {@link #testSnapshot} parameters: all four combinations of the two {@link TestData} flags.
+     */
+    private static List<TestData> testSnapshotData() {
+        return List.of(
+            new TestData(false, false),
+            new TestData(true, true),
+            new TestData(false, true),
+            new TestData(true, false)
+        );
+    }
+
+    /**
+     * Tests that a restarted raft node catches up with the group by restoring a snapshot.
+     *
+     * @param testData Test parameters (whether to delete the node's folders and to write after the snapshot).
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("testSnapshotData")
+    public void testSnapshot(TestData testData) throws Exception {
+        ByteArray firstKey = ByteArray.fromString("first");
+        byte[] firstValue = "firstValue".getBytes(StandardCharsets.UTF_8);
+
+        // Setup a metastorage raft service
+        RaftGroupService metaStorageSvc = prepareMetaStorage();
+
+        MetaStorageServiceImpl metaStorage = new MetaStorageServiceImpl(metaStorageSvc);
+
+        // Put some data in the metastorage
+        metaStorage.put(firstKey, firstValue).get();
+
+        // Check that data has been written successfully
+        check(metaStorage, new EntryImpl(firstKey, firstValue, 1, 1));
+
+        // Select any node that is not the leader of the group
+        JRaftServerImpl toStop = servers.stream()
+            .filter(server -> !server.localPeer(METASTORAGE_RAFT_GROUP_NAME).equals(metaStorageSvc.leader()))
+            .findAny()
+            .orElseThrow();
+
+        // Get the path to that node's raft directory
+        String serverDataPath = toStop.getServerDataPath(METASTORAGE_RAFT_GROUP_NAME);
+
+        // Get the path to that node's RocksDB key-value storage
+        Path dbPath = getStorage(toStop).getDbPath();
+
+        int stopIdx = servers.indexOf(toStop);
+
+        // Remove that node from the list of servers
+        servers.remove(stopIdx);
+
+        // Shutdown that node
+        toStop.shutdown();
+
+        // Create a raft snapshot of the metastorage service
+        metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
+
+        // Remove the first key from the metastorage
+        metaStorage.remove(firstKey).get();
+
+        // Check that data has been removed
+        check(metaStorage, new EntryImpl(firstKey, null, 2, 2));
+
+        // Put same data again
+        metaStorage.put(firstKey, firstValue).get();
+
+        // Check that it has been written
+        check(metaStorage, new EntryImpl(firstKey, firstValue, 3, 3));
+
+        // Create another raft snapshot
+        metaStorageSvc.snapshot(metaStorageSvc.leader()).get();
+
+        byte[] lastKey = firstKey.bytes();
+        byte[] lastValue = firstValue;
+
+        if (testData.deleteFolder) {
+            // Delete stopped node's raft directory and key-value storage directory
+            // to check if snapshot could be restored by the restarted node
+            IgniteUtils.deleteIfExists(dbPath);
+            IgniteUtils.deleteIfExists(Paths.get(serverDataPath));
+        }
+
+        if (testData.writeAfterSnapshot) {
+            // Put new data after the second snapshot to check if after
+            // the snapshot restore operation restarted node will receive it
+            ByteArray secondKey = ByteArray.fromString("second");
+            byte[] secondValue = "secondValue".getBytes(StandardCharsets.UTF_8);
+
+            metaStorage.put(secondKey, secondValue).get();
+
+            lastKey = secondKey.bytes();
+            lastValue = secondValue;
+        }
+
+        // Restart the node
+        JRaftServerImpl restarted = startServer(stopIdx, new RocksDBKeyValueStorage(dbPath));
+
+        assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+
+        KeyValueStorage storage = getStorage(restarted);
+
+        byte[] finalLastKey = lastKey;
+
+        int expectedRevision = testData.writeAfterSnapshot ? 4 : 3;
+        int expectedUpdateCounter = testData.writeAfterSnapshot ? 4 : 3;
+
+        EntryImpl expectedLastEntry = new EntryImpl(new ByteArray(lastKey), lastValue, expectedRevision, expectedUpdateCounter);
+
+        // Wait until the snapshot is restored
+        boolean success = waitForCondition(() -> {
+            org.apache.ignite.internal.metastorage.server.Entry e = storage.get(finalLastKey);
+            return e.empty() == expectedLastEntry.empty()
+                && e.tombstone() == expectedLastEntry.tombstone()
+                && e.revision() == expectedLastEntry.revision()
+                && e.updateCounter() == expectedLastEntry.updateCounter()
+                && Arrays.equals(e.key(), expectedLastEntry.key().bytes())
+                && Arrays.equals(e.value(), expectedLastEntry.value());
+        }, 3_000);
+
+        assertTrue(success);
+
+        // Check that the last value has been written successfully
+        check(metaStorage, expectedLastEntry);
+    }
+
+    /**
+     * Extracts the meta storage's key-value storage from the given jraft server.
+     *
+     * @param server Server.
+     * @return Meta store's key value storage.
+     */
+    private static RocksDBKeyValueStorage getStorage(JRaftServerImpl server) {
+        org.apache.ignite.raft.jraft.RaftGroupService raftSvc = server.raftGroupService(METASTORAGE_RAFT_GROUP_NAME);
+
+        // The state machine of the raft node exposes the listener that owns the storage.
+        DelegatingStateMachine stateMachine = (DelegatingStateMachine) raftSvc.getRaftNode().getOptions().getFsm();
+
+        MetaStorageListener metaStorageLsnr = (MetaStorageListener) stateMachine.getListener();
+
+        // The cast fails if the listener is not backed by a RocksDB storage.
+        return (RocksDBKeyValueStorage) metaStorageLsnr.getStorage();
+    }
+
+    /**
+     * Reads the entry with the expected key from the meta storage and asserts that it equals the expected one.
+     *
+     * @param metaStorage Meta storage service.
+     * @param expected Expected entry.
+     * @throws ExecutionException If the read has failed.
+     * @throws InterruptedException If the thread was interrupted while waiting for the read.
+     */
+    private void check(MetaStorageServiceImpl metaStorage, EntryImpl expected)
+        throws ExecutionException, InterruptedException {
+        Entry actual = metaStorage.get(expected.key()).get();
+
+        assertEquals(expected, actual);
+    }
+
+    /** Polls {@code cond} every 50 ms for up to {@code timeout} ms; on interrupt re-sets the flag and gives up. */
+    @SuppressWarnings("BusyWait") private static boolean waitForCondition(BooleanSupplier cond, long timeout) {
+        long stop = System.currentTimeMillis() + timeout;
+
+        try {
+            while (System.currentTimeMillis() < stop) {
+                if (cond.getAsBoolean())
+                    return true;
+
+                sleep(50);
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+
+        return false;
+    }
+
+    /**
+     * @param cluster Cluster service whose topology is polled.
+     * @param exp Minimal expected number of topology members.
+     * @param timeout The timeout in millis.
+     * @return {@code True} if the topology reached at least {@code exp} members before the timeout.
+     */
+    private boolean waitForTopology(ClusterService cluster, int exp, int timeout) {
+        return waitForCondition(() -> cluster.topologyService().allMembers().size() >= exp, timeout);
+    }
+
+    /**
+     * @return Host address of the local host; {@link UnknownHostException} is rethrown as {@link RuntimeException}.
+     */
+    private static String getLocalAddress() {
+        try {
+            return InetAddress.getLocalHost().getHostAddress();
+        }
+        catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates and starts a cluster service with the given name, port and server list and adds it to {@link #cluster}.
+     */
+    private ClusterService clusterService(String name, int port, List<NetworkAddress> servers) {
+        var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
+
+        var network = NETWORK_FACTORY.createClusterService(context);
+
+        network.start();
+
+        cluster.add(network);
+
+        return network;
+    }
+
+    /**
+     * Starts a raft server on a new cluster service and starts the meta storage raft group on it.
+     *
+     * @param idx Server index (defines the server's name, port and raft directory).
+     * @param storage KeyValueStorage for the MetaStorage.
+     * @return Started server; its {@code shutdown()} also shuts down the underlying cluster service.
+     */
+    private JRaftServerImpl startServer(int idx, KeyValueStorage storage) {
+        var addr = new NetworkAddress(getLocalAddress(), PORT);
+
+        ClusterService service = clusterService("server" + idx, PORT + idx, List.of(addr));
+
+        Path jraft = workDir.resolve("jraft" + idx);
+
+        JRaftServerImpl server = new JRaftServerImpl(service, jraft.toString()) {
+            @Override public void shutdown() throws Exception {
+                super.shutdown();
+
+                service.shutdown();
+            }
+        };
+
+        server.startRaftGroup(
+            METASTORAGE_RAFT_GROUP_NAME,
+            new MetaStorageListener(storage),
+            INITIAL_CONF
+        );
+
+        servers.add(server);
+
+        return server;
+    }
+
+    /**
+     * Prepares meta storage by starting one raft server with a {@link MetaStorageListener} per initial peer,
+     * waiting for the topology to form and starting a client that uses the first server's address.
+     *
+     * @return Meta storage raft group service instance.
+     */
+    private RaftGroupService prepareMetaStorage() throws IOException {
+        for (int i = 0; i < INITIAL_CONF.size(); i++)
+            startServer(i, new RocksDBKeyValueStorage(workDir.resolve(UUID.randomUUID().toString())));
+
+        assertTrue(waitForTopology(cluster.get(0), servers.size(), 3_000));
+
+        return startClient(METASTORAGE_RAFT_GROUP_NAME, new NetworkAddress(getLocalAddress(), PORT));
+    }
+
+    /**
+     * Starts a raft group client on its own cluster node, with {@code addr} as the node's only server and the only peer.
+     */
+    private RaftGroupService startClient(String groupId, NetworkAddress addr) {
+        ClusterService clientNode = clusterService(
+            "client_" + groupId + "_", CLIENT_PORT + clients.size(), List.of(addr));
+
+        RaftGroupServiceImpl client = new RaftGroupServiceImpl(groupId, clientNode, FACTORY, 10_000,
+            List.of(new Peer(addr)), false, 200) {
+            @Override public void shutdown() {
+                super.shutdown();
+
+                clientNode.shutdown();
+            }
+        };
+
+        clients.add(client);
+
+        return client;
+    }
+}
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index 78a6a41..19a95a1 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage.client;
 
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -25,6 +26,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -32,6 +34,8 @@ import java.util.stream.IntStream;
 import org.apache.ignite.internal.metastorage.common.OperationType;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteLogger;
@@ -47,9 +51,8 @@ import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.apache.ignite.internal.raft.server.RaftServer;
-import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -818,7 +821,7 @@ public class ITMetaStorageServiceTest {
 
         MetaStorageService metaStorageSvc = prepareMetaStorage(
                 new AbstractKeyValueStorage() {
-                    @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
+                    @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev) {
                         return new Cursor<>() {
                             private final Iterator<org.apache.ignite.internal.metastorage.server.WatchEvent> it = new Iterator<>() {
                                 @Override public boolean hasNext() {
@@ -1145,7 +1148,7 @@ public class ITMetaStorageServiceTest {
         }
 
         /** {@inheritDoc} */
-        @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
+        @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev) {
             fail();
 
             return null;
@@ -1169,5 +1172,23 @@ public class ITMetaStorageServiceTest {
         @Override public void compact() {
             fail();
         }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            fail();
+        }
+
+        /** {@inheritDoc} */
+        @NotNull
+        @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
+            fail();
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void restoreSnapshot(Path snapshotPath) {
+            fail();
+        }
     }
 }
diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
index 5f0f453..07bb1df 100644
--- a/modules/metastorage-server/pom.xml
+++ b/modules/metastorage-server/pom.xml
@@ -44,6 +44,11 @@
         </dependency>
 
         <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.jetbrains</groupId>
             <artifactId>annotations</artifactId>
         </dependency>
@@ -60,5 +65,12 @@
             <artifactId>junit-jupiter-engine</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index d4c7da5..9250d7d 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -17,15 +17,18 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
+import java.nio.file.Path;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Defines key/value storage interface.
  */
-public interface KeyValueStorage {
+public interface KeyValueStorage extends AutoCloseable {
     /**
      * Returns storage revision.
      *
@@ -177,7 +180,7 @@ public interface KeyValueStorage {
      * @param rev Start revision number.
      * @return Cursor by update events.
      */
-    Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev);
+    Cursor<WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev);
 
     /**
      * Creates subscription on updates of entries corresponding to the given keys range (where upper bound is unlimited)
@@ -203,4 +206,20 @@ public interface KeyValueStorage {
      * Compacts storage (removes tombstones).
      */
     void compact();
+
+    /**
+     * Creates a snapshot of the storage's current state in the specified directory.
+     *
+     * @param snapshotPath Directory to store a snapshot.
+     * @return Future representing pending completion of the operation. Never {@code null}.
+     */
+    @NotNull
+    CompletableFuture<Void> snapshot(Path snapshotPath);
+
+    /**
+     * Restores a state of the storage which was previously captured with a {@link #snapshot(Path)}.
+     *
+     * @param snapshotPath Path to the snapshot's directory.
+     */
+    void restoreSnapshot(Path snapshotPath);
 }
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index bab95a8..26efb72 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -28,9 +29,11 @@ import java.util.NavigableMap;
 import java.util.NoSuchElementException;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
 
@@ -284,7 +287,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
     }
 
     /** {@inheritDoc} */
-    @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
+    @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev) {
         assert keyFrom != null : "keyFrom couldn't be null.";
         assert rev > 0 : "rev must be positive.";
 
@@ -328,6 +331,22 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
         }
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This implementation is a no-op.
+     */
+    @Override public void close() {
+        // No-op.
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Not supported by this implementation.
+     *
+     * @throws UnsupportedOperationException Always.
+     */
+    @NotNull
+    @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Not supported by this implementation.
+     *
+     * @throws UnsupportedOperationException Always.
+     */
+    @Override public void restoreSnapshot(Path snapshotPath) {
+        throw new UnsupportedOperationException();
+    }
+
     /** */
     private boolean doRemove(byte[] key, long curRev) {
         Entry e = doGet(key, LATEST_REV, false);
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
index ce79b17..e7dff54 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
@@ -66,7 +66,7 @@ public class Value {
      *
      * @return {@code True} if value is tombstone, otherwise - {@code false}.
      */
-    boolean tombstone() {
+    public boolean tombstone() {
         return bytes == TOMBSTONE;
     }
 }
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java
new file mode 100644
index 0000000..4052aeb
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/ColumnFamily.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.util.List;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+
+/**
+ * Wrapper for the column family that encapsulates {@link ColumnFamilyHandle} and RocksDB's operations with it.
+ * Closing the wrapper closes the handle together with both options objects passed to the constructor.
+ */
+class ColumnFamily implements AutoCloseable {
+    /** RocksDB instance that all operations of this column family are delegated to. */
+    private final RocksDB db;
+
+    /** Column family type; its name is reported by {@link #name()}. */
+    private final StorageColumnFamilyType cfType;
+
+    /** Column family handle. */
+    private final ColumnFamilyHandle cfHandle;
+
+    /** Column family options; held here only so that {@link #close()} can close them. */
+    private final ColumnFamilyOptions cfOptions;
+
+    /** Options associated with {@link #cfOptions}; held here only so that {@link #close()} can close them. */
+    private final Options options;
+
+    /**
+     * Constructor. Only stores the given objects, no RocksDB call is made.
+     *
+     * @param db RocksDB instance.
+     * @param handle Column family handle.
+     * @param cfType Column family type.
+     * @param cfOptions Column family options, closed by {@link #close()}.
+     * @param options Options associated with the column family options, closed by {@link #close()}.
+     */
+    ColumnFamily(
+        RocksDB db,
+        ColumnFamilyHandle handle,
+        StorageColumnFamilyType cfType,
+        ColumnFamilyOptions cfOptions,
+        Options options
+    ) {
+        this.db = db;
+        this.cfType = cfType;
+        this.cfOptions = cfOptions;
+        this.options = options;
+        this.cfHandle = handle;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Closes the column family handle, the column family options and the options. The RocksDB instance itself
+     * is not closed here.
+     */
+    @Override public void close() throws Exception {
+        IgniteUtils.closeAll(cfHandle, cfOptions, options);
+    }
+
+    /**
+     * Gets the value associated with the key from this column family.
+     *
+     * @param key Key.
+     * @return Value, may be {@code null}.
+     * @throws RocksDBException If failed.
+     * @see RocksDB#get(ColumnFamilyHandle, byte[])
+     */
+    byte @Nullable [] get(byte @NotNull [] key) throws RocksDBException {
+        return db.get(cfHandle, key);
+    }
+
+    /**
+     * Puts a key-value pair into this column family within the write batch.
+     * Nothing is written to the database by this method itself, the pair is only added to the batch.
+     *
+     * @param batch Write batch.
+     * @param key Key.
+     * @param value Value.
+     * @throws RocksDBException If failed.
+     * @see WriteBatch#put(ColumnFamilyHandle, byte[], byte[])
+     */
+    void put(WriteBatch batch, byte @NotNull [] key, byte @NotNull [] value) throws RocksDBException {
+        batch.put(cfHandle, key, value);
+    }
+
+    /**
+     * Deletes the entry mapped by the key and associated with this column family within the write batch.
+     * Nothing is written to the database by this method itself, the deletion is only added to the batch.
+     *
+     * @param batch Write batch.
+     * @param key Key.
+     * @throws RocksDBException If failed.
+     * @see WriteBatch#delete(ColumnFamilyHandle, byte[])
+     */
+    void delete(WriteBatch batch, byte @NotNull [] key) throws RocksDBException {
+        batch.delete(cfHandle, key);
+    }
+
+    /**
+     * Creates a new iterator over this column family. The caller is responsible for closing it.
+     *
+     * @return Iterator.
+     * @see RocksDB#newIterator(ColumnFamilyHandle)
+     */
+    RocksIterator newIterator() {
+        return db.newIterator(cfHandle);
+    }
+
+    /**
+     * Creates a new iterator with given read options over this column family. The caller is responsible for
+     * closing it.
+     *
+     * @param options Read options.
+     * @return Iterator.
+     * @see RocksDB#newIterator(ColumnFamilyHandle, ReadOptions)
+     */
+    RocksIterator newIterator(ReadOptions options) {
+        return db.newIterator(cfHandle, options);
+    }
+
+    /**
+     * Ingests external files into this column family.
+     *
+     * @param paths Paths to the external files.
+     * @param options Ingestion options.
+     * @throws RocksDBException If failed.
+     * @see RocksDB#ingestExternalFile(ColumnFamilyHandle, List, IngestExternalFileOptions)
+     */
+    void ingestExternalFile(List<String> paths, IngestExternalFileOptions options) throws RocksDBException {
+        db.ingestExternalFile(cfHandle, paths, options);
+    }
+
+    /**
+     * @return Name of the column family, as reported by the column family type.
+     */
+    String name() {
+        return cfType.name();
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RangeCursor.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RangeCursor.java
new file mode 100644
index 0000000..826268e
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RangeCursor.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cursor over the entries whose keys belong to the given range, as of the given revision upper bound.
+ * Every step of the iteration is performed under the read lock of the storage.
+ */
+class RangeCursor implements Cursor<Entry> {
+    /** Storage. */
+    private final RocksDBKeyValueStorage storage;
+
+    /** Lower iteration bound (included). */
+    private final byte[] keyFrom;
+
+    /** Upper iteration bound (excluded), {@code null} if the range is unbounded from above. */
+    private final byte @Nullable [] keyTo;
+
+    /** Revision upper bound (included). */
+    private final long rev;
+
+    /** Iterator. */
+    private final Iterator<Entry> it;
+
+    /** Entry that has been looked up by {@code hasNext()} but not yet returned by {@code next()}. */
+    @Nullable
+    private Entry nextRetEntry;
+
+    /** Key of the last returned entry, {@code null} until the first entry is returned. */
+    private byte[] lastRetKey;
+
+    /** {@code true} if the iteration is finished. */
+    private boolean finished;
+
+    /**
+     * Constructor.
+     *
+     * @param storage Storage.
+     * @param keyFrom {@link #keyFrom}.
+     * @param keyTo {@link #keyTo}.
+     * @param rev {@link #rev}.
+     */
+    RangeCursor(RocksDBKeyValueStorage storage, byte[] keyFrom, byte @Nullable [] keyTo, long rev) {
+        this.storage = storage;
+        this.keyFrom = keyFrom;
+        this.keyTo = keyTo;
+        this.rev = rev;
+        this.it = createIterator();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() {
+        return it.hasNext();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Entry next() {
+        return it.next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @NotNull
+    @Override public Iterator<Entry> iterator() {
+        return it;
+    }
+
+    /**
+     * Creates the iterator backing this cursor.
+     *
+     * @return Iterator.
+     */
+    @NotNull
+    private Iterator<Entry> createIterator() {
+        return new Iterator<>() {
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                storage.lock().readLock().lock();
+
+                try {
+                    if (finished)
+                        return false;
+
+                    if (nextRetEntry != null)
+                        return true;
+
+                    // Resume right after the last returned key, or from the lower bound on the first call.
+                    byte[] curKey = lastRetKey;
+
+                    for (;;) {
+                        Map.Entry<byte[], long[]> idxEntry = curKey == null
+                            ? storage.revisionCeilingEntry(keyFrom)
+                            : storage.revisionHigherEntry(curKey);
+
+                        // No more keys in the index.
+                        if (idxEntry == null) {
+                            finished = true;
+
+                            return false;
+                        }
+
+                        curKey = idxEntry.getKey();
+
+                        // The upper bound is exclusive.
+                        if (keyTo != null && RocksDBKeyValueStorage.CMP.compare(curKey, keyTo) >= 0) {
+                            finished = true;
+
+                            return false;
+                        }
+
+                        long[] keyRevs = idxEntry.getValue();
+
+                        assert keyRevs != null && keyRevs.length != 0 :
+                            "Revisions should not be empty or null: [revs=" + Arrays.toString(keyRevs) + ']';
+
+                        long lastRev = RocksDBKeyValueStorage.maxRevision(keyRevs, rev);
+
+                        // The key has no revision within the bound, move on to the next key.
+                        if (lastRev == -1)
+                            continue;
+
+                        Entry found = storage.doGetValue(curKey, lastRev);
+
+                        assert !found.empty() : "Iterator should not return empty entry.";
+
+                        nextRetEntry = found;
+
+                        return true;
+                    }
+                }
+                finally {
+                    storage.lock().readLock().unlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public Entry next() {
+                storage.lock().readLock().lock();
+
+                try {
+                    if (!hasNext())
+                        throw new NoSuchElementException();
+
+                    Entry res = nextRetEntry;
+
+                    assert res != null;
+
+                    nextRetEntry = null;
+                    lastRetKey = res.key();
+
+                    return res;
+                }
+                finally {
+                    storage.lock().readLock().unlock();
+                }
+            }
+        };
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
new file mode 100644
index 0000000..507307d
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
@@ -0,0 +1,1092 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.Operation;
+import org.apache.ignite.internal.metastorage.server.Value;
+import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.appendLong;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToLong;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.find;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.forEach;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.getAsLongs;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.keyToRocksKey;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
+import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
+import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
+import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
+
+/**
+ * Key-value storage based on RocksDB.
+ * Keys are stored with revision.
+ * Values are stored in the default column family with an update counter and a boolean flag which represents
+ * whether this record is a tombstone.
+ * <br>
+ * Key: [8 bytes revision, N bytes key itself].
+ * <br>
+ * Value: [8 bytes update counter, 1 byte tombstone flag, N bytes value].
+ * <br>
+ * The mapping from the key to the set of the storage's revisions is stored in the "index" column family.
+ * A key represents the key of an entry and the value is a {@code byte[]} that represents a {@code long[]} where every
+ * item is a revision of the storage.
+ */
+public class RocksDBKeyValueStorage implements KeyValueStorage {
+    /** Suffix for the temporary snapshot folder. */
+    private static final String TMP_SUFFIX = ".tmp";
+
+    /** A revision to store with system entries. */
+    private static final long SYSTEM_REVISION_MARKER_VALUE = -1;
+
+    /** Revision key. */
+    private static final byte[] REVISION_KEY = keyToRocksKey(
+        SYSTEM_REVISION_MARKER_VALUE,
+        "SYSTEM_REVISION_KEY".getBytes(StandardCharsets.UTF_8)
+    );
+
+    /** Update counter key. */
+    private static final byte[] UPDATE_COUNTER_KEY = keyToRocksKey(
+        SYSTEM_REVISION_MARKER_VALUE,
+        "SYSTEM_UPDATE_COUNTER_KEY".getBytes(StandardCharsets.UTF_8)
+    );
+
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB options. */
+    private final DBOptions options;
+
+    /** RocksDB instance. */
+    private final RocksDB db;
+
+    /** Data column family. */
+    private final ColumnFamily data;
+
+    /** Index column family. */
+    private final ColumnFamily index;
+
+    /** RW lock. */
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    /** Thread-pool for snapshot operations execution. */
+    private final ExecutorService snapshotExecutor = Executors.newFixedThreadPool(2);
+
+    /**
+     * Special value for the revision number which means that operation should be applied
+     * to the latest revision of an entry.
+     */
+    private static final long LATEST_REV = -1;
+
+    /** Lexicographic order comparator. */
+    static final Comparator<byte[]> CMP = Arrays::compare;
+
+    /** Path to the rocksdb database. */
+    private final Path dbPath;
+
+    /** Revision. Will be incremented for each single-entry or multi-entry update operation. */
+    private long rev;
+
+    /** Update counter. Will be incremented for each update of any particular entry. */
+    private long updCntr;
+
+    /**
+     * Constructor. Destroys the database that may already exist at the given path and opens a new one with
+     * the data and the index column families.
+     *
+     * @param dbPath RocksDB path.
+     * @throws IgniteInternalException If the storage failed to start. Resources that have been allocated by
+     *      that moment are released.
+     */
+    public RocksDBKeyValueStorage(Path dbPath) {
+        // Declared outside of the try block: until they are handed over to a ColumnFamily wrapper nobody else
+        // references these native objects, so they must be released here if the start fails.
+        Options dataOptions = null;
+        ColumnFamilyOptions dataFamilyOptions = null;
+        Options indexOptions = null;
+        ColumnFamilyOptions indexFamilyOptions = null;
+
+        try {
+            options = new DBOptions()
+                .setCreateMissingColumnFamilies(true)
+                .setCreateIfMissing(true);
+
+            this.dbPath = dbPath;
+
+            dataOptions = new Options().setCreateIfMissing(true)
+                // The prefix is the revision of an entry, so prefix length is the size of a long
+                .useFixedLengthPrefixExtractor(Long.BYTES);
+
+            dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
+
+            indexOptions = new Options().setCreateIfMissing(true);
+
+            indexFamilyOptions = new ColumnFamilyOptions(indexOptions);
+
+            List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
+                new ColumnFamilyDescriptor(DATA.nameAsBytes(), dataFamilyOptions),
+                new ColumnFamilyDescriptor(INDEX.nameAsBytes(), indexFamilyOptions)
+            );
+
+            var handles = new ArrayList<ColumnFamilyHandle>();
+
+            // Delete existing data, relying on the raft's snapshot and log playback
+            destroyRocksDB();
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString(), descriptors, handles);
+
+            data = new ColumnFamily(db, handles.get(0), DATA, dataFamilyOptions, dataOptions);
+
+            // From now on the options are closed by the wrapper (see close()).
+            dataFamilyOptions = null;
+            dataOptions = null;
+
+            index = new ColumnFamily(db, handles.get(1), INDEX, indexFamilyOptions, indexOptions);
+
+            indexFamilyOptions = null;
+            indexOptions = null;
+        }
+        catch (Exception e) {
+            try {
+                close();
+            }
+            catch (Exception exception) {
+                e.addSuppressed(exception);
+            }
+
+            // close() reaches the options only through the column family wrappers, release the ones that have
+            // not been handed over to a wrapper.
+            for (AutoCloseable orphan :
+                Arrays.<AutoCloseable>asList(dataFamilyOptions, dataOptions, indexFamilyOptions, indexOptions)) {
+                if (orphan == null)
+                    continue;
+
+                try {
+                    orphan.close();
+                }
+                catch (Exception exception) {
+                    e.addSuppressed(exception);
+                }
+            }
+
+            throw new IgniteInternalException("Failed to start the storage", e);
+        }
+    }
+
+    /**
+     * Destroys the RocksDB database located at {@link #dbPath} by calling {@code RocksDB.destroyDB} with
+     * default options.
+     * The major difference with directly deleting the DB directory manually is that
+     * destroyDB() will take care of the case where the RocksDB database is stored
+     * in multiple directories. For instance, a single DB can be configured to store
+     * its data in multiple directories by specifying different paths to
+     * DBOptions::db_paths, DBOptions::db_log_dir, and DBOptions::wal_dir.
+     *
+     * @throws RocksDBException If failed.
+     */
+    private void destroyRocksDB() throws RocksDBException {
+        try (Options opt = new Options()) {
+            RocksDB.destroyDB(dbPath.toString(), opt);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Shuts down the snapshot executor first (passing a 10 seconds timeout to the shutdown helper), then closes
+     * the column families, the database and the database options.
+     */
+    @Override public void close() throws Exception {
+        IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);
+
+        IgniteUtils.closeAll(data, index, db, options);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The SST files of both column families are written to a temporary directory ({@code snapshotPath} with
+     * the {@link #TMP_SUFFIX} appended) which replaces {@code snapshotPath} only after all files have been
+     * written successfully.
+     */
+    @NotNull
+    @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
+        Path tmpDir = Paths.get(snapshotPath.toString() + TMP_SUFFIX);
+
+        // Point-in-time view of the database that both SST writers read from.
+        Snapshot dbSnapshot = db.getSnapshot();
+
+        // Stage 1: (re)create the temporary directory.
+        CompletableFuture<Void> tmpDirCreated = CompletableFuture.runAsync(() -> {
+            IgniteUtils.deleteIfExists(tmpDir);
+
+            try {
+                Files.createDirectories(tmpDir);
+            }
+            catch (IOException e) {
+                throw new IgniteInternalException("Failed to create directory: " + tmpDir, e);
+            }
+        }, snapshotExecutor);
+
+        // Stage 2: capture SST snapshots of the column families.
+        CompletableFuture<Void> sstFilesWritten = tmpDirCreated.thenCompose(ignored ->
+            CompletableFuture.allOf(
+                createSstFile(data, dbSnapshot, tmpDir),
+                createSstFile(index, dbSnapshot, tmpDir)
+            ));
+
+        // Stage 3: release the RocksDB snapshot in any case and publish the directory on success.
+        return sstFilesWritten.whenComplete((ignored, err) -> {
+            db.releaseSnapshot(dbSnapshot);
+
+            // A Snapshot instance doesn't own a pointer (the database does), so nothing is actually closed
+            // here. The call is made to maintain the AutoCloseable semantics.
+            dbSnapshot.close();
+
+            if (err == null) {
+                // Delete the snapshot directory if it already exists.
+                IgniteUtils.deleteIfExists(snapshotPath);
+
+                try {
+                    // Rename the temporary directory.
+                    Files.move(tmpDir, snapshotPath);
+                }
+                catch (IOException e) {
+                    throw new IgniteInternalException("Failed to rename: " + tmpDir + " to " + snapshotPath, e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Asynchronously (on the snapshot executor) writes the content of the column family, as seen by the given
+     * point-in-time snapshot, to an SST file. The file is named after the column family.
+     *
+     * @param columnFamily Column family.
+     * @param snapshot Point-in-time snapshot.
+     * @param path Directory to put the SST file in.
+     * @return Future representing pending completion of the operation.
+     */
+    private CompletableFuture<Void> createSstFile(ColumnFamily columnFamily, Snapshot snapshot, Path path) {
+        Runnable writeTask = () -> {
+            try (
+                EnvOptions envOpts = new EnvOptions();
+                Options sstOpts = new Options();
+                ReadOptions readOpts = new ReadOptions().setSnapshot(snapshot);
+                RocksIterator iter = columnFamily.newIterator(readOpts);
+                SstFileWriter writer = new SstFileWriter(envOpts, sstOpts)
+            ) {
+                String sstFilePath = path.resolve(columnFamily.name()).toString();
+
+                writer.open(sstFilePath);
+
+                // Copy every key-value pair visible in the snapshot.
+                iter.seekToFirst();
+
+                forEach(iter, writer::put);
+
+                writer.finish();
+            }
+            catch (Throwable t) {
+                throw new IgniteInternalException("Failed to write snapshot: " + t.getMessage(), t);
+            }
+        };
+
+        return CompletableFuture.runAsync(writeTask, snapshotExecutor);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Ingests the SST file of every column family under the write lock and then re-reads the revision and the
+     * update counter from the ingested data.
+     */
+    @Override public void restoreSnapshot(Path path) {
+        rwLock.writeLock().lock();
+
+        try (IngestExternalFileOptions ingestOpts = new IngestExternalFileOptions()) {
+            for (ColumnFamily cf : Arrays.asList(data, index)) {
+                // The SST file is named after its column family.
+                Path sstFile = path.resolve(cf.name());
+
+                if (Files.exists(sstFile))
+                    cf.ingestExternalFile(Collections.singletonList(sstFile.toString()), ingestOpts);
+                else
+                    throw new IgniteInternalException("Snapshot not found: " + sstFile);
+            }
+
+            // Refresh the in-memory counters from the system entries of the restored data.
+            byte[] revBytes = data.get(REVISION_KEY);
+
+            rev = bytesToLong(revBytes);
+
+            byte[] cntrBytes = data.get(UPDATE_COUNTER_KEY);
+
+            updCntr = bytesToLong(cntrBytes);
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException("Fail to ingest sst file at path: " + path, e);
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * NOTE(review): reads {@link #rev} without acquiring {@link #rwLock}, unlike {@link #get(byte[])} - confirm
+     * that callers do not need a lock-protected view of the value.
+     */
+    @Override public long revision() {
+        return rev;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * NOTE(review): reads {@link #updCntr} without acquiring {@link #rwLock}, unlike {@link #get(byte[])} -
+     * confirm that callers do not need a lock-protected view of the value.
+     */
+    @Override public long updateCounter() {
+        return updCntr;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The entry, its index record and the system values are written as a single batch under the write lock.
+     */
+    @Override public void put(byte[] key, byte[] value) {
+        rwLock.writeLock().lock();
+
+        try (WriteBatch batch = new WriteBatch()) {
+            // Both counters advance by one for a single-entry update.
+            long newRev = rev + 1;
+            long newCntr = updCntr + 1;
+
+            addDataToBatch(batch, key, value, newRev, newCntr);
+            updateKeysIndex(batch, key, newRev);
+
+            fillAndWriteBatch(batch, newRev, newCntr);
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Appends a revision to the index record of the key. The current record is read from the database and
+     * the extended one is added to the write batch.
+     *
+     * @param batch Write batch.
+     * @param key Key.
+     * @param curRev New revision for key.
+     */
+    private void updateKeysIndex(WriteBatch batch, byte[] key, long curRev) {
+        try {
+            // The current record may be null (see ColumnFamily#get), appendLong() is handed whatever is stored.
+            index.put(batch, key, appendLong(index.get(key), curRev));
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+    }
+
+    /**
+     * Fills the batch with system values (the update counter and the revision) and writes it to the db.
+     * The in-memory {@link #rev} and {@link #updCntr} fields are updated only after the write has succeeded.
+     *
+     * @param batch Write batch.
+     * @param newRev New revision.
+     * @param newCntr New update counter.
+     * @throws RocksDBException If failed.
+     */
+    private void fillAndWriteBatch(WriteBatch batch, long newRev, long newCntr) throws RocksDBException {
+        try (WriteOptions opts = new WriteOptions()) {
+            data.put(batch, UPDATE_COUNTER_KEY, longToBytes(newCntr));
+            data.put(batch, REVISION_KEY, longToBytes(newRev));
+
+            db.write(opts, batch); // The caller's changes and the system values go in as one batch.
+
+            rev = newRev;
+            updCntr = newCntr;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The revision of the previous value is looked up before the new value is written, the previous value
+     * itself is read after the write.
+     */
+    @NotNull
+    @Override public Entry getAndPut(byte[] key, byte[] value) {
+        rwLock.writeLock().lock();
+
+        try (WriteBatch batch = new WriteBatch()) {
+            long newRev = rev + 1;
+            long newCntr = updCntr + 1;
+
+            long[] keyRevs = getRevisions(key);
+
+            // Revision 0 is used when the key has no revisions yet.
+            long prevRev = 0;
+
+            if (keyRevs.length != 0)
+                prevRev = lastRevision(keyRevs);
+
+            addDataToBatch(batch, key, value, newRev, newCntr);
+            updateKeysIndex(batch, key, newRev);
+
+            fillAndWriteBatch(batch, newRev, newCntr);
+
+            // Return previous value.
+            return doGetValue(key, prevRev);
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * All entries are written under the same new revision and as a single batch.
+     */
+    @Override public void putAll(List<byte[]> keys, List<byte[]> values) {
+        rwLock.writeLock().lock();
+
+        try (WriteBatch batch = new WriteBatch()) {
+            long newRev = rev + 1;
+
+            // The helper returns the update counter to be stored after the whole batch.
+            long newCntr = addAllToBatch(batch, keys, values, newRev);
+
+            keys.forEach(key -> updateKeysIndex(batch, key, newRev));
+
+            fillAndWriteBatch(batch, newRev, newCntr);
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The previous entries are read before anything is added to the batch. All new entries are written under
+     * the same new revision and as a single batch.
+     */
+    @NotNull
+    @Override public Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
+        rwLock.writeLock().lock();
+
+        try (WriteBatch batch = new WriteBatch()) {
+            long newRev = rev + 1;
+
+            // Nothing has been written with the new revision yet, so these are the previous entries.
+            Collection<Entry> prevEntries = doGetAll(keys, newRev);
+
+            long newCntr = addAllToBatch(batch, keys, values, newRev);
+
+            keys.forEach(key -> updateKeysIndex(batch, key, newRev));
+
+            fillAndWriteBatch(batch, newRev, newCntr);
+
+            return prevEntries;
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Reads the entry by the key's latest revision while holding the read lock.
+     */
+    @NotNull
+    @Override public Entry get(byte[] key) {
+        rwLock.readLock().lock();
+
+        try {
+            return doGet(key, LATEST_REV, false);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Looks the entry up by exactly the given revision (not as an upper bound) while holding the read lock.
+     */
+    @NotNull
+    @Override public Entry get(byte[] key, long rev) {
+        rwLock.readLock().lock();
+
+        try {
+            // exactRev = true: the value stored under precisely this revision is requested.
+            return doGet(key, rev, true);
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to {@code doGetAll} with {@code LATEST_REV}; the read lock is taken there.
+     */
+    @NotNull
+    @Override public Collection<Entry> getAll(List<byte[]> keys) {
+        return doGetAll(keys, LATEST_REV);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Delegates to {@code doGetAll}, which treats {@code revUpperBound} as an inclusive upper bound and takes
+     * the read lock itself.
+     */
+    @NotNull
+    @Override public Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
+        return doGetAll(keys, revUpperBound);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Nothing is written (and the revision is not advanced) when the key has no value or is already a tombstone.
+     */
+    @Override public void remove(byte[] key) {
+        rwLock.writeLock().lock();
+
+        try (WriteBatch batch = new WriteBatch()) {
+            long newRev = rev + 1;
+            long newCntr = updCntr + 1;
+
+            boolean tombstoneAdded = addToBatchForRemoval(batch, key, newRev, newCntr);
+
+            // The key is absent or already removed: leave the storage untouched.
+            if (!tombstoneAdded)
+                return;
+
+            updateKeysIndex(batch, key, newRev);
+
+            fillAndWriteBatch(batch, newRev, newCntr);
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>If the latest entry is empty or a tombstone it is returned as is and nothing is written; otherwise the
+     * removal is performed as a {@link #getAndPut(byte[], byte[])} of the tombstone value.
+     */
+    @NotNull
+    @Override public Entry getAndRemove(byte[] key) {
+        rwLock.writeLock().lock();
+
+        try {
+            Entry current = doGet(key, LATEST_REV, false);
+
+            boolean removable = !current.empty() && !current.tombstone();
+
+            // getAndPut acquires the write lock again while this thread already holds it.
+            return removable ? getAndPut(key, TOMBSTONE) : current;
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Only keys that currently have a non-tombstone value get a tombstone and an index update; the update counter
+     * is advanced once per such key. The batch is written even if no key qualified.
+     */
+    @Override public void removeAll(List<byte[]> keys) {
+        rwLock.writeLock().lock();
+
+        try (WriteBatch batch = new WriteBatch()) {
+            long newRev = rev + 1;
+
+            long newCntr = updCntr;
+
+            List<byte[]> removedKeys = new ArrayList<>(keys.size());
+
+            for (byte[] k : keys) {
+                // Try with the next counter value; it is consumed only if a tombstone was really added.
+                if (!addToBatchForRemoval(batch, k, newRev, newCntr + 1))
+                    continue;
+
+                newCntr++;
+
+                removedKeys.add(k);
+            }
+
+            for (byte[] k : removedKeys)
+                updateKeysIndex(batch, k, newRev);
+
+            fillAndWriteBatch(batch, newRev, newCntr);
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The result contains the latest entry of every requested key, in the order of {@code keys}. Tombstones are
+     * written only for the keys whose latest entry is neither empty nor a tombstone.
+     */
+    @NotNull
+    @Override public Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
+        Collection<Entry> prevEntries = new ArrayList<>(keys.size());
+
+        rwLock.writeLock().lock();
+
+        try (WriteBatch batch = new WriteBatch()) {
+            long newRev = rev + 1;
+
+            List<byte[]> keysToRemove = new ArrayList<>(keys.size());
+
+            // One tombstone per key to remove, so that both lists can be handed over to addAllToBatch.
+            List<byte[]> tombstones = new ArrayList<>(keys.size());
+
+            for (byte[] k : keys) {
+                Entry current = doGet(k, LATEST_REV, false);
+
+                prevEntries.add(current);
+
+                if (!current.empty() && !current.tombstone()) {
+                    keysToRemove.add(k);
+
+                    tombstones.add(TOMBSTONE);
+                }
+            }
+
+            long newCntr = addAllToBatch(batch, keysToRemove, tombstones, newRev);
+
+            for (byte[] k : keysToRemove)
+                updateKeysIndex(batch, k, newRev);
+
+            fillAndWriteBatch(batch, newRev, newCntr);
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+
+        return prevEntries;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The condition is tested against the latest entry of {@code condition.key()}; the operations of the chosen
+     * branch are then collected into one batch under a single new revision. The batch is written only if at least
+     * one operation actually changed something.
+     */
+    @Override public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) {
+        rwLock.writeLock().lock();
+
+        try (WriteBatch batch = new WriteBatch()) {
+            boolean branch = condition.test(get(condition.key()));
+
+            long newRev = rev + 1;
+
+            long newCntr = updCntr;
+
+            boolean modified = false;
+
+            List<byte[]> touchedKeys = new ArrayList<>();
+
+            for (Operation op : (branch ? success : failure)) {
+                byte[] key = op.key();
+
+                switch (op.type()) {
+                    case PUT:
+                        newCntr++;
+
+                        addDataToBatch(batch, key, op.value(), newRev, newCntr);
+
+                        touchedKeys.add(key);
+
+                        modified = true;
+
+                        break;
+
+                    case REMOVE:
+                        // The next counter value is consumed only if a tombstone was really added.
+                        if (addToBatchForRemoval(batch, key, newRev, newCntr + 1)) {
+                            newCntr++;
+
+                            touchedKeys.add(key);
+
+                            modified = true;
+                        }
+
+                        break;
+
+                    case NO_OP:
+                        break;
+
+                    default:
+                        throw new IllegalArgumentException("Unknown operation type: " + op.type());
+                }
+            }
+
+            if (modified) {
+                for (byte[] k : touchedKeys)
+                    updateKeysIndex(batch, k, newRev);
+
+                fillAndWriteBatch(batch, newRev, newCntr);
+            }
+
+            return branch;
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Uses the value of {@code rev} at the moment of the call as the revision upper bound of the cursor; no lock
+     * is taken in this method.
+     */
+    @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
+        return new RangeCursor(this, keyFrom, keyTo, rev);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Creates a {@link RangeCursor} over this storage with the given revision upper bound.
+     */
+    @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
+        return new RangeCursor(this, keyFrom, keyTo, revUpperBound);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>A key matches when it is not less than {@code keyFrom} and, if {@code keyTo} is given, strictly less than
+     * {@code keyTo} (both in terms of {@code CMP}).
+     */
+    @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte @Nullable [] keyTo, long rev) {
+        assert keyFrom != null : "keyFrom couldn't be null.";
+        assert rev > 0 : "rev must be positive.";
+
+        return new WatchCursor(this, rev, k -> {
+            // Below the lower bound.
+            if (CMP.compare(keyFrom, k) > 0)
+                return false;
+
+            // No upper bound, or strictly below it.
+            return keyTo == null || CMP.compare(k, keyTo) < 0;
+        });
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>A key matches when {@code CMP} reports it equal to the given key.
+     */
+    @Override public Cursor<WatchEvent> watch(byte[] key, long rev) {
+        assert key != null : "key couldn't be null.";
+        assert rev > 0 : "rev must be positive.";
+
+        return new WatchCursor(this, rev, k -> CMP.compare(k, key) == 0);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The keys are copied into a set ordered by {@code CMP}, so later changes of the passed collection do not
+     * affect the cursor.
+     */
+    @Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev) {
+        assert keys != null && !keys.isEmpty() : "keys couldn't be null or empty: " + keys;
+        assert rev > 0 : "rev must be positive.";
+
+        // byte[] has identity-based equals/hashCode, hence a comparator-based set is used for the lookup.
+        TreeSet<byte[]> watchedKeys = new TreeSet<>(CMP);
+
+        watchedKeys.addAll(keys);
+
+        return new WatchCursor(this, rev, k -> watchedKeys.contains(k));
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Walks the whole keys index under the write lock and compacts every key; all changes are collected into one
+     * batch which is written together with the unchanged revision and update counter.
+     */
+    @Override public void compact() {
+        rwLock.writeLock().lock();
+
+        try (WriteBatch batch = new WriteBatch()) {
+            try (RocksIterator indexIt = index.newIterator()) {
+                indexIt.seekToFirst();
+
+                // An index record's value is decoded as the list of revisions of its key.
+                forEach(indexIt, (key, revsBytes) -> {
+                    long[] revs = getAsLongs(revsBytes);
+
+                    compactForKey(batch, key, revs);
+                });
+            }
+
+            fillAndWriteBatch(batch, rev, updCntr);
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+        finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Puts a tombstone for the key into the batch, provided that the key currently has a live value.
+     *
+     * @param batch Write batch.
+     * @param key Target key.
+     * @param curRev Revision the tombstone is written under.
+     * @param counter Update counter to store with the tombstone.
+     * @return {@code true} if the tombstone was added, {@code false} if the latest entry is empty or already
+     *      a tombstone (the batch is left untouched then).
+     * @throws RocksDBException If failed.
+     */
+    private boolean addToBatchForRemoval(WriteBatch batch, byte[] key, long curRev, long counter) throws RocksDBException {
+        Entry latest = doGet(key, LATEST_REV, false);
+
+        boolean removable = !latest.empty() && !latest.tombstone();
+
+        if (removable)
+            addDataToBatch(batch, key, TOMBSTONE, curRev, counter);
+
+        return removable;
+    }
+
+    /**
+     * Compacts all entries by the given key, removing all previous revisions and deleting the last entry if it is
+     * a tombstone.
+     *
+     * <p>If the last entry is a live value, the key's index record is rewritten to contain only the last revision.
+     * If it is a tombstone, both the tombstone record in the data column family and the key's index record are
+     * deleted.
+     *
+     * @param batch Write batch.
+     * @param key Target key.
+     * @param revs Revisions of the key, must not be empty.
+     * @throws RocksDBException If failed.
+     */
+    private void compactForKey(WriteBatch batch, byte[] key, long[] revs) throws RocksDBException {
+        long lastRev = lastRevision(revs);
+
+        // Drop every revision but the last one from the data column family.
+        for (int i = 0; i < revs.length - 1; i++)
+            data.delete(batch, keyToRocksKey(revs[i], key));
+
+        byte[] rocksKey = keyToRocksKey(lastRev, key);
+
+        Value value = bytesToValue(data.get(rocksKey));
+
+        if (value.tombstone()) {
+            // rocksKey is a data key (revision + key): the tombstone record lives in the data column family,
+            // the index is keyed by the plain key only.
+            data.delete(batch, rocksKey);
+
+            index.delete(batch, key);
+        }
+        else
+            index.put(batch, key, longToBytes(lastRev));
+    }
+
+    /**
+     * Reads the entries of all given keys under the read lock, treating the revision as an inclusive upper bound.
+     *
+     * @param keys Target keys, must be neither {@code null} nor empty.
+     * @param rev Revision upper bound or {@code LATEST_REV}.
+     * @return Entries in the iteration order of {@code keys}.
+     */
+    @NotNull
+    private Collection<Entry> doGetAll(Collection<byte[]> keys, long rev) {
+        assert keys != null : "keys list can't be null.";
+        assert !keys.isEmpty() : "keys list can't be empty.";
+        assert rev > 0 || rev == LATEST_REV : "Revision must be positive or " + LATEST_REV + '.';
+
+        Collection<Entry> entries = new ArrayList<>(keys.size());
+
+        rwLock.readLock().lock();
+
+        try {
+            keys.forEach(k -> entries.add(doGet(k, rev, false)));
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+
+        return entries;
+    }
+
+    /**
+     * Reads the entry of the key for the given revision. No lock is taken here.
+     *
+     * @param key Target key.
+     * @param rev Target revision or {@code LATEST_REV} (the latter only together with {@code exactRev == false}).
+     * @param exactRev {@code true} to read exactly revision {@code rev}, {@code false} to read the greatest revision
+     *      of the key that does not exceed {@code rev}.
+     * @return Found entry, or an empty entry if the key has no revisions or no suitable one.
+     */
+    @NotNull
+    Entry doGet(byte[] key, long rev, boolean exactRev) {
+        assert rev == LATEST_REV && !exactRev || rev > LATEST_REV :
+            "Invalid arguments: [rev=" + rev + ", exactRev=" + exactRev + ']';
+
+        long[] keyRevs;
+
+        try {
+            keyRevs = getRevisions(key);
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (keyRevs == null || keyRevs.length == 0)
+            return Entry.empty(key);
+
+        long targetRev;
+
+        if (rev == LATEST_REV)
+            targetRev = lastRevision(keyRevs);
+        else if (exactRev)
+            targetRev = rev;
+        else
+            targetRev = maxRevision(keyRevs, rev);
+
+        // -1 is what maxRevision reports when no revision fits the upper bound.
+        return targetRev == -1 ? Entry.empty(key) : doGetValue(key, targetRev);
+    }
+
+    /**
+     * Reads the revisions recorded in the keys index for the given key.
+     *
+     * @param key Key.
+     * @return Revisions of the key, or {@code LONG_EMPTY_ARRAY} if the index has no record for it.
+     * @throws RocksDBException If failed to perform {@link RocksDB#get(ColumnFamilyHandle, byte[])}.
+     */
+    private long[] getRevisions(byte[] key) throws RocksDBException {
+        byte[] serializedRevs = index.get(key);
+
+        return serializedRevs == null ? LONG_EMPTY_ARRAY : getAsLongs(serializedRevs);
+    }
+
+    /**
+     * Scans the revisions from the end and returns the first one that does not exceed {@code upperBoundRev}.
+     *
+     * @param revs Revisions list.
+     * @param upperBoundRev Revision upper bound (inclusive).
+     * @return Found revision or {@code -1} if every revision is greater than the bound (or the list is empty).
+     */
+    static long maxRevision(long[] revs, long upperBoundRev) {
+        int idx = revs.length - 1;
+
+        // Skip the tail of revisions that are above the bound.
+        while (idx >= 0 && revs[idx] > upperBoundRev)
+            idx--;
+
+        return idx >= 0 ? revs[idx] : -1;
+    }
+
+    /**
+     * Reads the entry stored in the data column family under the given key and revision.
+     *
+     * @param key Target key.
+     * @param revision Target revision; {@code 0} yields an empty entry without touching the database.
+     * @return Entry, tombstone entry, or an empty entry if nothing (or an empty value) is stored there.
+     */
+    @NotNull
+    Entry doGetValue(byte[] key, long revision) {
+        if (revision == 0)
+            return Entry.empty(key);
+
+        byte[] stored;
+
+        try {
+            stored = data.get(keyToRocksKey(revision, key));
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        if (stored == null || stored.length == 0)
+            return Entry.empty(key);
+
+        Value val = bytesToValue(stored);
+
+        return val.tombstone() ?
+            Entry.tombstone(key, revision, val.updateCounter()) :
+            new Entry(key, val.bytes(), revision, val.updateCounter());
+    }
+
+    /**
+     * Puts a value into the batch for the data column family. The record key is the entry key prefixed with the
+     * revision, the record value is the entry value combined with the update counter.
+     *
+     * @param batch Write batch.
+     * @param key Key.
+     * @param value Value.
+     * @param curRev Revision.
+     * @param cntr Update counter.
+     * @throws RocksDBException If failed.
+     */
+    private void addDataToBatch(WriteBatch batch, byte[] key, byte[] value, long curRev, long cntr) throws RocksDBException {
+        data.put(batch, keyToRocksKey(curRev, key), valueToBytes(value, cntr));
+    }
+
+    /**
+     * Puts all key-value pairs into the batch under the same revision, giving each pair the next update counter value
+     * (starting right after the current update counter of the storage).
+     *
+     * @param batch Write batch.
+     * @param keys Keys.
+     * @param values Values, matched with {@code keys} by position.
+     * @param curRev Revision.
+     * @return New update counter value, i.e. the current one increased by the number of keys.
+     * @throws RocksDBException If failed.
+     */
+    private long addAllToBatch(WriteBatch batch, List<byte[]> keys, List<byte[]> values, long curRev) throws RocksDBException {
+        long cntr = updCntr;
+
+        for (int i = 0, size = keys.size(); i < size; i++)
+            addDataToBatch(batch, keys.get(i), values.get(i), curRev, ++cntr);
+
+        return cntr;
+    }
+
+    /**
+     * Gets an entry from the keys index with the least key greater than or equal to the specified key.
+     *
+     * @param keyFrom Key.
+     * @return Ceiling entry: the found key mapped to the revisions stored for it in the index. Returns {@code null}
+     *      if no such entry exists.
+     */
+    @Nullable
+    Map.Entry<byte[], long[]> revisionCeilingEntry(byte[] keyFrom) {
+        return higherOrCeiling(keyFrom, false);
+    }
+
+    /**
+     * Gets an entry from the keys index with the least key strictly greater than the specified key.
+     *
+     * @param key Key.
+     * @return Higher entry: the found key mapped to the revisions stored for it in the index. Returns {@code null}
+     *      if no such entry exists.
+     */
+    @Nullable
+    Map.Entry<byte[], long[]> revisionHigherEntry(byte[] key) {
+        return higherOrCeiling(key, true);
+    }
+
+    /**
+     * Looks up the keys index for the least key that is greater than (or, for a ceiling lookup, greater than or
+     * equal to) the specified key.
+     *
+     * @param key Key to start the lookup from.
+     * @param strictlyHigher {@code true} for a strictly higher entry, {@code false} for a ceiling one.
+     * @return Found key together with its revisions, or {@code null} if no such entry exists.
+     */
+    @Nullable
+    private IgniteBiTuple<byte[], long[]> higherOrCeiling(byte[] key, boolean strictlyHigher) {
+        try (RocksIterator indexIt = index.newIterator()) {
+            indexIt.seek(key);
+
+            boolean found = find(indexIt, (k, v) -> {
+                int cmp = CMP.compare(k, key);
+
+                return strictlyHigher ? cmp > 0 : cmp >= 0;
+            });
+
+            if (found)
+                return new IgniteBiTuple<>(indexIt.key(), getAsLongs(indexIt.value()));
+
+            return null;
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+    }
+
+    /**
+     * Creates a new iterator over the {@link StorageColumnFamilyType#DATA} column family.
+     *
+     * <p>NOTE(review): the returned iterator is presumably to be closed by the caller, nothing here keeps track
+     * of it - confirm against the callers.
+     *
+     * @param options Read options.
+     * @return Iterator.
+     */
+    public RocksIterator newDataIterator(ReadOptions options) {
+        return data.newIterator(options);
+    }
+
+    /**
+     * Gets last revision from the list, i.e. the last element of the array.
+     *
+     * @param revs Revisions, must not be empty (an empty array leads to {@link ArrayIndexOutOfBoundsException}).
+     * @return Last revision.
+     */
+    private static long lastRevision(long[] revs) {
+        return revs[revs.length - 1];
+    }
+
+    /**
+     * Gets the read-write lock that the operations of this storage acquire around their reads and writes.
+     *
+     * @return Database lock.
+     */
+    ReadWriteLock lock() {
+        return rwLock;
+    }
+
+    /**
+     * Gets the underlying RocksDB instance.
+     *
+     * @return Database.
+     */
+    RocksDB db() {
+        return db;
+    }
+
+    /**
+     * Gets the value of the {@code dbPath} field. Intended for tests only.
+     *
+     * @return Database path.
+     */
+    @TestOnly
+    public Path getDbPath() {
+        return dbPath;
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
new file mode 100644
index 0000000..54d13d7
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksStorageUtils.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.metastorage.server.Value;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
+
+/**
+ * Utility class for {@link RocksDBKeyValueStorage}.
+ */
+class RocksStorageUtils {
+    /**
+     * VarHandle that gives the access to the elements of a {@code byte[]} array viewed as if it
+     * were a {@code long[]} array. Byte order must be little endian for a correct
+     * lexicographic order comparison.
+     */
+    private static final VarHandle LONG_ARRAY_HANDLE = MethodHandles.byteArrayViewVarHandle(
+        long[].class,
+        ByteOrder.LITTLE_ENDIAN
+    );
+
+    /**
+     * Converts a long value to a byte array.
+     *
+     * @param value Value.
+     * @return Byte array.
+     */
+    static byte[] longToBytes(long value) {
+        var buffer = new byte[Long.BYTES];
+
+        LONG_ARRAY_HANDLE.set(buffer, 0, value);
+
+        return buffer;
+    }
+
+    /**
+     * Converts a byte array to a long value.
+     *
+     * @param array Byte array.
+     * @return Long value.
+     */
+    static long bytesToLong(byte[] array) {
+        assert array.length == Long.BYTES;
+
+        return (long) LONG_ARRAY_HANDLE.get(array, 0);
+    }
+
+    /**
+     * Adds a revision to a key.
+     *
+     * @param revision Revision.
+     * @param key Key.
+     * @return Key with a revision.
+     */
+    static byte[] keyToRocksKey(long revision, byte[] key) {
+        var buffer = new byte[Long.BYTES + key.length];
+
+        LONG_ARRAY_HANDLE.set(buffer, 0, revision);
+
+        System.arraycopy(key, 0, buffer, Long.BYTES, key.length);
+
+        return buffer;
+    }
+
+    /**
+     * Gets a key from a key with revision.
+     *
+     * @param rocksKey Key with a revision.
+     * @return Key without a revision.
+     */
+    static byte[] rocksKeyToBytes(byte[] rocksKey) {
+        // Copy bytes of the rocks key ignoring the revision (first 8 bytes)
+        return Arrays.copyOfRange(rocksKey, Long.BYTES, rocksKey.length);
+    }
+
+    /**
+     * Builds a value from a byte array.
+     *
+     * @param valueBytes Value byte array.
+     * @return Value.
+     */
+    static Value bytesToValue(byte[] valueBytes) {
+        // At least an 8-byte update counter and a 1-byte boolean
+        assert valueBytes.length > Long.BYTES;
+
+        // Read an update counter (8-byte long) from the entry.
+        long updateCounter = (long) LONG_ARRAY_HANDLE.get(valueBytes, 0);
+
+        // Read a has-value flag (1 byte) from the entry.
+        boolean hasValue = valueBytes[Long.BYTES] != 0;
+
+        byte[] val;
+        if (hasValue)
+            // Copy the value.
+            val = Arrays.copyOfRange(valueBytes, Long.BYTES + 1, valueBytes.length);
+        else
+            // There is no value, mark it as a tombstone.
+            val = TOMBSTONE;
+
+        return new Value(val, updateCounter);
+    }
+
+    /**
+     * Adds an update counter and a tombstone flag to a value.
+     * @param value Value byte array.
+     * @param updateCounter Update counter.
+     * @return Value with an update counter and a tombstone.
+     */
+    static byte[] valueToBytes(byte[] value, long updateCounter) {
+        var bytes = new byte[Long.BYTES + Byte.BYTES + value.length];
+
+        LONG_ARRAY_HANDLE.set(bytes, 0, updateCounter);
+
+        bytes[Long.BYTES] = (byte) (value == TOMBSTONE ? 0 : 1);
+
+        System.arraycopy(value, 0, bytes, Long.BYTES + Byte.BYTES, value.length);
+
+        return bytes;
+    }
+
+    /**
+     * Gets an array of longs from the byte array of longs.
+     *
+     * @param bytes Byte array of longs.
+     * @return Array of longs.
+     */
+    @NotNull
+    static long[] getAsLongs(byte[] bytes) {
+        // Value must be divisible by a size of a long, because it's a list of longs
+        assert (bytes.length % Long.BYTES) == 0;
+
+        return IntStream.range(0, bytes.length / Long.BYTES)
+            .mapToLong(i -> (long) LONG_ARRAY_HANDLE.get(bytes, i * Long.BYTES))
+            .toArray();
+    }
+
+    /**
+     * Add a long value to an array of longs that is represented by an array of bytes.
+     *
+     * @param bytes Byte array that represents an array of longs.
+     * @param value New long value.
+     * @return Byte array with a new value.
+     */
+    static byte @NotNull [] appendLong(byte @Nullable [] bytes, long value) {
+        if (bytes == null)
+            return longToBytes(value);
+
+        // Allocate a one long size bigger array
+        var result = new byte[bytes.length + Long.BYTES];
+
+        // Copy the current value
+        System.arraycopy(bytes, 0, result, 0, bytes.length);
+
+        LONG_ARRAY_HANDLE.set(result, bytes.length, value);
+
+        return result;
+    }
+
+    /**
+     * Iterates over the given iterator passing key-value pairs to the given consumer and
+     * checks the iterator's status afterwards.
+     *
+     * @param iterator Iterator.
+     * @param consumer Consumer of key-value pairs.
+     * @throws RocksDBException If failed.
+     */
+    static void forEach(RocksIterator iterator, RocksBiConsumer consumer) throws RocksDBException {
+        for (; iterator.isValid(); iterator.next())
+            consumer.accept(iterator.key(), iterator.value());
+
+        checkIterator(iterator);
+    }
+
+    /**
+     * Iterates over the given iterator testing key-value pairs with the given predicate and checks
+     * the iterator's status afterwards.
+     *
+     * @param iterator Iterator.
+     * @param consumer Consumer of key-value pairs.
+     * @return {@code true} if a matching key-value pair has been found, {@code false} otherwise.
+     * @throws RocksDBException If failed.
+     */
+    static boolean find(RocksIterator iterator, RocksBiPredicate consumer) throws RocksDBException {
+        for (; iterator.isValid(); iterator.next()) {
+            boolean result = consumer.test(iterator.key(), iterator.value());
+
+            if (result)
+                return true;
+        }
+
+        checkIterator(iterator);
+
+        return false;
+    }
+
+    /**
+     * Checks the status of the iterator and throws an exception if it is not correct.
+     *
+     * @param it RocksDB iterator.
+     * @throws IgniteInternalException if the iterator has an incorrect status.
+     */
+    static void checkIterator(RocksIterator it) {
+        try {
+            it.status();
+        }
+        catch (RocksDBException e) {
+            throw new IgniteInternalException(e);
+        }
+    }
+
+    /**
+     * BiConsumer that can throw {@link RocksDBException}.
+     */
+    @FunctionalInterface
+    interface RocksBiConsumer {
+        /**
+         * Accepts the key and the value of the entry.
+         *
+         * @param key Key.
+         * @param value Value.
+         * @throws RocksDBException If failed to process the key-value pair.
+         */
+        void accept(byte[] key, byte[] value) throws RocksDBException;
+    }
+
+    /**
+     * BiPredicate that can throw {@link RocksDBException}.
+     */
+    @FunctionalInterface
+    interface RocksBiPredicate {
+        /**
+         * Evaluates the predicate on the given key and the given value.
+         *
+         * @param key Key.
+         * @param value Value.
+         * @return {@code true} if the input argument matches the predicate, otherwise {@code false}.
+         * @throws RocksDBException If failed to test the key-value pair.
+         */
+        boolean test(byte[] key, byte[] value) throws RocksDBException;
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
new file mode 100644
index 0000000..852bf02
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/StorageColumnFamilyType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.nio.charset.StandardCharsets;
+import org.rocksdb.RocksDB;
+
+/**
+ * Column families used by the storage, each with the byte representation of its name.
+ */
+enum StorageColumnFamilyType {
+    /** Data column family; it is the RocksDB default column family. */
+    DATA(RocksDB.DEFAULT_COLUMN_FAMILY),
+
+    /** Index column family. Index is a mapping from entry key to a list of revisions of the storage. */
+    INDEX("INDEX".getBytes(StandardCharsets.UTF_8));
+
+    /** Column family name as bytes. */
+    private final byte[] nameAsBytes;
+
+    /**
+     * Creates a column family type.
+     *
+     * @param nameBytes Bytes of the column family's name.
+     */
+    StorageColumnFamilyType(byte[] nameBytes) {
+        this.nameAsBytes = nameBytes;
+    }
+
+    /**
+     * Returns the bytes of the column family's name. The array is the one held by the constant, not a copy.
+     *
+     * @return Column family name's bytes.
+     */
+    public byte[] nameAsBytes() {
+        return this.nameAsBytes;
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
new file mode 100644
index 0000000..90ff5f6
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.EntryEvent;
+import org.apache.ignite.internal.metastorage.server.Value;
+import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.checkIterator;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.rocksKeyToBytes;
+
+/**
+ * Subscription on updates of entries whose keys match the given predicate, starting from the given revision
+ * number. Each returned {@link WatchEvent} groups the matching entry updates of a single revision.
+ */
+class WatchCursor implements Cursor<WatchEvent> {
+    /** Storage that is being watched; its read lock guards every step of the iteration. */
+    private final RocksDBKeyValueStorage storage;
+
+    /** Predicate that selects the keys whose updates are reported. */
+    private final Predicate<byte[]> p;
+
+    /** Iterator that {@link #hasNext()}, {@link #next()} and {@link #iterator()} delegate to. */
+    private final Iterator<WatchEvent> it;
+
+    /** Options for {@link #nativeIterator}; released in {@link #close()}. */
+    private final ReadOptions options = new ReadOptions().setPrefixSameAsStart(true);
+
+    /** RocksDB iterator obtained from the storage's {@code newDataIterator}; released in {@link #close()}. */
+    private final RocksIterator nativeIterator;
+
+    /**
+     * Last revision that was either returned or skipped; the search for the next revision resumes right after it.
+     */
+    private long lastRetRev;
+
+    /**
+     * Next matching revision. {@code -1} means that it has not been found yet or does not exist.
+     */
+    private long nextRetRev = -1;
+
+    /**
+     * Creates a cursor and opens a native iterator on the given storage.
+     *
+     * @param storage Storage.
+     * @param rev Starting revision (inclusive).
+     * @param p Key predicate.
+     */
+    WatchCursor(RocksDBKeyValueStorage storage, long rev, Predicate<byte[]> p) {
+        this.storage = storage;
+        this.p = p;
+        this.lastRetRev = rev - 1;
+        this.nativeIterator = storage.newDataIterator(options);
+        this.it = createIterator();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() {
+        return it.hasNext();
+    }
+
+    /** {@inheritDoc} Returns {@code null}, rather than throwing, when there are no more events. */
+    @Nullable
+    @Override public WatchEvent next() {
+        return it.next();
+    }
+
+    /** Closes the native read options and the native iterator. */
+    @Override public void close() throws Exception {
+        IgniteUtils.closeAll(options, nativeIterator);
+    }
+
+    /** {@inheritDoc} */
+    @NotNull
+    @Override public Iterator<WatchEvent> iterator() {
+        return it;
+    }
+
+    /**
+     * Creates the iterator that scans the storage revision by revision while holding the storage's read lock.
+     *
+     * @return Iterator.
+     */
+    @NotNull
+    private Iterator<WatchEvent> createIterator() {
+        return new Iterator<>() {
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                storage.lock().readLock().lock();
+
+                try {
+                    if (nextRetRev != -1)
+                        // Next revision is already calculated and is not -1, meaning that there is a set of keys
+                        // matching the revision and the predicate.
+                        return true;
+
+                    while (true) {
+                        long curRev = lastRetRev + 1;
+
+                        byte[] revisionPrefix = longToBytes(curRev);
+
+                        boolean empty = true;
+
+                        if (!nativeIterator.isValid()) { // Not positioned on an entry: renew the iterator before seeking.
+                            try {
+                                nativeIterator.refresh();
+                            }
+                            catch (RocksDBException e) {
+                                throw new IgniteInternalException(e);
+                            }
+                        }
+
+                        // Check all keys of the revision to see if any of them matches the predicate.
+                        for (nativeIterator.seek(revisionPrefix); nativeIterator.isValid(); nativeIterator.next()) {
+                            empty = false;
+
+                            byte[] key = rocksKeyToBytes(nativeIterator.key());
+
+                            if (p.test(key)) {
+                                // Current revision matches. The iterator is left positioned on the matching key.
+                                nextRetRev = curRev;
+
+                                return true;
+                            }
+                        }
+
+                        checkIterator(nativeIterator);
+
+                        if (empty) // No keys were visited for this revision: stop searching.
+                            return false;
+
+                        // No key of this revision matched the predicate: skip it and go to the next revision.
+                        lastRetRev++;
+                    }
+                }
+                finally {
+                    storage.lock().readLock().unlock();
+                }
+            }
+
+            /** {@inheritDoc} Returns {@code null}, rather than throwing, when there are no more events. */
+            @Nullable
+            @Override public WatchEvent next() {
+                storage.lock().readLock().lock();
+
+                try {
+                    while (true) {
+                        if (!hasNext())
+                            return null;
+
+                        var ref = new Object() {
+                            boolean noItemsInRevision = true;
+                        };
+
+                        List<EntryEvent> evts = new ArrayList<>();
+
+                        // Iterate over the keys of the current revision and get all matching entries.
+                        RocksStorageUtils.forEach(nativeIterator, (k, v) -> {
+                            ref.noItemsInRevision = false;
+
+                            byte[] key = rocksKeyToBytes(k);
+
+                            Value val = bytesToValue(v);
+
+                            if (p.test(key)) {
+                                Entry newEntry;
+
+                                if (val.tombstone())
+                                    newEntry = Entry.tombstone(key, nextRetRev, val.updateCounter());
+                                else
+                                    newEntry = new Entry(key, val.bytes(), nextRetRev, val.updateCounter());
+
+                                Entry oldEntry = storage.doGet(key, nextRetRev - 1, false);
+
+                                evts.add(new EntryEvent(oldEntry, newEntry));
+                            }
+                        });
+
+                        if (ref.noItemsInRevision)
+                            return null;
+
+                        if (evts.isEmpty())
+                            continue;
+
+                        // Set the last returned revision to the current revision's value.
+                        lastRetRev = nextRetRev;
+
+                        // Reset the next revision to -1, meaning that it is not found yet.
+                        nextRetRev = -1;
+
+                        return new WatchEvent(evts);
+                    }
+                }
+                catch (RocksDBException e) {
+                    throw new IgniteInternalException(e);
+                }
+                finally {
+                    storage.lock().readLock().unlock();
+                }
+            }
+        };
+    }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 38f76da..55a7b44 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage.server.raft;
 
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -65,6 +66,7 @@ import org.apache.ignite.raft.client.ReadCommand;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.RaftGroupListener;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Meta storage listener.
@@ -314,14 +316,34 @@ public class MetaStorageListener implements RaftGroupListener {
     }
 
     /** {@inheritDoc} */
-    @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
-        // Not implemented yet.
+    @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+        storage.snapshot(path).whenComplete((unused, throwable) -> {
+            doneClo.accept(throwable);
+        });
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onSnapshotLoad(String path) {
-        // Not implemented yet.
-        return false;
+    @Override public boolean onSnapshotLoad(Path path) {
+        storage.restoreSnapshot(path);
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onShutdown() {
+        try {
+            storage.close();
+        }
+        catch (Exception e) {
+            throw new IgniteInternalException("Failed to close storage: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * @return {@link KeyValueStorage} that is backing this listener.
+     */
+    @TestOnly
+    public KeyValueStorage getStorage() {
+        return storage;
     }
 
     /** */
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
similarity index 92%
copy from modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
copy to modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index 508424f..24c7253 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -26,8 +26,10 @@ import java.util.stream.Collectors;
 import org.apache.ignite.internal.metastorage.common.OperationType;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+
 import static java.util.function.Function.identity;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -38,17 +40,27 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /**
- * Tests for in-memory meta storage implementation.
+ * Tests for key-value storage implementations.
  */
-class SimpleInMemoryKeyValueStorageTest {
+public abstract class AbstractKeyValueStorageTest {
     /** */
     private KeyValueStorage storage;
 
     @BeforeEach
     public void setUp() {
-        storage = new SimpleInMemoryKeyValueStorage();
+        storage = storage();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        storage.close();
     }
 
+    /**
+     * @return Key value storage for this test.
+     */
+    abstract KeyValueStorage storage();
+
     @Test
     public void put() {
         byte[] key = k(1);
@@ -967,12 +979,12 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         boolean branch = storage.invoke(
-                new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                ),
-                List.of(new Operation(OperationType.PUT, key3, val3))
+            new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1),
+            List.of(
+                new Operation(OperationType.PUT, key1, val1_2),
+                new Operation(OperationType.PUT, key2, val2)
+            ),
+            List.of(new Operation(OperationType.PUT, key3, val3))
         );
 
         // "Success" branch is applied.
@@ -1023,12 +1035,12 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         boolean branch = storage.invoke(
-                new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2),
-                List.of(new Operation(OperationType.PUT, key3, val3)),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                )
+            new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2),
+            List.of(new Operation(OperationType.PUT, key3, val3)),
+            List.of(
+                new Operation(OperationType.PUT, key1, val1_2),
+                new Operation(OperationType.PUT, key2, val2)
+            )
         );
 
         // "Failure" branch is applied.
@@ -1079,12 +1091,12 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         boolean branch = storage.invoke(
-                new ExistenceCondition(ExistenceCondition.Type.EXISTS, key1),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                ),
-                List.of(new Operation(OperationType.PUT, key3, val3))
+            new ExistenceCondition(ExistenceCondition.Type.EXISTS, key1),
+            List.of(
+                new Operation(OperationType.PUT, key1, val1_2),
+                new Operation(OperationType.PUT, key2, val2)
+            ),
+            List.of(new Operation(OperationType.PUT, key3, val3))
         );
 
         // "Success" branch is applied.
@@ -1135,12 +1147,12 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         boolean branch = storage.invoke(
-                new ExistenceCondition(ExistenceCondition.Type.EXISTS, key3),
-                List.of(new Operation(OperationType.PUT, key3, val3)),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                )
+            new ExistenceCondition(ExistenceCondition.Type.EXISTS, key3),
+            List.of(new Operation(OperationType.PUT, key3, val3)),
+            List.of(
+                new Operation(OperationType.PUT, key1, val1_2),
+                new Operation(OperationType.PUT, key2, val2)
+            )
         );
 
         // "Failure" branch is applied.
@@ -1191,12 +1203,12 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         boolean branch = storage.invoke(
-                new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key2),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                ),
-                List.of(new Operation(OperationType.PUT, key3, val3))
+            new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key2),
+            List.of(
+                new Operation(OperationType.PUT, key1, val1_2),
+                new Operation(OperationType.PUT, key2, val2)
+            ),
+            List.of(new Operation(OperationType.PUT, key3, val3))
         );
 
         // "Success" branch is applied.
@@ -1247,12 +1259,12 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         boolean branch = storage.invoke(
-                new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key1),
-                List.of(new Operation(OperationType.PUT, key3, val3)),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                )
+            new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key1),
+            List.of(new Operation(OperationType.PUT, key3, val3)),
+            List.of(
+                new Operation(OperationType.PUT, key1, val1_2),
+                new Operation(OperationType.PUT, key2, val2)
+            )
         );
 
         // "Failure" branch is applied.
@@ -1303,9 +1315,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(2, storage.updateCounter());
 
         boolean branch = storage.invoke(
-                new TombstoneCondition(key1),
-                List.of(new Operation(OperationType.PUT, key2, val2)),
-                List.of(new Operation(OperationType.PUT, key3, val3))
+            new TombstoneCondition(key1),
+            List.of(new Operation(OperationType.PUT, key2, val2)),
+            List.of(new Operation(OperationType.PUT, key3, val3))
         );
 
         // "Success" branch is applied.
@@ -1355,9 +1367,9 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         boolean branch = storage.invoke(
-                new TombstoneCondition(key1),
-                List.of(new Operation(OperationType.PUT, key2, val2)),
-                List.of(new Operation(OperationType.PUT, key3, val3))
+            new TombstoneCondition(key1),
+            List.of(new Operation(OperationType.PUT, key2, val2)),
+            List.of(new Operation(OperationType.PUT, key3, val3))
         );
 
         // "Failure" branch is applied.
@@ -1408,12 +1420,12 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         boolean branch = storage.invoke(
-                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                ),
-                List.of(new Operation(OperationType.PUT, key3, val3))
+            new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1),
+            List.of(
+                new Operation(OperationType.PUT, key1, val1_2),
+                new Operation(OperationType.PUT, key2, val2)
+            ),
+            List.of(new Operation(OperationType.PUT, key3, val3))
         );
 
         // "Success" branch is applied.
@@ -1464,12 +1476,12 @@ class SimpleInMemoryKeyValueStorageTest {
         assertEquals(1, storage.updateCounter());
 
         boolean branch = storage.invoke(
-                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2),
-                List.of(new Operation(OperationType.PUT, key3, val3)),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                )
+            new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2),
+            List.of(new Operation(OperationType.PUT, key3, val3)),
+            List.of(
+                new Operation(OperationType.PUT, key1, val1_2),
+                new Operation(OperationType.PUT, key2, val2)
+            )
         );
 
         // "Failure" branch is applied.
@@ -1520,9 +1532,9 @@ class SimpleInMemoryKeyValueStorageTest {
 
         // No-op.
         boolean branch = storage.invoke(
-                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
-                List.of(new Operation(OperationType.NO_OP, null, null)),
-                List.of(new Operation(OperationType.NO_OP, null, null))
+            new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+            List.of(new Operation(OperationType.NO_OP, null, null)),
+            List.of(new Operation(OperationType.NO_OP, null, null))
         );
 
         assertTrue(branch);
@@ -1533,12 +1545,12 @@ class SimpleInMemoryKeyValueStorageTest {
 
         // Put.
         branch = storage.invoke(
-                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
-                List.of(
-                        new Operation(OperationType.PUT, key2, val2),
-                        new Operation(OperationType.PUT, key3, val3)
-                ),
-                List.of(new Operation(OperationType.NO_OP, null, null))
+            new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+            List.of(
+                new Operation(OperationType.PUT, key2, val2),
+                new Operation(OperationType.PUT, key3, val3)
+            ),
+            List.of(new Operation(OperationType.NO_OP, null, null))
         );
 
         assertTrue(branch);
@@ -1567,12 +1579,12 @@ class SimpleInMemoryKeyValueStorageTest {
 
         // Remove.
         branch = storage.invoke(
-                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
-                List.of(
-                        new Operation(OperationType.REMOVE, key2, null),
-                        new Operation(OperationType.REMOVE, key3, null)
-                ),
-                List.of(new Operation(OperationType.NO_OP, null, null))
+            new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+            List.of(
+                new Operation(OperationType.REMOVE, key2, null),
+                new Operation(OperationType.REMOVE, key3, null)
+            ),
+            List.of(new Operation(OperationType.NO_OP, null, null))
         );
 
         assertTrue(branch);
@@ -1773,7 +1785,7 @@ class SimpleInMemoryKeyValueStorageTest {
     }
 
     @Test
-    public void watchCursorForRange() {
+    public void watchCursorForRange() throws Exception {
         byte[] key1 = k(1);
         byte[] val1_1 = kv(1, 11);
 
@@ -1817,7 +1829,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertFalse(watchEvent.single());
 
         Map<ByteArray, EntryEvent> map = watchEvent.entryEvents().stream()
-                .collect(Collectors.toMap(evt -> new ByteArray(evt.entry().key()), identity()));
+            .collect(Collectors.toMap(evt -> new ByteArray(evt.entry().key()), identity()));
 
         assertEquals(2, map.size());
 
@@ -1895,6 +1907,8 @@ class SimpleInMemoryKeyValueStorageTest {
         assertNull(newEntry1.value());
 
         assertFalse(it.hasNext());
+
+        cur.close();
     }
 
     @Test
@@ -1969,7 +1983,7 @@ class SimpleInMemoryKeyValueStorageTest {
         assertArrayEquals(key1, newEntry1.key());
         assertArrayEquals(val1_1, newEntry1.value());
 
-         newEntry1 = e1.entry();
+        newEntry1 = e1.entry();
 
         assertFalse(newEntry1.empty());
         assertFalse(newEntry1.tombstone());
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
new file mode 100644
index 0000000..af39c1a
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.nio.file.Path;
+import org.apache.ignite.internal.metastorage.server.persistence.RocksDBKeyValueStorage;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Runs the common {@link AbstractKeyValueStorageTest} suite against the RocksDB key-value storage implementation.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class RocksDbKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    /** Work directory handled by {@link WorkDirectoryExtension}; passed to the storage under test. */
+    @WorkDirectory
+    private Path workDir;
+
+    /** {@inheritDoc} */
+    @Override KeyValueStorage storage() {
+        return new RocksDBKeyValueStorage(workDir);
+    }
+}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 508424f..fd68a7e 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -17,2034 +17,12 @@
 
 package org.apache.ignite.internal.metastorage.server;
 
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-import org.apache.ignite.internal.metastorage.common.OperationType;
-import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.lang.ByteArray;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import static java.util.function.Function.identity;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-
 /**
- * Tests for in-memory meta storage implementation.
+ * Tests for in-memory key-value storage implementation.
  */
-class SimpleInMemoryKeyValueStorageTest {
-    /** */
-    private KeyValueStorage storage;
-
-    @BeforeEach
-    public void setUp() {
-        storage = new SimpleInMemoryKeyValueStorage();
-    }
-
-    @Test
-    public void put() {
-        byte[] key = k(1);
-        byte[] val = kv(1, 1);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-        assertTrue(storage.get(key).empty());
-
-        storage.put(key, val);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        Entry e = storage.get(key);
-
-        assertFalse(e.empty());
-        assertFalse(e.tombstone());
-        assertEquals(1, e.revision());
-        assertEquals(1, e.updateCounter());
-
-        storage.put(key, val);
-
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-
-        e = storage.get(key);
-
-        assertFalse(e.empty());
-        assertFalse(e.tombstone());
-        assertEquals(2, e.revision());
-        assertEquals(2, e.updateCounter());
-    }
-
-    @Test
-    void getAll() {
-        byte[] key1 = k(1);
-        byte[] val1 = kv(1, 1);
-
-        byte[] key2 = k(2);
-        byte[] val2_1 = kv(2, 21);
-        byte[] val2_2 = kv(2, 22);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        byte[] key4 = k(4);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        // Regular put.
-        storage.put(key1, val1);
-
-        // Rewrite.
-        storage.put(key2, val2_1);
-        storage.put(key2, val2_2);
-
-        // Remove.
-        storage.put(key3, val3);
-        storage.remove(key3);
-
-        assertEquals(5, storage.revision());
-        assertEquals(5, storage.updateCounter());
-
-        Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
-
-        assertEquals(4, entries.size());
-
-        Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
-        // Test regular put value.
-        Entry e1 = map.get(new ByteArray(key1));
-
-        assertNotNull(e1);
-        assertEquals(1, e1.revision());
-        assertEquals(1, e1.updateCounter());
-        assertFalse(e1.tombstone());
-        assertFalse(e1.empty());
-        assertArrayEquals(val1, e1.value());
-
-        // Test rewritten value.
-        Entry e2 = map.get(new ByteArray(key2));
-
-        assertNotNull(e2);
-        assertEquals(3, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertFalse(e2.tombstone());
-        assertFalse(e2.empty());
-        assertArrayEquals(val2_2, e2.value());
-
-        // Test removed value.
-        Entry e3 = map.get(new ByteArray(key3));
-
-        assertNotNull(e3);
-        assertEquals(5, e3.revision());
-        assertEquals(5, e3.updateCounter());
-        assertTrue(e3.tombstone());
-        assertFalse(e3.empty());
-
-        // Test empty value.
-        Entry e4 = map.get(new ByteArray(key4));
-
-        assertNotNull(e4);
-        assertFalse(e4.tombstone());
-        assertTrue(e4.empty());
-    }
-
-    @Test
-    void getAllWithRevisionBound() {
-        byte[] key1 = k(1);
-        byte[] val1 = kv(1, 1);
-
-        byte[] key2 = k(2);
-        byte[] val2_1 = kv(2, 21);
-        byte[] val2_2 = kv(2, 22);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        byte[] key4 = k(4);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        // Regular put.
-        storage.put(key1, val1);
-
-        // Rewrite.
-        storage.put(key2, val2_1);
-        storage.put(key2, val2_2);
-
-        // Remove.
-        storage.put(key3, val3);
-        storage.remove(key3);
-
-        assertEquals(5, storage.revision());
-        assertEquals(5, storage.updateCounter());
-
-        // Bounded by revision 2.
-        Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4), 2);
-
-        assertEquals(4, entries.size());
-
-        Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
-        // Test regular put value.
-        Entry e1 = map.get(new ByteArray(key1));
-
-        assertNotNull(e1);
-        assertEquals(1, e1.revision());
-        assertEquals(1, e1.updateCounter());
-        assertFalse(e1.tombstone());
-        assertFalse(e1.empty());
-        assertArrayEquals(val1, e1.value());
-
-        // Test while not rewritten value.
-        Entry e2 = map.get(new ByteArray(key2));
-
-        assertNotNull(e2);
-        assertEquals(2, e2.revision());
-        assertEquals(2, e2.updateCounter());
-        assertFalse(e2.tombstone());
-        assertFalse(e2.empty());
-        assertArrayEquals(val2_1, e2.value());
-
-        // Values with larger revision don't exist yet.
-        Entry e3 = map.get(new ByteArray(key3));
-
-        assertNotNull(e3);
-        assertTrue(e3.empty());
-
-        Entry e4 = map.get(new ByteArray(key4));
-
-        assertNotNull(e4);
-        assertTrue(e4.empty());
-
-        // Bounded by revision 4.
-        entries = storage.getAll(List.of(key1, key2, key3, key4), 4);
-
-        assertEquals(4, entries.size());
-
-        map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
-        // Test regular put value.
-        e1 = map.get(new ByteArray(key1));
-
-        assertNotNull(e1);
-        assertEquals(1, e1.revision());
-        assertEquals(1, e1.updateCounter());
-        assertFalse(e1.tombstone());
-        assertFalse(e1.empty());
-        assertArrayEquals(val1, e1.value());
-
-        // Test rewritten value.
-        e2 = map.get(new ByteArray(key2));
-
-        assertNotNull(e2);
-        assertEquals(3, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertFalse(e2.tombstone());
-        assertFalse(e2.empty());
-        assertArrayEquals(val2_2, e2.value());
-
-        // Test not removed value.
-        e3 = map.get(new ByteArray(key3));
-
-        assertNotNull(e3);
-        assertEquals(4, e3.revision());
-        assertEquals(4, e3.updateCounter());
-        assertFalse(e3.tombstone());
-        assertFalse(e3.empty());
-        assertArrayEquals(val3, e3.value());
-
-        // Value with larger revision doesn't exist yet.
-        e4 = map.get(new ByteArray(key4));
-
-        assertNotNull(e4);
-        assertTrue(e4.empty());
-    }
-
-    @Test
-    public void getAndPut() {
-        byte[] key = k(1);
-        byte[] val = kv(1, 1);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-        assertTrue(storage.get(key).empty());
-
-        Entry e = storage.getAndPut(key, val);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-        assertTrue(e.empty());
-        assertFalse(e.tombstone());
-        assertEquals(0, e.revision());
-        assertEquals(0, e.updateCounter());
-
-        e = storage.getAndPut(key, val);
-
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-        assertFalse(e.empty());
-        assertFalse(e.tombstone());
-        assertEquals(1, e.revision());
-        assertEquals(1, e.updateCounter());
-    }
-
-    @Test
-    public void putAll() {
-        byte[] key1 = k(1);
-        byte[] val1 = kv(1, 1);
-
-        byte[] key2 = k(2);
-        byte[] val2_1 = kv(2, 21);
-        byte[] val2_2 = kv(2, 22);
-
-        byte[] key3 = k(3);
-        byte[] val3_1 = kv(3, 31);
-        byte[] val3_2 = kv(3, 32);
-
-        byte[] key4 = k(4);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        // Must be rewritten.
-        storage.put(key2, val2_1);
-
-        // Remove. Tombstone must be replaced by new value.
-        storage.put(key3, val3_1);
-        storage.remove(key3);
-
-        assertEquals(3, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        storage.putAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2));
-
-        assertEquals(4, storage.revision());
-        assertEquals(6, storage.updateCounter());
-
-        Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
-
-        assertEquals(4, entries.size());
-
-        Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
-        // Test regular put value.
-        Entry e1 = map.get(new ByteArray(key1));
-
-        assertNotNull(e1);
-        assertEquals(4, e1.revision());
-        assertEquals(4, e1.updateCounter());
-        assertFalse(e1.tombstone());
-        assertFalse(e1.empty());
-        assertArrayEquals(val1, e1.value());
-
-        // Test rewritten value.
-        Entry e2 = map.get(new ByteArray(key2));
-
-        assertNotNull(e2);
-        assertEquals(4, e2.revision());
-        assertEquals(5, e2.updateCounter());
-        assertFalse(e2.tombstone());
-        assertFalse(e2.empty());
-        assertArrayEquals(val2_2, e2.value());
-
-        // Test removed value.
-        Entry e3 = map.get(new ByteArray(key3));
-
-        assertNotNull(e3);
-        assertEquals(4, e3.revision());
-        assertEquals(6, e3.updateCounter());
-        assertFalse(e3.tombstone());
-        assertFalse(e3.empty());
-
-        // Test empty value.
-        Entry e4 = map.get(new ByteArray(key4));
-
-        assertNotNull(e4);
-        assertFalse(e4.tombstone());
-        assertTrue(e4.empty());
-    }
-
-    @Test
-    public void getAndPutAll() {
-        byte[] key1 = k(1);
-        byte[] val1 = kv(1, 1);
-
-        byte[] key2 = k(2);
-        byte[] val2_1 = kv(2, 21);
-        byte[] val2_2 = kv(2, 22);
-
-        byte[] key3 = k(3);
-        byte[] val3_1 = kv(3, 31);
-        byte[] val3_2 = kv(3, 32);
-
-        byte[] key4 = k(4);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        // Must be rewritten.
-        storage.put(key2, val2_1);
-
-        // Remove. Tombstone must be replaced by new value.
-        storage.put(key3, val3_1);
-        storage.remove(key3);
-
-        assertEquals(3, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Collection<Entry> entries = storage.getAndPutAll(List.of(key1, key2, key3), List.of(val1, val2_2, val3_2));
-
-        assertEquals(4, storage.revision());
-        assertEquals(6, storage.updateCounter());
-
-        assertEquals(3, entries.size());
-
-        Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
-        // Test regular put value.
-        Entry e1 = map.get(new ByteArray(key1));
-
-        assertNotNull(e1);
-        assertEquals(0, e1.revision());
-        assertEquals(0, e1.updateCounter());
-        assertFalse(e1.tombstone());
-        assertTrue(e1.empty());
-
-        // Test rewritten value.
-        Entry e2 = map.get(new ByteArray(key2));
-
-        assertNotNull(e2);
-        assertEquals(1, e2.revision());
-        assertEquals(1, e2.updateCounter());
-        assertFalse(e2.tombstone());
-        assertFalse(e2.empty());
-        assertArrayEquals(val2_1, e2.value());
-
-        // Test removed value.
-        Entry e3 = map.get(new ByteArray(key3));
-
-        assertNotNull(e3);
-        assertEquals(3, e3.revision());
-        assertEquals(3, e3.updateCounter());
-        assertTrue(e3.tombstone());
-        assertFalse(e3.empty());
-
-        // Test state after putAll.
-        entries = storage.getAll(List.of(key1, key2, key3, key4));
-
-        assertEquals(4, entries.size());
-
-        map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
-        // Test regular put value.
-        e1 = map.get(new ByteArray(key1));
-
-        assertNotNull(e1);
-        assertEquals(4, e1.revision());
-        assertEquals(4, e1.updateCounter());
-        assertFalse(e1.tombstone());
-        assertFalse(e1.empty());
-        assertArrayEquals(val1, e1.value());
-
-        // Test rewritten value.
-        e2 = map.get(new ByteArray(key2));
-
-        assertNotNull(e2);
-        assertEquals(4, e2.revision());
-        assertEquals(5, e2.updateCounter());
-        assertFalse(e2.tombstone());
-        assertFalse(e2.empty());
-        assertArrayEquals(val2_2, e2.value());
-
-        // Test removed value.
-        e3 = map.get(new ByteArray(key3));
-
-        assertNotNull(e3);
-        assertEquals(4, e3.revision());
-        assertEquals(6, e3.updateCounter());
-        assertFalse(e3.tombstone());
-        assertFalse(e3.empty());
-
-        // Test empty value.
-        Entry e4 = map.get(new ByteArray(key4));
-
-        assertNotNull(e4);
-        assertFalse(e4.tombstone());
-        assertTrue(e4.empty());
-    }
-
-    @Test
-    public void remove() {
-        byte[] key = k(1);
-        byte[] val = kv(1, 1);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-        assertTrue(storage.get(key).empty());
-
-        // Remove non-existent entry.
-        storage.remove(key);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-        assertTrue(storage.get(key).empty());
-
-        storage.put(key, val);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        // Remove existent entry.
-        storage.remove(key);
-
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-
-        Entry e = storage.get(key);
-
-        assertFalse(e.empty());
-        assertTrue(e.tombstone());
-        assertEquals(2, e.revision());
-        assertEquals(2, e.updateCounter());
-
-        // Remove already removed entry (tombstone can't be removed).
-        storage.remove(key);
-
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-
-        e = storage.get(key);
-
-        assertFalse(e.empty());
-        assertTrue(e.tombstone());
-        assertEquals(2, e.revision());
-        assertEquals(2, e.updateCounter());
-    }
-
-    @Test
-    public void getAndRemove() {
-        byte[] key = k(1);
-        byte[] val = kv(1, 1);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-        assertTrue(storage.get(key).empty());
-
-        // Remove non-existent entry.
-        Entry e = storage.getAndRemove(key);
-
-        assertTrue(e.empty());
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-        assertTrue(storage.get(key).empty());
-
-        storage.put(key, val);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        // Remove existent entry.
-        e = storage.getAndRemove(key);
-
-        assertFalse(e.empty());
-        assertFalse(e.tombstone());
-        assertEquals(1, e.revision());
-        assertEquals(1, e.updateCounter());
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-
-        e = storage.get(key);
-
-        assertFalse(e.empty());
-        assertTrue(e.tombstone());
-        assertEquals(2, e.revision());
-        assertEquals(2, e.updateCounter());
-
-        // Remove already removed entry (tombstone can't be removed).
-        e = storage.getAndRemove(key);
-
-        assertFalse(e.empty());
-        assertTrue(e.tombstone());
-        assertEquals(2, e.revision());
-        assertEquals(2, e.updateCounter());
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-
-        e = storage.get(key);
-
-        assertFalse(e.empty());
-        assertTrue(e.tombstone());
-        assertEquals(2, e.revision());
-        assertEquals(2, e.updateCounter());
-    }
-
-    @Test
-    public void removeAll() {
-        byte[] key1 = k(1);
-        byte[] val1 = kv(1, 1);
-
-        byte[] key2 = k(2);
-        byte[] val2_1 = kv(2, 21);
-        byte[] val2_2 = kv(2, 22);
-
-        byte[] key3 = k(3);
-        byte[] val3_1 = kv(3, 31);
-
-        byte[] key4 = k(4);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        // Regular put.
-        storage.put(key1, val1);
-
-        // Rewrite.
-        storage.put(key2, val2_1);
-        storage.put(key2, val2_2);
-
-        // Remove. Tombstone must not be removed again.
-        storage.put(key3, val3_1);
-        storage.remove(key3);
-
-        assertEquals(5, storage.revision());
-        assertEquals(5, storage.updateCounter());
-
-        storage.removeAll(List.of(key1, key2, key3, key4));
-
-        assertEquals(6, storage.revision());
-        assertEquals(7, storage.updateCounter()); // Only two keys are updated.
-
-        Collection<Entry> entries = storage.getAll(List.of(key1, key2, key3, key4));
-
-        assertEquals(4, entries.size());
-
-        Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
-        // Test regular put value.
-        Entry e1 = map.get(new ByteArray(key1));
-
-        assertNotNull(e1);
-        assertEquals(6, e1.revision());
-        assertEquals(6, e1.updateCounter());
-        assertTrue(e1.tombstone());
-        assertFalse(e1.empty());
-
-        // Test rewritten value.
-        Entry e2 = map.get(new ByteArray(key2));
-
-        assertNotNull(e2);
-        assertEquals(6, e2.revision());
-        assertEquals(7, e2.updateCounter());
-        assertTrue(e2.tombstone());
-        assertFalse(e2.empty());
-
-        // Test removed value.
-        Entry e3 = map.get(new ByteArray(key3));
-
-        assertNotNull(e3);
-        assertEquals(5, e3.revision());
-        assertEquals(5, e3.updateCounter());
-        assertTrue(e3.tombstone());
-        assertFalse(e3.empty());
-
-        // Test empty value.
-        Entry e4 = map.get(new ByteArray(key4));
-
-        assertNotNull(e4);
-        assertFalse(e4.tombstone());
-        assertTrue(e4.empty());
-    }
-
-    @Test
-    public void getAndRemoveAll() {
-        byte[] key1 = k(1);
-        byte[] val1 = kv(1, 1);
-
-        byte[] key2 = k(2);
-        byte[] val2_1 = kv(2, 21);
-        byte[] val2_2 = kv(2, 22);
-
-        byte[] key3 = k(3);
-        byte[] val3_1 = kv(3, 31);
-
-        byte[] key4 = k(4);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        // Regular put.
-        storage.put(key1, val1);
-
-        // Rewrite.
-        storage.put(key2, val2_1);
-        storage.put(key2, val2_2);
-
-        // Remove. Tombstone must not be removed again.
-        storage.put(key3, val3_1);
-        storage.remove(key3);
-
-        assertEquals(5, storage.revision());
-        assertEquals(5, storage.updateCounter());
-
-        Collection<Entry> entries = storage.getAndRemoveAll(List.of(key1, key2, key3, key4));
-
-        assertEquals(6, storage.revision());
-        assertEquals(7, storage.updateCounter()); // Only two keys are updated.
-
-        assertEquals(4, entries.size());
-
-        Map<ByteArray, Entry> map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
-        // Test regular put value.
-        Entry e1 = map.get(new ByteArray(key1));
-
-        assertNotNull(e1);
-        assertEquals(1, e1.revision());
-        assertEquals(1, e1.updateCounter());
-        assertFalse(e1.tombstone());
-        assertFalse(e1.empty());
-
-        // Test rewritten value.
-        Entry e2 = map.get(new ByteArray(key2));
-
-        assertNotNull(e2);
-        assertEquals(3, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertFalse(e2.tombstone());
-        assertFalse(e2.empty());
-
-        // Test removed value.
-        Entry e3 = map.get(new ByteArray(key3));
-
-        assertNotNull(e3);
-        assertEquals(5, e3.revision());
-        assertEquals(5, e3.updateCounter());
-        assertTrue(e3.tombstone());
-        assertFalse(e3.empty());
-
-        // Test empty value.
-        Entry e4 = map.get(new ByteArray(key4));
-
-        assertNotNull(e4);
-        assertFalse(e4.tombstone());
-        assertTrue(e4.empty());
-
-        // Test state after getAndRemoveAll.
-        entries = storage.getAll(List.of(key1, key2, key3, key4));
-
-        assertEquals(4, entries.size());
-
-        map = entries.stream().collect(Collectors.toMap(e -> new ByteArray(e.key()), identity()));
-
-        // Test regular put value.
-        e1 = map.get(new ByteArray(key1));
-
-        assertNotNull(e1);
-        assertEquals(6, e1.revision());
-        assertEquals(6, e1.updateCounter());
-        assertTrue(e1.tombstone());
-        assertFalse(e1.empty());
-
-        // Test rewritten value.
-        e2 = map.get(new ByteArray(key2));
-
-        assertNotNull(e2);
-        assertEquals(6, e2.revision());
-        assertEquals(7, e2.updateCounter());
-        assertTrue(e2.tombstone());
-        assertFalse(e2.empty());
-
-        // Test removed value.
-        e3 = map.get(new ByteArray(key3));
-
-        assertNotNull(e3);
-        assertEquals(5, e3.revision());
-        assertEquals(5, e3.updateCounter());
-        assertTrue(e3.tombstone());
-        assertFalse(e3.empty());
-
-        // Test empty value.
-        e4 = map.get(new ByteArray(key4));
-
-        assertNotNull(e4);
-        assertFalse(e4.tombstone());
-        assertTrue(e4.empty());
-    }
-
-    @Test
-    public void getAfterRemove() {
-        byte[] key = k(1);
-        byte[] val = kv(1, 1);
-
-        storage.getAndPut(key, val);
-
-        storage.getAndRemove(key);
-
-        Entry e = storage.get(key);
-
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-        assertEquals(2, e.revision());
-        assertTrue(e.tombstone());
-    }
-
-    @Test
-    public void getAndPutAfterRemove() {
-        byte[] key = k(1);
-        byte[] val = kv(1, 1);
-
-        storage.getAndPut(key, val);
-
-        storage.getAndRemove(key);
-
-        Entry e = storage.getAndPut(key, val);
-
-        assertEquals(3, storage.revision());
-        assertEquals(3, storage.updateCounter());
-        assertEquals(2, e.revision());
-        assertTrue(e.tombstone());
-    }
-
-    @Test
-    public void putGetRemoveCompact() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 1);
-        byte[] val1_3 = kv(1, 3);
-
-        byte[] key2 = k(2);
-        byte[] val2_2 = kv(2, 2);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        // Previous entry is empty.
-        Entry emptyEntry = storage.getAndPut(key1, val1_1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-        assertTrue(emptyEntry.empty());
-
-        // Entry with rev == 1.
-        Entry e1_1 = storage.get(key1);
-
-        assertFalse(e1_1.empty());
-        assertFalse(e1_1.tombstone());
-        assertArrayEquals(key1, e1_1.key());
-        assertArrayEquals(val1_1, e1_1.value());
-        assertEquals(1, e1_1.revision());
-        assertEquals(1, e1_1.updateCounter());
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        // Previous entry is empty.
-        emptyEntry = storage.getAndPut(key2, val2_2);
-
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-        assertTrue(emptyEntry.empty());
-
-        // Entry with rev == 2.
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertArrayEquals(key2, e2.key());
-        assertArrayEquals(val2_2, e2.value());
-        assertEquals(2, e2.revision());
-        assertEquals(2, e2.updateCounter());
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-
-        // Previous entry is not empty.
-        e1_1 = storage.getAndPut(key1, val1_3);
-
-        assertFalse(e1_1.empty());
-        assertFalse(e1_1.tombstone());
-        assertArrayEquals(key1, e1_1.key());
-        assertArrayEquals(val1_1, e1_1.value());
-        assertEquals(1, e1_1.revision());
-        assertEquals(1, e1_1.updateCounter());
-        assertEquals(3, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        // Entry with rev == 3.
-        Entry e1_3 = storage.get(key1);
-
-        assertFalse(e1_3.empty());
-        assertFalse(e1_3.tombstone());
-        assertArrayEquals(key1, e1_3.key());
-        assertArrayEquals(val1_3, e1_3.value());
-        assertEquals(3, e1_3.revision());
-        assertEquals(3, e1_3.updateCounter());
-        assertEquals(3, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        // Remove existing entry.
-        Entry e2_2 = storage.getAndRemove(key2);
-
-        assertFalse(e2_2.empty());
-        assertFalse(e2_2.tombstone());
-        assertArrayEquals(key2, e2_2.key());
-        assertArrayEquals(val2_2, e2_2.value());
-        assertEquals(2, e2_2.revision());
-        assertEquals(2, e2_2.updateCounter());
-        assertEquals(4, storage.revision()); // Storage revision is changed.
-        assertEquals(4, storage.updateCounter());
-
-        // Remove already removed entry.
-        Entry tombstoneEntry = storage.getAndRemove(key2);
-
-        assertFalse(tombstoneEntry.empty());
-        assertTrue(tombstoneEntry.tombstone());
-        assertEquals(4, storage.revision()); // Storage revision is not changed.
-        assertEquals(4, storage.updateCounter());
-
-        // Compact and check that tombstones are removed.
-        storage.compact();
-
-        assertEquals(4, storage.revision());
-        assertEquals(4, storage.updateCounter());
-        assertTrue(storage.getAndRemove(key2).empty());
-        assertTrue(storage.get(key2).empty());
-
-        // Remove existing entry.
-        e1_3 = storage.getAndRemove(key1);
-
-        assertFalse(e1_3.empty());
-        assertFalse(e1_3.tombstone());
-        assertArrayEquals(key1, e1_3.key());
-        assertArrayEquals(val1_3, e1_3.value());
-        assertEquals(3, e1_3.revision());
-        assertEquals(3, e1_3.updateCounter());
-        assertEquals(5, storage.revision()); // Storage revision is changed.
-        assertEquals(5, storage.updateCounter());
-
-        // Remove already removed entry.
-        tombstoneEntry = storage.getAndRemove(key1);
-
-        assertFalse(tombstoneEntry.empty());
-        assertTrue(tombstoneEntry.tombstone());
-        assertEquals(5, storage.revision()); // // Storage revision is not changed.
-        assertEquals(5, storage.updateCounter());
-
-        // Compact and check that tombstones are removed.
-        storage.compact();
-
-        assertEquals(5, storage.revision());
-        assertEquals(5, storage.updateCounter());
-        assertTrue(storage.getAndRemove(key1).empty());
-        assertTrue(storage.get(key1).empty());
-    }
-
-    @Test
-    public void invokeWithRevisionCondition_successBranch() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-        byte[] val1_2 = kv(1, 12);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1_1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        boolean branch = storage.invoke(
-                new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                ),
-                List.of(new Operation(OperationType.PUT, key3, val3))
-        );
-
-        // "Success" branch is applied.
-        assertTrue(branch);
-        assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Entry e1 = storage.get(key1);
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertEquals(2, e1.revision());
-        assertEquals(2, e1.updateCounter());
-        assertArrayEquals(val1_2, e1.value());
-
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertEquals(2, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertArrayEquals(val2, e2.value());
-
-        // "Failure" branch isn't applied.
-        Entry e3 = storage.get(key3);
-
-        assertTrue(e3.empty());
-    }
-
-    @Test
-    public void invokeWithRevisionCondition_failureBranch() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-        byte[] val1_2 = kv(1, 12);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1_1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        boolean branch = storage.invoke(
-                new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2),
-                List.of(new Operation(OperationType.PUT, key3, val3)),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                )
-        );
-
-        // "Failure" branch is applied.
-        assertFalse(branch);
-        assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Entry e1 = storage.get(key1);
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertEquals(2, e1.revision());
-        assertEquals(2, e1.updateCounter());
-        assertArrayEquals(val1_2, e1.value());
-
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertEquals(2, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertArrayEquals(val2, e2.value());
-
-        // "Success" branch isn't applied.
-        Entry e3 = storage.get(key3);
-
-        assertTrue(e3.empty());
-    }
-
-    @Test
-    public void invokeWithExistsCondition_successBranch() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-        byte[] val1_2 = kv(1, 12);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1_1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        boolean branch = storage.invoke(
-                new ExistenceCondition(ExistenceCondition.Type.EXISTS, key1),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                ),
-                List.of(new Operation(OperationType.PUT, key3, val3))
-        );
-
-        // "Success" branch is applied.
-        assertTrue(branch);
-        assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Entry e1 = storage.get(key1);
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertEquals(2, e1.revision());
-        assertEquals(2, e1.updateCounter());
-        assertArrayEquals(val1_2, e1.value());
-
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertEquals(2, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertArrayEquals(val2, e2.value());
-
-        // "Failure" branch isn't applied.
-        Entry e3 = storage.get(key3);
-
-        assertTrue(e3.empty());
-    }
-
-    @Test
-    public void invokeWithExistsCondition_failureBranch() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-        byte[] val1_2 = kv(1, 12);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1_1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        boolean branch = storage.invoke(
-                new ExistenceCondition(ExistenceCondition.Type.EXISTS, key3),
-                List.of(new Operation(OperationType.PUT, key3, val3)),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                )
-        );
-
-        // "Failure" branch is applied.
-        assertFalse(branch);
-        assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Entry e1 = storage.get(key1);
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertEquals(2, e1.revision());
-        assertEquals(2, e1.updateCounter());
-        assertArrayEquals(val1_2, e1.value());
-
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertEquals(2, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertArrayEquals(val2, e2.value());
-
-        // "Success" branch isn't applied.
-        Entry e3 = storage.get(key3);
-
-        assertTrue(e3.empty());
-    }
-
-    @Test
-    public void invokeWithNotExistsCondition_successBranch() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-        byte[] val1_2 = kv(1, 12);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1_1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        boolean branch = storage.invoke(
-                new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key2),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                ),
-                List.of(new Operation(OperationType.PUT, key3, val3))
-        );
-
-        // "Success" branch is applied.
-        assertTrue(branch);
-        assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Entry e1 = storage.get(key1);
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertEquals(2, e1.revision());
-        assertEquals(2, e1.updateCounter());
-        assertArrayEquals(val1_2, e1.value());
-
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertEquals(2, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertArrayEquals(val2, e2.value());
-
-        // "Failure" branch isn't applied.
-        Entry e3 = storage.get(key3);
-
-        assertTrue(e3.empty());
-    }
-
-    @Test
-    public void invokeWithNotExistsCondition_failureBranch() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-        byte[] val1_2 = kv(1, 12);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1_1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        boolean branch = storage.invoke(
-                new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key1),
-                List.of(new Operation(OperationType.PUT, key3, val3)),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                )
-        );
-
-        // "Failure" branch is applied.
-        assertFalse(branch);
-        assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Entry e1 = storage.get(key1);
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertEquals(2, e1.revision());
-        assertEquals(2, e1.updateCounter());
-        assertArrayEquals(val1_2, e1.value());
-
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertEquals(2, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertArrayEquals(val2, e2.value());
-
-        // "Success" branch isn't applied.
-        Entry e3 = storage.get(key3);
-
-        assertTrue(e3.empty());
-    }
-
-    @Test
-    public void invokeWithTombstoneCondition_successBranch() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1_1);
-        storage.remove(key1); // Should be tombstone after remove.
-
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-
-        boolean branch = storage.invoke(
-                new TombstoneCondition(key1),
-                List.of(new Operation(OperationType.PUT, key2, val2)),
-                List.of(new Operation(OperationType.PUT, key3, val3))
-        );
-
-        // "Success" branch is applied.
-        assertTrue(branch);
-        assertEquals(3, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Entry e1 = storage.get(key1);
-
-        assertFalse(e1.empty());
-        assertTrue(e1.tombstone());
-        assertEquals(2, e1.revision());
-        assertEquals(2, e1.updateCounter());
-        assertNull(e1.value());
-
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertEquals(3, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertArrayEquals(val2, e2.value());
-
-        // "Failure" branch isn't applied.
-        Entry e3 = storage.get(key3);
-
-        assertTrue(e3.empty());
-    }
-
-    @Test
-    public void invokeWithTombstoneCondition_failureBranch() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1_1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        boolean branch = storage.invoke(
-                new TombstoneCondition(key1),
-                List.of(new Operation(OperationType.PUT, key2, val2)),
-                List.of(new Operation(OperationType.PUT, key3, val3))
-        );
-
-        // "Failure" branch is applied.
-        assertFalse(branch);
-        assertEquals(2, storage.revision());
-        assertEquals(2, storage.updateCounter());
-
-        Entry e1 = storage.get(key1);
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertEquals(1, e1.revision());
-        assertEquals(1, e1.updateCounter());
-        assertArrayEquals(val1_1, e1.value());
-
-        Entry e3 = storage.get(key3);
-
-        assertFalse(e3.empty());
-        assertFalse(e3.tombstone());
-        assertEquals(2, e3.revision());
-        assertEquals(2, e3.updateCounter());
-        assertArrayEquals(val3, e3.value());
-
-        // "Success" branch isn't applied.
-        Entry e2 = storage.get(key2);
-
-        assertTrue(e2.empty());
-    }
-
-    @Test
-    public void invokeWithValueCondition_successBranch() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-        byte[] val1_2 = kv(1, 12);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1_1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        boolean branch = storage.invoke(
-                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                ),
-                List.of(new Operation(OperationType.PUT, key3, val3))
-        );
-
-        // "Success" branch is applied.
-        assertTrue(branch);
-        assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Entry e1 = storage.get(key1);
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertEquals(2, e1.revision());
-        assertEquals(2, e1.updateCounter());
-        assertArrayEquals(val1_2, e1.value());
-
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertEquals(2, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertArrayEquals(val2, e2.value());
-
-        // "Failure" branch isn't applied.
-        Entry e3 = storage.get(key3);
-
-        assertTrue(e3.empty());
-    }
-
-    @Test
-    public void invokeWithValueCondition_failureBranch() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-        byte[] val1_2 = kv(1, 12);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1_1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        boolean branch = storage.invoke(
-                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2),
-                List.of(new Operation(OperationType.PUT, key3, val3)),
-                List.of(
-                        new Operation(OperationType.PUT, key1, val1_2),
-                        new Operation(OperationType.PUT, key2, val2)
-                )
-        );
-
-        // "Failure" branch is applied.
-        assertFalse(branch);
-        assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Entry e1 = storage.get(key1);
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertEquals(2, e1.revision());
-        assertEquals(2, e1.updateCounter());
-        assertArrayEquals(val1_2, e1.value());
-
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertEquals(2, e2.revision());
-        assertEquals(3, e2.updateCounter());
-        assertArrayEquals(val2, e2.value());
-
-        // "Success" branch isn't applied.
-        Entry e3 = storage.get(key3);
-
-        assertTrue(e3.empty());
-    }
-
-    @Test
-    public void invokeOperations() {
-        byte[] key1 = k(1);
-        byte[] val1 = kv(1, 1);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.put(key1, val1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        // No-op.
-        boolean branch = storage.invoke(
-                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
-                List.of(new Operation(OperationType.NO_OP, null, null)),
-                List.of(new Operation(OperationType.NO_OP, null, null))
-        );
-
-        assertTrue(branch);
-
-        // No updates.
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        // Put.
-        branch = storage.invoke(
-                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
-                List.of(
-                        new Operation(OperationType.PUT, key2, val2),
-                        new Operation(OperationType.PUT, key3, val3)
-                ),
-                List.of(new Operation(OperationType.NO_OP, null, null))
-        );
-
-        assertTrue(branch);
-
-        // +1 for revision, +2 for update counter.
-        assertEquals(2, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        Entry e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertEquals(2, e2.revision());
-        assertEquals(2, e2.updateCounter());
-        assertArrayEquals(key2, e2.key());
-        assertArrayEquals(val2, e2.value());
-
-        Entry e3 = storage.get(key3);
-
-        assertFalse(e3.empty());
-        assertFalse(e3.tombstone());
-        assertEquals(2, e3.revision());
-        assertEquals(3, e3.updateCounter());
-        assertArrayEquals(key3, e3.key());
-        assertArrayEquals(val3, e3.value());
-
-        // Remove.
-        branch = storage.invoke(
-                new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
-                List.of(
-                        new Operation(OperationType.REMOVE, key2, null),
-                        new Operation(OperationType.REMOVE, key3, null)
-                ),
-                List.of(new Operation(OperationType.NO_OP, null, null))
-        );
-
-        assertTrue(branch);
-
-        // +1 for revision, +2 for update counter.
-        assertEquals(3, storage.revision());
-        assertEquals(5, storage.updateCounter());
-
-        e2 = storage.get(key2);
-
-        assertFalse(e2.empty());
-        assertTrue(e2.tombstone());
-        assertEquals(3, e2.revision());
-        assertEquals(4, e2.updateCounter());
-        assertArrayEquals(key2, e2.key());
-
-        e3 = storage.get(key3);
-
-        assertFalse(e3.empty());
-        assertTrue(e3.tombstone());
-        assertEquals(3, e3.revision());
-        assertEquals(5, e3.updateCounter());
-        assertArrayEquals(key3, e3.key());
-    }
-
-    @Test
-    public void compact() {
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        // Compact empty.
-        storage.compact();
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        // Compact non-empty.
-        fill(storage, 1, 1);
-
-        assertEquals(1, storage.revision());
-        assertEquals(1, storage.updateCounter());
-
-        fill(storage, 2, 2);
-
-        assertEquals(3, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        fill(storage, 3, 3);
-
-        assertEquals(6, storage.revision());
-        assertEquals(6, storage.updateCounter());
-
-        storage.getAndRemove(k(3));
-
-        assertEquals(7, storage.revision());
-        assertEquals(7, storage.updateCounter());
-        assertTrue(storage.get(k(3)).tombstone());
-
-        storage.compact();
-
-        assertEquals(7, storage.revision());
-        assertEquals(7, storage.updateCounter());
-
-        Entry e1 = storage.get(k(1));
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertArrayEquals(k(1), e1.key());
-        assertArrayEquals(kv(1,1), e1.value());
-        assertEquals(1, e1.revision());
-        assertEquals(1, e1.updateCounter());
-
-        Entry e2 = storage.get(k(2));
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertArrayEquals(k(2), e2.key());
-        assertArrayEquals(kv(2,2), e2.value());
-        assertTrue(storage.get(k(2), 2).empty());
-        assertEquals(3, e2.revision());
-        assertEquals(3, e2.updateCounter());
-
-        Entry e3 = storage.get(k(3));
-
-        assertTrue(e3.empty());
-        assertTrue(storage.get(k(3), 5).empty());
-        assertTrue(storage.get(k(3), 6).empty());
-        assertTrue(storage.get(k(3), 7).empty());
-    }
-
-    @Test
-    public void rangeCursor() {
-        byte[] key1 = k(1);
-        byte[] val1 = kv(1, 1);
-
-        byte[] key2 = k(2);
-        byte[] val2 = kv(2, 2);
-
-        byte[] key3 = k(3);
-        byte[] val3 = kv(3, 3);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        storage.putAll(List.of(key1, key2, key3), List.of(val1, val2, val3));
-
-        assertEquals(1, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        // Range for latest revision without max bound.
-        Cursor<Entry> cur = storage.range(key1, null);
-
-        Iterator<Entry> it = cur.iterator();
-
-        assertTrue(it.hasNext());
-
-        Entry e1 = it.next();
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertArrayEquals(key1, e1.key());
-        assertArrayEquals(val1, e1.value());
-        assertEquals(1, e1.revision());
-        assertEquals(1, e1.updateCounter());
-
-        assertTrue(it.hasNext());
-
-        Entry e2 = it.next();
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertArrayEquals(key2, e2.key());
-        assertArrayEquals(val2, e2.value());
-        assertEquals(1, e2.revision());
-        assertEquals(2, e2.updateCounter());
-
-        // Deliberately don't call it.hasNext()
-
-        Entry e3 = it.next();
-
-        assertFalse(e3.empty());
-        assertFalse(e3.tombstone());
-        assertArrayEquals(key3, e3.key());
-        assertArrayEquals(val3, e3.value());
-        assertEquals(1, e3.revision());
-        assertEquals(3, e3.updateCounter());
-
-        assertFalse(it.hasNext());
-
-        try {
-            it.next();
-
-            fail();
-        }
-        catch (NoSuchElementException e) {
-            System.out.println();
-            // No-op.
-        }
-
-        // Range for latest revision with max bound.
-        cur = storage.range(key1, key3);
-
-        it = cur.iterator();
-
-        assertTrue(it.hasNext());
-
-        e1 = it.next();
-
-        assertFalse(e1.empty());
-        assertFalse(e1.tombstone());
-        assertArrayEquals(key1, e1.key());
-        assertArrayEquals(val1, e1.value());
-        assertEquals(1, e1.revision());
-        assertEquals(1, e1.updateCounter());
-
-        assertTrue(it.hasNext());
-
-        e2 = it.next();
-
-        assertFalse(e2.empty());
-        assertFalse(e2.tombstone());
-        assertArrayEquals(key2, e2.key());
-        assertArrayEquals(val2, e2.value());
-        assertEquals(1, e2.revision());
-        assertEquals(2, e2.updateCounter());
-
-        assertFalse(it.hasNext());
-
-        try {
-            it.next();
-
-            fail();
-        }
-        catch (NoSuchElementException e) {
-            System.out.println();
-            // No-op.
-        }
-    }
-
-    @Test
-    public void watchCursorForRange() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-
-        byte[] key2 = k(2);
-        byte[] val2_1 = kv(2, 21);
-        byte[] val2_2 = kv(2, 22);
-
-        byte[] key3 = k(3);
-        byte[] val3_1 = kv(3, 31);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        // Watch for all updates starting from revision 2.
-        Cursor<WatchEvent> cur = storage.watch(key1, null, 2);
-
-        Iterator<WatchEvent> it = cur.iterator();
-
-        assertFalse(it.hasNext());
-        assertNull(it.next());
-
-        storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
-
-        assertEquals(1, storage.revision());
-        assertEquals(2, storage.updateCounter());
-
-        // Revision is less than 2.
-        assertFalse(it.hasNext());
-        assertNull(it.next());
-
-        storage.putAll(List.of(key2, key3), List.of(val2_2, val3_1));
-
-        assertEquals(2, storage.revision());
-        assertEquals(4, storage.updateCounter());
-
-        // Revision is 2.
-        assertTrue(it.hasNext());
-
-        WatchEvent watchEvent = it.next();
-
-        assertFalse(watchEvent.single());
-
-        Map<ByteArray, EntryEvent> map = watchEvent.entryEvents().stream()
-                .collect(Collectors.toMap(evt -> new ByteArray(evt.entry().key()), identity()));
-
-        assertEquals(2, map.size());
-
-        // First update under revision.
-        EntryEvent e2 = map.get(new ByteArray(key2));
-
-        assertNotNull(e2);
-
-        Entry oldEntry2 = e2.oldEntry();
-
-        assertFalse(oldEntry2.empty());
-        assertFalse(oldEntry2.tombstone());
-        assertEquals(1, oldEntry2.revision());
-        assertEquals(2, oldEntry2.updateCounter());
-        assertArrayEquals(key2, oldEntry2.key());
-        assertArrayEquals(val2_1, oldEntry2.value());
-
-        Entry newEntry2 = e2.entry();
-
-        assertFalse(newEntry2.empty());
-        assertFalse(newEntry2.tombstone());
-        assertEquals(2, newEntry2.revision());
-        assertEquals(3, newEntry2.updateCounter());
-        assertArrayEquals(key2, newEntry2.key());
-        assertArrayEquals(val2_2, newEntry2.value());
-
-        // Second update under revision.
-        EntryEvent e3 = map.get(new ByteArray(key3));
-
-        assertNotNull(e3);
-
-        Entry oldEntry3 = e3.oldEntry();
-
-        assertTrue(oldEntry3.empty());
-        assertFalse(oldEntry3.tombstone());
-        assertArrayEquals(key3, oldEntry3.key());
-
-        Entry newEntry3 = e3.entry();
-
-        assertFalse(newEntry3.empty());
-        assertFalse(newEntry3.tombstone());
-        assertEquals(2, newEntry3.revision());
-        assertEquals(4, newEntry3.updateCounter());
-        assertArrayEquals(key3, newEntry3.key());
-        assertArrayEquals(val3_1, newEntry3.value());
-
-        assertFalse(it.hasNext());
-
-        storage.remove(key1);
-
-        assertTrue(it.hasNext());
-
-        watchEvent = it.next();
-
-        assertTrue(watchEvent.single());
-
-        EntryEvent e1 = watchEvent.entryEvent();
-
-        Entry oldEntry1 = e1.oldEntry();
-
-        assertFalse(oldEntry1.empty());
-        assertFalse(oldEntry1.tombstone());
-        assertEquals(1, oldEntry1.revision());
-        assertEquals(1, oldEntry1.updateCounter());
-        assertArrayEquals(key1, oldEntry1.key());
-        assertArrayEquals(val1_1, oldEntry1.value());
-
-        Entry newEntry1 = e1.entry();
-
-        assertFalse(newEntry1.empty());
-        assertTrue(newEntry1.tombstone());
-        assertEquals(3, newEntry1.revision());
-        assertEquals(5, newEntry1.updateCounter());
-        assertArrayEquals(key1, newEntry1.key());
-        assertNull(newEntry1.value());
-
-        assertFalse(it.hasNext());
-    }
-
-    @Test
-    public void watchCursorForKey() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-        byte[] val1_2 = kv(1, 12);
-
-        byte[] key2 = k(2);
-        byte[] val2_1 = kv(2, 21);
-        byte[] val2_2 = kv(2, 22);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        Cursor<WatchEvent> cur = storage.watch(key1, 1);
-
-        Iterator<WatchEvent> it = cur.iterator();
-
-        assertFalse(it.hasNext());
-        assertNull(it.next());
-
-        storage.putAll(List.of(key1, key2), List.of(val1_1, val2_1));
-
-        assertEquals(1, storage.revision());
-        assertEquals(2, storage.updateCounter());
-
-        assertTrue(it.hasNext());
-
-        WatchEvent watchEvent = it.next();
-
-        assertTrue(watchEvent.single());
-
-        EntryEvent e1 = watchEvent.entryEvent();
-
-        Entry oldEntry1 = e1.oldEntry();
-
-        assertTrue(oldEntry1.empty());
-        assertFalse(oldEntry1.tombstone());
-
-        Entry newEntry1 = e1.entry();
-
-        assertFalse(newEntry1.empty());
-        assertFalse(newEntry1.tombstone());
-        assertEquals(1, newEntry1.revision());
-        assertEquals(1, newEntry1.updateCounter());
-        assertArrayEquals(key1, newEntry1.key());
-        assertArrayEquals(val1_1, newEntry1.value());
-
-        assertFalse(it.hasNext());
-
-        storage.put(key2, val2_2);
-
-        assertFalse(it.hasNext());
-
-        storage.put(key1, val1_2);
-
-        assertTrue(it.hasNext());
-
-        watchEvent = it.next();
-
-        assertTrue(watchEvent.single());
-
-        e1 = watchEvent.entryEvent();
-
-        oldEntry1 = e1.oldEntry();
-
-        assertFalse(oldEntry1.empty());
-        assertFalse(oldEntry1.tombstone());
-        assertEquals(1, oldEntry1.revision());
-        assertEquals(1, oldEntry1.updateCounter());
-        assertArrayEquals(key1, newEntry1.key());
-        assertArrayEquals(val1_1, newEntry1.value());
-
-         newEntry1 = e1.entry();
-
-        assertFalse(newEntry1.empty());
-        assertFalse(newEntry1.tombstone());
-        assertEquals(3, newEntry1.revision());
-        assertEquals(4, newEntry1.updateCounter());
-        assertArrayEquals(key1, newEntry1.key());
-        assertArrayEquals(val1_2, newEntry1.value());
-
-        assertFalse(it.hasNext());
-    }
-
-    @Test
-    public void watchCursorForKeys() {
-        byte[] key1 = k(1);
-        byte[] val1_1 = kv(1, 11);
-
-        byte[] key2 = k(2);
-        byte[] val2_1 = kv(2, 21);
-        byte[] val2_2 = kv(2, 22);
-
-        byte[] key3 = k(3);
-        byte[] val3_1 = kv(3, 31);
-        byte[] val3_2 = kv(3, 32);
-
-        assertEquals(0, storage.revision());
-        assertEquals(0, storage.updateCounter());
-
-        Cursor<WatchEvent> cur = storage.watch(List.of(key1, key2), 1);
-
-        Iterator<WatchEvent> it = cur.iterator();
-
-        assertFalse(it.hasNext());
-        assertNull(it.next());
-
-        storage.putAll(List.of(key1, key2, key3), List.of(val1_1, val2_1, val3_1));
-
-        assertEquals(1, storage.revision());
-        assertEquals(3, storage.updateCounter());
-
-        assertTrue(it.hasNext());
-
-        WatchEvent watchEvent = it.next();
-
-        assertFalse(watchEvent.single());
-
-        assertFalse(it.hasNext());
-
-        storage.put(key2, val2_2);
-
-        assertTrue(it.hasNext());
-
-        watchEvent = it.next();
-
-        assertTrue(watchEvent.single());
-
-        assertFalse(it.hasNext());
-
-        storage.put(key3, val3_2);
-
-        assertFalse(it.hasNext());
-    }
-
-    /** */
-    private static void fill(KeyValueStorage storage, int keySuffix, int num) {
-        for (int i = 0; i < num; i++)
-            storage.getAndPut(k(keySuffix), kv(keySuffix, i + 1));
-    }
-
-    /** */
-    private static byte[] k(int k) {
-        return ("key" + k).getBytes();
-    }
-
-    /** */
-    private static byte[] kv(int k, int v) {
-        return ("key" + k + '_' + "val" + v).getBytes();
+class SimpleInMemoryKeyValueStorageTest extends AbstractKeyValueStorageTest {
+    /** {@inheritDoc} */
+    @Override KeyValueStorage storage() {
+        return new SimpleInMemoryKeyValueStorage();
     }
 }
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
index 9a09282..e186fdf 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.client.service;
 
+import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.function.Consumer;
 import org.apache.ignite.raft.client.ReadCommand;
@@ -56,7 +57,7 @@ public interface RaftGroupListener {
      * @param doneClo The closure to call on finish. Pass the not null exception if the snapshot has not been created or
      *                null on successful creation.
      */
-    public void onSnapshotSave(String path, Consumer<Throwable> doneClo);
+    public void onSnapshotSave(Path path, Consumer<Throwable> doneClo);
 
     /**
      * The callback to load a snapshot.
@@ -64,5 +65,10 @@ public interface RaftGroupListener {
      * @param path Snapshot directory.
      * @return {@code True} if the snapshot was loaded successfully.
      */
-    boolean onSnapshotLoad(String path);
+    boolean onSnapshotLoad(Path path);
+
+    /**
+     * Invoked once after a raft node has been shut down.
+     */
+    void onShutdown();
 }
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
index c692a66..c0a856e 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/CounterListener.java
@@ -19,6 +19,7 @@ package org.apache.ignite.raft.server;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Iterator;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
@@ -73,7 +74,7 @@ public class CounterListener implements RaftGroupListener {
     }
 
     /** {@inheritDoc} */
-    @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
+    @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
         final long currVal = this.counter.get();
 
         Utils.runInThread(executor, () -> {
@@ -90,8 +91,9 @@ public class CounterListener implements RaftGroupListener {
         });
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean onSnapshotLoad(String path) {
+    /** {@inheritDoc}
+     * @param path Snapshot directory. */
+    @Override public boolean onSnapshotLoad(Path path) {
         final CounterSnapshotFile snapshot = new CounterSnapshotFile(path + File.separator + "data");
         try {
             this.counter.set(snapshot.load());
@@ -103,6 +105,11 @@ public class CounterListener implements RaftGroupListener {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public void onShutdown() {
+        // No-op.
+    }
+
     /**
      * @return Current value.
      */
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
index c42fb37..a1c719a 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITJRaftCounterServerTest.java
@@ -311,7 +311,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
     @Test
     public void testCreateSnapshotGracefulFailure() throws Exception {
         listenerFactory = () -> new CounterListener() {
-            @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
+            @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
                 doneClo.accept(new IgniteInternalException("Very bad"));
             }
         };
@@ -345,7 +345,7 @@ class ITJRaftCounterServerTest extends RaftServerAbstractTest {
     @Test
     public void testCreateSnapshotAbnormalFailure() throws Exception {
         listenerFactory = () -> new CounterListener() {
-            @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
+            @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
                 doneClo.accept(new IgniteInternalException("Very bad"));
             }
         };
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
index 2dfc7a4..5d46122 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JRaftServerImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.server.impl;
 import java.io.File;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -278,7 +279,7 @@ public class JRaftServerImpl implements RaftServer {
         /** {@inheritDoc} */
         @Override public void onSnapshotSave(SnapshotWriter writer, Closure done) {
             try {
-                listener.onSnapshotSave(writer.getPath(), res -> {
+                listener.onSnapshotSave(Path.of(writer.getPath()), res -> {
                     if (res == null) {
                         File file = new File(writer.getPath());
 
@@ -302,7 +303,12 @@ public class JRaftServerImpl implements RaftServer {
 
         /** {@inheritDoc} */
         @Override public boolean onSnapshotLoad(SnapshotReader reader) {
-            return listener.onSnapshotLoad(reader.getPath());
+            return listener.onSnapshotLoad(Path.of(reader.getPath()));
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onShutdown() {
+            listener.onShutdown();
         }
     }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 34141e8..4f34d0d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.raft;
 
+import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Objects;
@@ -268,16 +269,21 @@ public class PartitionListener implements RaftGroupListener {
     }
 
     /** {@inheritDoc} */
-    @Override public void onSnapshotSave(String path, Consumer<Throwable> doneClo) {
+    @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
         // Not implemented yet.
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onSnapshotLoad(String path) {
+    @Override public boolean onSnapshotLoad(Path path) {
         // Not implemented yet.
         return false;
     }
 
+    /** {@inheritDoc} */
+    @Override public void onShutdown() {
+        // No-op.
+    }
+
     /**
      * Wrapper provides correct byte[] comparison.
      */