You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/08/10 10:07:10 UTC

[ignite-3] branch main updated: IGNITE-14790 Persistent partition storage based on RocksDB. (#254)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 68d80ef  IGNITE-14790 Persistent partition storage based on RocksDB. (#254)
68d80ef is described below

commit 68d80efcd133d7ab3da71a586977304956df0856
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue Aug 10 13:07:03 2021 +0300

    IGNITE-14790 Persistent partition storage based on RocksDB. (#254)
---
 .../apache/ignite/internal/app/IgnitionImpl.java   |  28 +-
 .../apache/ignite/internal/schema/BinaryRow.java   |   5 +
 .../ignite/internal/schema/ByteBufferRow.java      |   5 +
 .../org/apache/ignite/internal/schema/row/Row.java |   4 +
 .../apache/ignite/internal/storage/DataRow.java    |   5 +
 .../ignite/internal/storage/InvokeClosure.java     |  25 +-
 .../apache/ignite/internal/storage/SearchRow.java  |   6 +-
 .../apache/ignite/internal/storage/Storage.java    |  63 ++-
 .../storage/basic/ConcurrentHashMapStorage.java    | 112 ++--
 ...eClosure.java => DeleteExactInvokeClosure.java} |  50 +-
 ...Closure.java => GetAndRemoveInvokeClosure.java} |  46 +-
 .../storage/basic/GetAndReplaceInvokeClosure.java  |  86 ++++
 ...InvokeClosure.java => InsertInvokeClosure.java} |  36 +-
 ...Closure.java => ReplaceExactInvokeClosure.java} |  44 +-
 .../internal/storage/basic/SimpleDataRow.java      |  25 +-
 .../storage/basic/SimpleReadInvokeClosure.java     |   7 +-
 .../storage/basic/SimpleRemoveInvokeClosure.java   |   5 +-
 .../storage/basic/SimpleWriteInvokeClosure.java    |   5 +-
 .../internal/storage/AbstractStorageTest.java      | 567 +++++++++++++++++++--
 .../internal/storage/rocksdb/RocksDbStorage.java   | 175 ++++++-
 modules/table/pom.xml                              |  10 +
 .../ignite/distributed/ITDistributedTableTest.java |  14 +-
 .../internal/table/distributed/TableManager.java   |  32 +-
 .../table/distributed/command/ReplaceCommand.java  |   2 +
 .../table/distributed/raft/PartitionListener.java  | 279 +++++-----
 .../ignite/internal/table/TableManagerTest.java    |  12 +-
 .../raft/PartitionCommandListenerTest.java         |  31 +-
 27 files changed, 1333 insertions(+), 346 deletions(-)

diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index bb633e0..fc559c8 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -81,6 +81,11 @@ public class IgnitionImpl implements Ignition {
      */
     private static final Path VAULT_DB_PATH = Paths.get("vault");
 
+    /**
+     * Path for the partitions persistent storage.
+     */
+    private static final Path PARTITIONS_STORE_PATH = Paths.get("db");
+
     /** */
     private static final String[] BANNER = {
         "",
@@ -324,7 +329,8 @@ public class IgnitionImpl implements Ignition {
                     metaStorageMgr,
                     schemaMgr,
                     affinityMgr,
-                    raftMgr
+                    raftMgr,
+                    getPartitionsStorePath(workDir)
                 )
             );
 
@@ -376,6 +382,26 @@ public class IgnitionImpl implements Ignition {
     }
 
     /**
+     * Returns a path to the partitions store directory.
+     * Creates a directory if it doesn't exist.
+     *
+     * @param workDir Ignite work directory.
+     * @return Partitions store path.
+     */
+    @NotNull
+    private static Path getPartitionsStorePath(Path workDir) {
+        Path partitionsStore = workDir.resolve(PARTITIONS_STORE_PATH);
+
+        try {
+            Files.createDirectories(partitionsStore);
+        } catch (IOException e) {
+            throw new IgniteInternalException("Failed to create directory for partitions storage: " + e.getMessage(), e);
+        }
+
+        return partitionsStore;
+    }
+
+    /**
      * Starts the Vault component.
      */
     private static VaultManager createVault(Path workDir) {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index ba20a16..df32145 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -133,6 +133,11 @@ public interface BinaryRow {
      byte[] readBytes(int off, int len);
 
     /**
+     * @return Byte array of the row.
+     */
+     byte[] bytes();
+
+    /**
      * Row flags.
      */
     final class RowFlags {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 6041aa3..a2edc42 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -154,4 +154,9 @@ public class ByteBufferRow implements BinaryRow {
             buf.limit(buf.capacity());
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public byte[] bytes() {
+        return buf.array();
+    }
 }
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index ea802e5..5aa4e33 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -665,4 +665,8 @@ public class Row implements BinaryRow {
     @Override public byte[] readBytes(int off, int len) {
         return row.readBytes(off, len);
     }
+
+    @Override public byte[] bytes() {
+        return row.bytes();
+    }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
index 4373f98..34b1618 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/DataRow.java
@@ -33,4 +33,9 @@ public interface DataRow extends SearchRow {
      * @return Value object as a byte buffer. Allows more effective memory management in certain cases.
      */
     ByteBuffer value();
+
+    /**
+     * @return {@code true} if this row has a value.
+     */
+    boolean hasValueBytes();
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
index 7126069..67dd309 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/InvokeClosure.java
@@ -17,14 +17,31 @@
 
 package org.apache.ignite.internal.storage;
 
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-/** */
-public interface InvokeClosure {
+/**
+ * Closure that performs an operation on the storage.
+ *
+ * @param <T> Type of the invocation's result.
+ */
+public interface InvokeClosure<T> {
+    /**
+     * Decides which type of operation should be performed on the storage, based on the row currently held
+     * by the storage, which is passed as an argument.
+     * The result of the operation can be obtained via the {@link #result()} method.
+     *
+     * @param row Old row; if the key is absent, a row for which {@link DataRow#hasValueBytes()} is {@code false}.
+     */
+    void call(@NotNull DataRow row);
+
     /**
-     * @param row Old row or {@code null} if the old row has not been found.
+     * @return Result of the invocation. Can be {@code null}.
      */
-    void call(@Nullable DataRow row);
+    @Nullable
+    default T result() {
+        return null;
+    }
 
     /**
      * @return New row for the {@link OperationType#WRITE} operation.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
index 74db8d9..ce3ceff 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/SearchRow.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.storage;
 
 import java.nio.ByteBuffer;
-import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Interface to be used as a key representation to search data in storage.
@@ -27,10 +27,10 @@ public interface SearchRow {
     /**
      * @return Key bytes.
      */
-    byte @Nullable [] keyBytes();
+    byte @NotNull [] keyBytes();
 
     /**
      * @return Key object as a byte buffer. Allows more effective memory management in certain cases.
      */
-    @Nullable ByteBuffer key();
+    @NotNull ByteBuffer key();
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
index 9be5a27..0778e74 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/Storage.java
@@ -17,46 +17,97 @@
 
 package org.apache.ignite.internal.storage;
 
+import java.util.Collection;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Interface providing methods to read, remove and update keys in storage.
+ * No locking is necessary because this storage is used within RAFT groups, where all write operations are
+ * serialized.
  */
-public interface Storage {
+public interface Storage extends AutoCloseable {
     /**
      * Reads a DataRow for a given key.
      *
      * @param key Search row.
      * @return Data row.
-     * @throws StorageException If failed to read data or storage is already stopped.
+     * @throws StorageException If failed to read the data or the storage is already stopped.
      */
     public DataRow read(SearchRow key) throws StorageException;
 
     /**
-     * Writes a DataRow to the storage.
+     * Reads {@link DataRow}s for a given collection of keys.
+     *
+     * @param keys Search rows.
+     * @return Data rows.
+     * @throws StorageException If failed to read the data or the storage is already stopped.
+     */
+    public Collection<DataRow> readAll(Collection<? extends SearchRow> keys);
+
+    /**
+     * Writes a DataRow into the storage.
      *
      * @param row Data row.
-     * @throws StorageException If failed to read data or storage is already stopped.
+     * @throws StorageException If failed to write the data or the storage is already stopped.
      */
     public void write(DataRow row) throws StorageException;
 
     /**
+     * Writes a collection of {@link DataRow}s into the storage.
+     *
+     * @param rows Data rows.
+     * @throws StorageException If failed to write the data or the storage is already stopped.
+     */
+    public void writeAll(Collection<? extends DataRow> rows) throws StorageException;
+
+    /**
+     * Inserts a collection of {@link DataRow}s into the storage and returns a collection of rows that
+     * can't be inserted due to their keys being already present in the storage.
+     *
+     * @param rows Data rows.
+     * @return Collection of rows that could not be inserted.
+     * @throws StorageException If failed to write the data or the storage is already stopped.
+     */
+    public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException;
+
+    /**
      * Removes a DataRow associated with a given Key.
      *
      * @param key Search row.
-     * @throws StorageException If failed to read data or storage is already stopped.
+     * @throws StorageException If failed to remove the data or the storage is already stopped.
      */
     public void remove(SearchRow key) throws StorageException;
 
     /**
+     * Removes {@link DataRow}s mapped by given keys.
+     *
+     * @param keys Search rows.
+     * @return Collection of removed data rows.
+     * @throws StorageException If failed to remove the data or the storage is already stopped.
+     */
+    public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys);
+
+    /**
+     * Removes {@link DataRow}s mapped by given keys and containing given values.
+     *
+     * @param keyValues Data rows.
+     * @return Collection of removed data rows.
+     * @throws StorageException If failed to remove the data or the storage is already stopped.
+     */
+    public Collection<DataRow> removeAllExact(Collection<? extends DataRow> keyValues);
+
+    /**
      * Executes an update with custom logic implemented by storage.UpdateClosure interface.
      *
      * @param key Search key.
      * @param clo Invoke closure.
+     * @param <T> Closure invocation's result type.
      * @throws StorageException If failed to read data or storage is already stopped.
      */
-    public void invoke(SearchRow key, InvokeClosure clo) throws StorageException;
+    @Nullable
+    public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException;
 
     /**
      * Creates cursor over the storage data.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
index e462653..9b8d820 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ConcurrentHashMapStorage.java
@@ -17,12 +17,14 @@
 
 package org.apache.ignite.internal.storage.basic;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
 import org.apache.ignite.internal.storage.SearchRow;
@@ -31,6 +33,7 @@ import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Storage implementation based on {@link ConcurrentHashMap}.
@@ -39,9 +42,6 @@ public class ConcurrentHashMapStorage implements Storage {
     /** Storage content. */
     private final ConcurrentMap<ByteArray, byte[]> map = new ConcurrentHashMap<>();
 
-    /** RW lock. */
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
     /** {@inheritDoc} */
     @Override public DataRow read(SearchRow key) throws StorageException {
         byte[] keyBytes = key.keyBytes();
@@ -52,60 +52,91 @@ public class ConcurrentHashMapStorage implements Storage {
     }
 
     /** {@inheritDoc} */
+    @Override public Collection<DataRow> readAll(Collection<? extends SearchRow> keys) {
+        return keys.stream()
+            .map(SearchRow::keyBytes)
+            .map(key -> new SimpleDataRow(key, map.get(new ByteArray(key))))
+            .collect(Collectors.toList());
+    }
+
+    /** {@inheritDoc} */
     @Override public void write(DataRow row) throws StorageException {
-        rwLock.readLock().lock();
+        map.put(new ByteArray(row.keyBytes()), row.valueBytes());
+    }
 
-        try {
-            map.put(new ByteArray(row.keyBytes()), row.valueBytes());
-        }
-        finally {
-            rwLock.readLock().unlock();
-        }
+    /** {@inheritDoc} */
+    @Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
+        rows.forEach(row -> map.put(new ByteArray(row.keyBytes()), row.valueBytes()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException {
+        return rows.stream()
+            .map(row -> map.putIfAbsent(new ByteArray(row.keyBytes()), row.valueBytes()) == null ? null : row)
+            .filter(Objects::nonNull)
+            .collect(Collectors.toList());
     }
 
     /** {@inheritDoc} */
     @Override public void remove(SearchRow key) throws StorageException {
-        rwLock.readLock().lock();
+        map.remove(new ByteArray(key.keyBytes()));
+    }
 
-        try {
-            map.remove(new ByteArray(key.keyBytes()));
-        }
-        finally {
-            rwLock.readLock().unlock();
-        }
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys) {
+        return keys.stream()
+            .map(SearchRow::keyBytes)
+            .map(key -> new SimpleDataRow(key, map.remove(new ByteArray(key))))
+            .filter(SimpleDataRow::hasValueBytes)
+            .collect(Collectors.toList());
     }
 
     /** {@inheritDoc} */
-    @Override public void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
+    @Override public Collection<DataRow> removeAllExact(Collection<? extends DataRow> keyValues) {
+        return keyValues.stream()
+            .filter(kv -> {
+                ByteArray key = new ByteArray(kv.keyBytes());
+
+                byte[] currentValue = map.get(key);
+
+                if (Arrays.equals(currentValue, kv.valueBytes())) {
+                    map.remove(key);
+
+                    return true;
+                }
+
+                return false;
+            })
+            .collect(Collectors.toList());
+    }
+
+    /** {@inheritDoc} */
+    @Nullable
+    @Override public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
         byte[] keyBytes = key.keyBytes();
 
         ByteArray mapKey = new ByteArray(keyBytes);
 
-        rwLock.writeLock().lock();
+        byte[] existingDataBytes = map.get(mapKey);
 
-        try {
-            byte[] existingDataBytes = map.get(mapKey);
+        clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
 
-            clo.call(new SimpleDataRow(keyBytes, existingDataBytes));
+        switch (clo.operationType()) {
+            case WRITE:
+                map.put(mapKey, clo.newRow().valueBytes());
 
-            switch (clo.operationType()) {
-                case WRITE:
-                    map.put(mapKey, clo.newRow().valueBytes());
+                break;
 
-                    break;
+            case REMOVE:
+                map.remove(mapKey);
 
-                case REMOVE:
-                    map.remove(mapKey);
+                break;
 
-                    break;
-
-                case NOOP:
-                    break;
-            }
-        }
-        finally {
-            rwLock.writeLock().unlock();
+            case NOOP:
+                break;
         }
+
+        return clo.result();
     }
 
     /** {@inheritDoc} */
@@ -137,4 +168,9 @@ public class ConcurrentHashMapStorage implements Storage {
             }
         };
     }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        // No-op.
+    }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/DeleteExactInvokeClosure.java
similarity index 55%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/DeleteExactInvokeClosure.java
index 71f3b79..97e73d4 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/DeleteExactInvokeClosure.java
@@ -17,41 +17,51 @@
 
 package org.apache.ignite.internal.storage.basic;
 
+import java.util.Arrays;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
 import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-/** Invoke closure implementation for read operation. */
-public class SimpleReadInvokeClosure implements InvokeClosure {
-    /** Copy of the row that was passed to {@link #call(DataRow)} method. */
-    @Nullable
-    private DataRow row;
+/**
+ * Closure that deletes a specific data row with a given key and a given value.
+ */
+public class DeleteExactInvokeClosure implements InvokeClosure<Boolean> {
+    /** Row to delete. */
+    @NotNull
+    private final DataRow row;
+
+    /** {@code true} if the stored row matches and will be removed, {@code false} otherwise. */
+    private boolean deletes = false;
+
+    /**
+     * Constructor.
+     *
+     * @param row Row to delete.
+     */
+    public DeleteExactInvokeClosure(@NotNull DataRow row) {
+        this.row = row;
+    }
 
     /** {@inheritDoc} */
-    @Override public void call(@Nullable DataRow row) {
-        this.row = row == null ? null : new SimpleDataRow(row.keyBytes(), row.valueBytes());
+    @Override public void call(@NotNull DataRow row) {
+        deletes = row.hasValueBytes() && Arrays.equals(this.row.valueBytes(), row.valueBytes());
     }
 
     /** {@inheritDoc} */
-    @Nullable
-    @Override public DataRow newRow() {
+    @Override public @Nullable DataRow newRow() {
         return null;
     }
 
     /** {@inheritDoc} */
-    @Nullable
-    @Override public OperationType operationType() {
-        return OperationType.NOOP;
+    @Override public @Nullable OperationType operationType() {
+        return deletes ? OperationType.REMOVE : OperationType.NOOP;
     }
 
-    /**
-     * Copy of the row that was passed to {@link #call(DataRow)} method.
-     *
-     * @return Copy of data row.
-     */
-    @Nullable
-    public DataRow row() {
-        return row;
+    /** {@inheritDoc} */
+    @NotNull
+    @Override public Boolean result() {
+        return deletes;
     }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndRemoveInvokeClosure.java
similarity index 56%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndRemoveInvokeClosure.java
index 71f3b79..b40e9d7 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndRemoveInvokeClosure.java
@@ -20,38 +20,50 @@ package org.apache.ignite.internal.storage.basic;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
 import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-/** Invoke closure implementation for read operation. */
-public class SimpleReadInvokeClosure implements InvokeClosure {
-    /** Copy of the row that was passed to {@link #call(DataRow)} method. */
+/**
+ * Closure that removes a data row with a given key and exposes the removed row via {@link #oldRow()}.
+ */
+public class GetAndRemoveInvokeClosure implements InvokeClosure<Boolean> {
+    /** Row that will be removed. */
     @Nullable
-    private DataRow row;
+    private DataRow rowToRemove;
+
+    /** {@code true} if the row has a value and will be removed, {@code false} otherwise. */
+    private boolean deletes;
 
     /** {@inheritDoc} */
-    @Override public void call(@Nullable DataRow row) {
-        this.row = row == null ? null : new SimpleDataRow(row.keyBytes(), row.valueBytes());
+    @Override public void call(@NotNull DataRow row) {
+        this.rowToRemove = row;
+
+        this.deletes = rowToRemove.hasValueBytes();
     }
 
     /** {@inheritDoc} */
-    @Nullable
-    @Override public DataRow newRow() {
+    @Override public @Nullable DataRow newRow() {
         return null;
     }
 
     /** {@inheritDoc} */
-    @Nullable
-    @Override public OperationType operationType() {
-        return OperationType.NOOP;
+    @Override public @Nullable OperationType operationType() {
+        return deletes ? OperationType.REMOVE : OperationType.NOOP;
     }
 
     /**
-     * Copy of the row that was passed to {@link #call(DataRow)} method.
-     *
-     * @return Copy of data row.
+     * @return Row that is removed.
      */
-    @Nullable
-    public DataRow row() {
-        return row;
+    @NotNull
+    public DataRow oldRow() {
+        assert rowToRemove != null;
+
+        return rowToRemove;
+    }
+
+    /** {@inheritDoc} */
+    @NotNull
+    @Override public Boolean result() {
+        return deletes;
     }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndReplaceInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndReplaceInvokeClosure.java
new file mode 100644
index 0000000..e3b18d3
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/GetAndReplaceInvokeClosure.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.basic;
+
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.InvokeClosure;
+import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Closure that replaces a data row with a given key and exposes the previous row via {@link #oldRow()}.
+ */
+public class GetAndReplaceInvokeClosure implements InvokeClosure<Boolean> {
+    /** New row. */
+    @NotNull
+    private final DataRow newRow;
+
+    /** {@code true} if this closure should insert a new row only if a previous value exists. */
+    private final boolean onlyIfExists;
+
+    /** Previous data row. */
+    private DataRow oldRow;
+
+    /** {@code true} if this closure replaces a row, {@code false} otherwise. */
+    private boolean replaces;
+
+    /**
+     * Constructor.
+     *
+     * @param newRow New row.
+     * @param onlyIfExists Whether to insert a new row only if a previous one exists.
+     */
+    public GetAndReplaceInvokeClosure(@NotNull DataRow newRow, boolean onlyIfExists) {
+        this.newRow = newRow;
+        this.onlyIfExists = onlyIfExists;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void call(@NotNull DataRow row) {
+        oldRow = row;
+
+        replaces = row.hasValueBytes() || !onlyIfExists;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable DataRow newRow() {
+        return newRow;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable OperationType operationType() {
+        return replaces ? OperationType.WRITE : OperationType.NOOP;
+    }
+
+    /**
+     * @return Previous data row.
+     */
+    @NotNull
+    public DataRow oldRow() {
+        assert oldRow != null;
+
+        return oldRow;
+    }
+
+    /** {@inheritDoc} */
+    @NotNull
+    @Override public Boolean result() {
+        return replaces;
+    }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/InsertInvokeClosure.java
similarity index 61%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/InsertInvokeClosure.java
index 4749511..ee8dcbd 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/InsertInvokeClosure.java
@@ -20,33 +20,47 @@ package org.apache.ignite.internal.storage.basic;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
 import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-/** Invoke closure implementation for a write operation. */
-public class SimpleWriteInvokeClosure implements InvokeClosure {
-    /** Data row to write into storage. */
+/**
+ * Closure that inserts a new data row.
+ */
+public class InsertInvokeClosure implements InvokeClosure<Boolean> {
+    /** New row. */
+    @NotNull
     private final DataRow newRow;
 
+    /** {@code true} if this closure inserts a new row, {@code false} otherwise. */
+    private boolean inserts = false;
+
     /**
-     * @param newRow Data row to write into the storage.
+     * Constructor.
+     *
+     * @param newRow New row.
      */
-    public SimpleWriteInvokeClosure(DataRow newRow) {
+    public InsertInvokeClosure(@NotNull DataRow newRow) {
         this.newRow = newRow;
     }
 
     /** {@inheritDoc} */
-    @Override public void call(@Nullable DataRow row) {
+    @Override public void call(@NotNull DataRow row) {
+        inserts = !row.hasValueBytes();
     }
 
     /** {@inheritDoc} */
-    @Nullable
-    @Override public DataRow newRow() {
+    @Override public @Nullable DataRow newRow() {
         return newRow;
     }
 
     /** {@inheritDoc} */
-    @Nullable
-    @Override public OperationType operationType() {
-        return OperationType.WRITE;
+    @Override public @Nullable OperationType operationType() {
+        return inserts ? OperationType.WRITE : OperationType.NOOP;
+    }
+
+    /** {@inheritDoc} */
+    @NotNull
+    @Override public Boolean result() {
+        return inserts;
     }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ReplaceExactInvokeClosure.java
similarity index 51%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ReplaceExactInvokeClosure.java
index 4749511..7b5905e 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/ReplaceExactInvokeClosure.java
@@ -17,36 +17,58 @@
 
 package org.apache.ignite.internal.storage.basic;
 
+import java.util.Arrays;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
 import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-/** Invoke closure implementation for a write operation. */
-public class SimpleWriteInvokeClosure implements InvokeClosure {
-    /** Data row to write into storage. */
+/**
+ * Closure that replaces an exact row with a specified key and a specified value.
+ */
+public class ReplaceExactInvokeClosure implements InvokeClosure<Boolean> {
+    /** Expected data row. */
+    @NotNull
+    private final DataRow expectedRow;
+
+    /** New data row. */
+    @NotNull
     private final DataRow newRow;
 
+    /** {@code true} if this closure replaces a row, {@code false} otherwise. */
+    private boolean replaces;
+
     /**
-     * @param newRow Data row to write into the storage.
+     * Constructor.
+     *
+     * @param expectedRow Expected row.
+     * @param newRow New row.
      */
-    public SimpleWriteInvokeClosure(DataRow newRow) {
+    public ReplaceExactInvokeClosure(@NotNull DataRow expectedRow, @NotNull DataRow newRow) {
+        this.expectedRow = expectedRow;
         this.newRow = newRow;
     }
 
     /** {@inheritDoc} */
-    @Override public void call(@Nullable DataRow row) {
+    @Override public void call(@NotNull DataRow row) {
+        replaces = (!row.hasValueBytes() && !expectedRow.hasValueBytes())
+            || (Arrays.equals(row.valueBytes(), expectedRow.valueBytes()));
     }
 
     /** {@inheritDoc} */
-    @Nullable
-    @Override public DataRow newRow() {
+    @Override public @Nullable DataRow newRow() {
         return newRow;
     }
 
     /** {@inheritDoc} */
-    @Nullable
-    @Override public OperationType operationType() {
-        return OperationType.WRITE;
+    @Override public @Nullable OperationType operationType() {
+        return replaces ? OperationType.WRITE : OperationType.NOOP;
+    }
+
+    /** {@inheritDoc} */
+    @NotNull
+    @Override public Boolean result() {
+        return replaces;
     }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
index f90ea1d..bda70b1 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleDataRow.java
@@ -18,7 +18,9 @@
 package org.apache.ignite.internal.storage.basic;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import org.apache.ignite.internal.storage.DataRow;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -37,12 +39,13 @@ public class SimpleDataRow implements DataRow {
     }
 
     /** {@inheritDoc} */
+    @NotNull
     @Override public ByteBuffer key() {
         return ByteBuffer.wrap(key);
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] keyBytes() {
+    @Override public byte @NotNull [] keyBytes() {
         return key;
     }
 
@@ -56,4 +59,24 @@ public class SimpleDataRow implements DataRow {
     @Override public byte @Nullable [] valueBytes() {
         return value;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasValueBytes() {
+        return value != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        SimpleDataRow row = (SimpleDataRow) o;
+        return Arrays.equals(key, row.key) && Arrays.equals(value, row.value);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int result = Arrays.hashCode(key);
+        result = 31 * result + Arrays.hashCode(value);
+        return result;
+    }
 }
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
index 71f3b79..e77cf52 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleReadInvokeClosure.java
@@ -20,17 +20,18 @@ package org.apache.ignite.internal.storage.basic;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
 import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /** Invoke closure implementation for read operation. */
-public class SimpleReadInvokeClosure implements InvokeClosure {
+public class SimpleReadInvokeClosure implements InvokeClosure<Void> {
     /** Copy of the row that was passed to {@link #call(DataRow)} method. */
     @Nullable
     private DataRow row;
 
     /** {@inheritDoc} */
-    @Override public void call(@Nullable DataRow row) {
-        this.row = row == null ? null : new SimpleDataRow(row.keyBytes(), row.valueBytes());
+    @Override public void call(@NotNull DataRow row) {
+        this.row = new SimpleDataRow(row.keyBytes(), row.valueBytes());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
index abb3db8..dc6f80d 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleRemoveInvokeClosure.java
@@ -20,12 +20,13 @@ package org.apache.ignite.internal.storage.basic;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
 import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /** Invoke closure implementation for a remove operation. */
-public class SimpleRemoveInvokeClosure implements InvokeClosure {
+public class SimpleRemoveInvokeClosure implements InvokeClosure<Void> {
     /** {@inheritDoc} */
-    @Override public void call(@Nullable DataRow row) {
+    @Override public void call(@NotNull DataRow row) {
     }
 
     /** {@inheritDoc} */
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
index 4749511..87fa289 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/basic/SimpleWriteInvokeClosure.java
@@ -20,10 +20,11 @@ package org.apache.ignite.internal.storage.basic;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
 import org.apache.ignite.internal.storage.OperationType;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /** Invoke closure implementation for a write operation. */
-public class SimpleWriteInvokeClosure implements InvokeClosure {
+public class SimpleWriteInvokeClosure implements InvokeClosure<Void> {
     /** Data row to write into storage. */
     private final DataRow newRow;
 
@@ -35,7 +36,7 @@ public class SimpleWriteInvokeClosure implements InvokeClosure {
     }
 
     /** {@inheritDoc} */
-    @Override public void call(@Nullable DataRow row) {
+    @Override public void call(@NotNull DataRow row) {
     }
 
     /** {@inheritDoc} */
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
index f225b58..1218d92 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/AbstractStorageTest.java
@@ -18,9 +18,21 @@
 package org.apache.ignite.internal.storage;
 
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure;
+import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.GetAndReplaceInvokeClosure;
+import org.apache.ignite.internal.storage.basic.InsertInvokeClosure;
+import org.apache.ignite.internal.storage.basic.ReplaceExactInvokeClosure;
 import org.apache.ignite.internal.storage.basic.SimpleDataRow;
 import org.apache.ignite.internal.storage.basic.SimpleReadInvokeClosure;
 import org.apache.ignite.internal.storage.basic.SimpleRemoveInvokeClosure;
@@ -34,6 +46,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -41,48 +55,25 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  * Abstract test that covers basic scenarios of the storage API.
  */
 public abstract class AbstractStorageTest {
-    /** Storage instance. */
-    protected Storage storage;
+    /** Test key. */
+    private static final String KEY = "key";
 
-    /**
-     * Wraps string key into a search row.
-     *
-     * @param key String key.
-     * @return Search row.
-     */
-    private SearchRow searchRow(String key) {
-        return new SimpleDataRow(
-            key.getBytes(StandardCharsets.UTF_8),
-            null
-        );
-    }
+    /** Test value. */
+    private static final String VALUE = "value";
 
-    /**
-     * Wraps string key/value pair into a data row.
-     *
-     * @param key String key.
-     * @param value String value.
-     * @return Data row.
-     */
-    private DataRow dataRow(String key, String value) {
-        return new SimpleDataRow(
-            key.getBytes(StandardCharsets.UTF_8),
-            value.getBytes(StandardCharsets.UTF_8)
-        );
-    }
+    /** Storage instance. */
+    protected Storage storage;
 
     /**
      * Tests that read / write / remove work consistently on the same key.
-     *
-     * @throws Exception If failed.
      */
     @Test
-    public void readWriteRemove() throws Exception {
-        SearchRow searchRow = searchRow("key");
+    public void readWriteRemove() {
+        SearchRow searchRow = searchRow(KEY);
 
         assertNull(storage.read(searchRow).value());
 
-        DataRow dataRow = dataRow("key", "value");
+        DataRow dataRow = dataRow(KEY, VALUE);
 
         storage.write(dataRow);
 
@@ -96,12 +87,10 @@ public abstract class AbstractStorageTest {
     /**
      * Tests that invoke method works consistently with default read / write / remove closures implementations on the
      *      same key.
-     *
-     * @throws Exception If failed.
      */
     @Test
-    public void invoke() throws Exception {
-        SearchRow searchRow = searchRow("key");
+    public void invoke() {
+        SearchRow searchRow = searchRow(KEY);
 
         SimpleReadInvokeClosure readClosure = new SimpleReadInvokeClosure();
 
@@ -109,7 +98,7 @@ public abstract class AbstractStorageTest {
 
         assertNull(readClosure.row().value());
 
-        DataRow dataRow = dataRow("key", "value");
+        DataRow dataRow = dataRow(KEY, VALUE);
 
         storage.invoke(searchRow, new SimpleWriteInvokeClosure(dataRow));
 
@@ -189,6 +178,510 @@ public abstract class AbstractStorageTest {
     }
 
     /**
+     * Tests that {@link InsertInvokeClosure} inserts a data row if there is no existing data row with the same key.
+     */
+    @Test
+    public void testInsertClosure() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        var closure = new InsertInvokeClosure(dataRow);
+
+        storage.invoke(dataRow, closure);
+
+        assertTrue(closure.result());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link InsertInvokeClosure} doesn't insert a data row if there is an existing data row with the same key.
+     */
+    @Test
+    public void testInsertClosure_failureBranch() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        var closure = new InsertInvokeClosure(dataRow);
+
+        storage.invoke(dataRow, closure);
+
+        assertTrue(closure.result());
+
+        DataRow sameKeyRow = dataRow(KEY, "test");
+
+        var sameClosure = new InsertInvokeClosure(sameKeyRow);
+
+        storage.invoke(sameKeyRow, sameClosure);
+
+        assertFalse(sameClosure.result());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link DeleteExactInvokeClosure} deletes a data row if a key and a value match the ones passed
+     * in the closure.
+     */
+    @Test
+    public void testDeleteExactClosure() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        var closure = new DeleteExactInvokeClosure(dataRow);
+
+        storage.invoke(dataRow, closure);
+
+        assertTrue(closure.result());
+
+        checkHasNoEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link DeleteExactInvokeClosure} doesn't delete a data row if a key and a value don't match
+     * the ones passed in the closure.
+     */
+    @Test
+    public void testDeleteExactClosure_failureBranch() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        var closure = new DeleteExactInvokeClosure(dataRow(KEY, "test"));
+
+        storage.invoke(dataRow, closure);
+
+        assertFalse(closure.result());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link GetAndRemoveInvokeClosure} successfully retrieves and removes a data row.
+     */
+    @Test
+    public void testGetAndRemoveClosure() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        var closure = new GetAndRemoveInvokeClosure();
+
+        storage.invoke(dataRow, closure);
+
+        assertTrue(closure.result());
+
+        checkRowsEqual(dataRow, closure.oldRow());
+
+        checkHasNoEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link GetAndRemoveInvokeClosure} doesn't retrieve and remove a data row if it doesn't exist.
+     */
+    @Test
+    public void testGetAndRemoveClosure_failureBranch() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        var closure = new GetAndRemoveInvokeClosure();
+
+        storage.invoke(searchRow("test"), closure);
+
+        assertFalse(closure.result());
+
+        assertNull(closure.oldRow().valueBytes());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+     * {@code false} retrieves and replaces the existing entry in the storage.
+     */
+    @Test
+    public void testGetAndReplaceClosureIfExistsFalse_entryExists() {
+        String newValue = "newValue";
+
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        DataRow newRow = dataRow(KEY, newValue);
+
+        var closure = new GetAndReplaceInvokeClosure(newRow, false);
+
+        storage.invoke(dataRow, closure);
+
+        DataRow replaced = closure.oldRow();
+
+        assertNotNull(replaced);
+
+        assertTrue(closure.result());
+
+        checkRowsEqual(dataRow, replaced);
+
+        checkHasDifferentEntry(dataRow);
+        checkHasSameEntry(newRow);
+    }
+
+    /**
+     * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+     * {@code false} successfully inserts a new data row and returns an empty row if a previous row with the same key
+     * doesn't exist.
+     */
+    @Test
+    public void testGetAndReplaceClosureIfExistsFalse_entryNotExists() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        var closure = new GetAndReplaceInvokeClosure(dataRow, false);
+
+        storage.invoke(dataRow, closure);
+
+        DataRow replaced = closure.oldRow();
+
+        assertNotNull(replaced);
+
+        assertTrue(closure.result());
+
+        assertFalse(replaced.hasValueBytes());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+     * {@code true} retrieves and replaces the existing entry in the storage.
+     */
+    @Test
+    public void testGetAndReplaceClosureIfExistsTrue_entryExists() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        DataRow newRow = dataRow(KEY, "test");
+
+        var closure = new GetAndReplaceInvokeClosure(newRow, true);
+
+        storage.invoke(dataRow, closure);
+
+        DataRow replaced = closure.oldRow();
+
+        assertNotNull(replaced);
+
+        assertTrue(closure.result());
+
+        assertTrue(replaced.hasValueBytes());
+
+        checkHasDifferentEntry(dataRow);
+        checkHasSameEntry(newRow);
+    }
+
+    /**
+     * Tests that {@link GetAndReplaceInvokeClosure} with the {@link GetAndReplaceInvokeClosure#onlyIfExists} set to
+     * {@code true} doesn't insert a new entry if a previous one doesn't exist.
+     */
+    @Test
+    public void testGetAndReplaceClosureIfExistsTrue_entryNotExists() {
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        var closure = new GetAndReplaceInvokeClosure(dataRow, true);
+
+        storage.invoke(dataRow, closure);
+
+        DataRow replaced = closure.oldRow();
+
+        assertNotNull(replaced);
+
+        assertFalse(closure.result());
+
+        assertFalse(replaced.hasValueBytes());
+
+        checkHasNoEntry(dataRow);
+    }
+
+    /**
+     * Tests that {@link ReplaceExactInvokeClosure} replaces the data row with a given key and a given value
+     * in the storage.
+     */
+    @Test
+    public void testReplaceExactClosure() {
+        String newValue = "newValue";
+
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        DataRow newRow = dataRow(KEY, newValue);
+
+        var closure = new ReplaceExactInvokeClosure(dataRow, newRow);
+
+        storage.invoke(dataRow, closure);
+
+        assertTrue(closure.result());
+
+        checkHasDifferentEntry(dataRow);
+        checkHasSameEntry(newRow);
+    }
+
+    /**
+     * Tests that {@link ReplaceExactInvokeClosure} doesn't replace the data row with a given key and a given value
+     * if the value in the storage doesn't match the value passed via the closure.
+     */
+    @Test
+    public void testReplaceExactClosure_failureBranch() {
+        String newValue = "newValue";
+
+        DataRow dataRow = dataRow(KEY, VALUE);
+
+        storage.write(dataRow);
+
+        checkHasSameEntry(dataRow);
+
+        DataRow newRow = dataRow(KEY, newValue);
+
+        var closure = new ReplaceExactInvokeClosure(newRow, dataRow);
+
+        storage.invoke(dataRow, closure);
+
+        assertFalse(closure.result());
+
+        checkHasSameEntry(dataRow);
+    }
+
+    /**
+     * Tests the {@link Storage#readAll(Collection)} operation successfully reads data rows from the storage.
+     */
+    @Test
+    public void testReadAll() {
+        List<DataRow> rows = insertBulk(100);
+
+        List<DataRow> rowsFromStorage = new ArrayList<>(storage.readAll(rows));
+
+        Comparator<DataRow> comparator = Comparator.comparing(DataRow::keyBytes, Arrays::compare)
+            .thenComparing(DataRow::valueBytes, Arrays::compare);
+
+        rows.sort(comparator);
+        rowsFromStorage.sort(comparator);
+
+        assertEquals(rows, rowsFromStorage);
+    }
+
+    /**
+     * Tests that {@link Storage#writeAll(Collection)} operation successfully writes a collection of data rows into the
+     * storage.
+     */
+    @Test
+    public void testWriteAll() {
+        List<DataRow> rows = IntStream.range(0, 100)
+            .mapToObj(i -> dataRow(KEY + i, VALUE + i))
+            .collect(Collectors.toList());
+
+        storage.writeAll(rows);
+        rows.forEach(this::checkHasSameEntry);
+    }
+
+    /**
+     * Tests that {@link Storage#insertAll(Collection)} operation doesn't insert data rows whose keys
+     * are already present in the storage. This operation must also return the list of such data rows.
+     */
+    @Test
+    public void testInsertAll() {
+        List<DataRow> rows = insertBulk(100);
+
+        List<DataRow> oldRows = rows.subList(0, 50);
+
+        List<DataRow> newInsertion = Stream.concat(
+            oldRows.stream(),
+            IntStream.range(100, 150).mapToObj(i -> dataRow(KEY + "_" + i, VALUE + "_" + i))
+        ).collect(Collectors.toList());
+
+        Collection<DataRow> cantInsert = storage.insertAll(newInsertion);
+
+        assertEquals(oldRows, cantInsert);
+    }
+
+    /**
+     * Tests that {@link Storage#removeAll(Collection)} operation successfully retrieves and removes a collection of
+     * {@link SearchRow}s.
+     */
+    @Test
+    public void testRemoveAll() throws Exception {
+        List<DataRow> rows = insertBulk(100);
+
+        Collection<DataRow> removed = storage.removeAll(
+            rows.stream().map(r -> new SimpleDataRow(r.keyBytes(), null)).collect(Collectors.toList())
+        );
+
+        assertEquals(rows, removed);
+
+        // A full scan must find nothing after the removal. Try-with-resources closes the cursor
+        // even if the assertion below fails, so a failing test doesn't leak it.
+        try (Cursor<DataRow> scan = storage.scan(row -> true)) {
+            assertFalse(scan.hasNext());
+        }
+    }
+
+    @Test
+    public void testRemoveAllKeyNotExists() throws Exception {
+        Collection<DataRow> removed = storage.removeAll(Collections.singleton(searchRow(KEY)));
+
+        assertNotNull(removed);
+
+        assertTrue(removed.isEmpty());
+    }
+
+    /**
+     * Tests that {@link Storage#removeAllExact(Collection)} operation successfully removes and retrieves a collection
+     * of data rows with the given exact keys and values from the storage.
+     */
+    @Test
+    public void testRemoveAllExact() throws Exception {
+        List<DataRow> rows = insertBulk(100);
+
+        Collection<DataRow> removed = storage.removeAllExact(rows);
+
+        assertEquals(rows, removed);
+
+        // A full scan must find nothing after the removal. Try-with-resources closes the cursor
+        // even if the assertion below fails, so a failing test doesn't leak it.
+        try (Cursor<DataRow> scan = storage.scan(row -> true)) {
+            assertFalse(scan.hasNext());
+        }
+    }
+
+    /**
+     * Tests that {@link Storage#removeAllExact(Collection)} operation doesn't remove or return any data rows
+     * when the given rows have keys that are present in the storage but values that differ from the stored ones.
+     * The storage contents must stay intact.
+     */
+    @Test
+    public void testRemoveAllExact_failureBranch() throws Exception {
+        List<DataRow> rows = insertBulk(100);
+
+        List<DataRow> notExactRows = IntStream.range(0, 100)
+            .mapToObj(i -> dataRow(KEY + "_" + i, VALUE + "_" + (i + 1)))
+            .collect(Collectors.toList());
+
+        Collection<DataRow> removed = storage.removeAllExact(notExactRows);
+
+        assertEquals(0, removed.size());
+
+        rows.forEach(this::checkHasSameEntry);
+    }
+
+    /**
+     * Inserts and returns a given amount of data rows with {@link #KEY}_i as a key and {@link #VALUE}_i as a value
+     * where i is an index of the data row.
+     * 
+     * @param numberOfEntries Amount of entries to insert.
+     * @return List of inserted rows.
+     */
+    private List<DataRow> insertBulk(int numberOfEntries) {
+        List<DataRow> rows = IntStream.range(0, numberOfEntries)
+            .mapToObj(i -> dataRow(KEY + "_" + i, VALUE + "_" + i))
+            .collect(Collectors.toList());
+
+        storage.insertAll(rows);
+        rows.forEach(this::checkHasSameEntry);
+
+        // Clone key and value byte arrays so that returned rows have new references.
+        // This way we check that the ConcurrentHashMapStorage performs an actual array comparison.
+        return rows.stream().map(row -> {
+            byte[] valueBytes = row.valueBytes();
+
+            assert valueBytes != null;
+
+            return new SimpleDataRow(row.keyBytes().clone(), valueBytes.clone());
+        }).collect(Collectors.toList());
+    }
+
+    /**
+     * Checks that the storage has no value for the given search row.
+     *
+     * @param row Search row.
+     */
+    private void checkHasNoEntry(SearchRow row) {
+        DataRow read = storage.read(row);
+
+        assertFalse(read.hasValueBytes());
+    }
+
+    /**
+     * Checks that the storage contains a row with a given key but with a different value.
+     *
+     * @param row Data row.
+     */
+    private void checkHasDifferentEntry(DataRow row) {
+        DataRow read = storage.read(row);
+
+        assertFalse(Arrays.equals(row.valueBytes(), read.valueBytes()));
+    }
+
+    /**
+     * Checks that the storage contains a specific data row.
+     *
+     * @param row Expected data row.
+     */
+    private void checkHasSameEntry(DataRow row) {
+        DataRow read = storage.read(row);
+
+        checkRowsEqual(row, read);
+    }
+
+    /**
+     * Checks that two rows are equal.
+     *
+     * @param expected Expected data row.
+     * @param actual Actual data row.
+     */
+    private void checkRowsEqual(DataRow expected, DataRow actual) {
+        assertArrayEquals(expected.keyBytes(), actual.keyBytes());
+        assertArrayEquals(expected.valueBytes(), actual.valueBytes());
+    }
+
+    /**
+     * Wraps string key into a search row.
+     *
+     * @param key String key.
+     * @return Search row.
+     */
+    private SearchRow searchRow(String key) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            null
+        );
+    }
+
+    /**
+     * Wraps string key/value pair into a data row.
+     *
+     * @param key String key.
+     * @param value String value.
+     * @return Data row.
+     */
+    private DataRow dataRow(String key, String value) {
+        return new SimpleDataRow(
+            key.getBytes(StandardCharsets.UTF_8),
+            value.getBytes(StandardCharsets.UTF_8)
+        );
+    }
+
+    /**
      * Converts cursor to list.
      *
      * @param cursor Cursor.
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
index e05cc76..fbb3652 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorage.java
@@ -19,12 +19,15 @@ package org.apache.ignite.internal.storage.rocksdb;
 
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.storage.DataRow;
 import org.apache.ignite.internal.storage.InvokeClosure;
 import org.apache.ignite.internal.storage.SearchRow;
@@ -32,19 +35,23 @@ import org.apache.ignite.internal.storage.Storage;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.basic.SimpleDataRow;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.rocksdb.AbstractComparator;
 import org.rocksdb.ComparatorOptions;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
 
 /**
  * Storage implementation based on a single RocksDB instance.
  */
-public class RocksDbStorage implements Storage, AutoCloseable {
+public class RocksDbStorage implements Storage {
     static {
         RocksDB.loadLibrary();
     }
@@ -61,9 +68,6 @@ public class RocksDbStorage implements Storage, AutoCloseable {
     /** RocksDb instance. */
     private final RocksDB db;
 
-    /** RW lock. */
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
     /**
      * @param dbPath Path to the folder to store data.
      * @param comparator Keys comparator.
@@ -94,7 +98,12 @@ public class RocksDbStorage implements Storage, AutoCloseable {
             this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString());
         }
         catch (RocksDBException e) {
-            close();
+            try {
+                close();
+            }
+            catch (Exception ex) {
+                e.addSuppressed(ex);
+            }
 
             throw new StorageException("Failed to start the storage", e);
         }
@@ -113,39 +122,160 @@ public class RocksDbStorage implements Storage, AutoCloseable {
     }
 
     /** {@inheritDoc} */
-    @Override public void write(DataRow row) throws StorageException {
-        rwLock.readLock().lock();
+    @Override public Collection<DataRow> readAll(Collection<? extends SearchRow> keys) throws StorageException {
+        List<DataRow> res = new ArrayList<>();
+
+        try {
+            List<byte[]> keysList = keys.stream().map(SearchRow::keyBytes).collect(Collectors.toList());
+
+            List<byte[]> values = db.multiGetAsList(keysList);
+
+            assert keys.size() == values.size();
+
+            for (int i = 0; i < keysList.size(); i++) {
+                byte[] key = keysList.get(i);
 
+                res.add(new SimpleDataRow(key, values.get(i)));
+            }
+
+            return res;
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to read data from the storage", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataRow row) throws StorageException {
         try {
             db.put(row.keyBytes(), row.valueBytes());
         }
         catch (RocksDBException e) {
             throw new StorageException("Filed to write data to the storage", e);
         }
-        finally {
-            rwLock.readLock().unlock();
+    }
+
+    /** {@inheritDoc} All rows are written through a single {@link WriteBatch}. */
+    @Override public void writeAll(Collection<? extends DataRow> rows) throws StorageException {
+        try (WriteBatch batch = new WriteBatch();
+             WriteOptions opts = new WriteOptions()) {
+            for (DataRow row : rows)
+                batch.put(row.keyBytes(), row.valueBytes());
+
+            db.write(opts, batch);
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to write data to the storage", e);
         }
     }
 
     /** {@inheritDoc} */
-    @Override public void remove(SearchRow key) throws StorageException {
-        rwLock.readLock().lock();
+    @Override public Collection<DataRow> insertAll(Collection<? extends DataRow> rows) throws StorageException {
+        List<DataRow> cantInsert = new ArrayList<>();
 
+        try (WriteBatch batch = new WriteBatch();
+             WriteOptions opts = new WriteOptions()) {
+            // Only rows whose key is absent are batched; rows with an existing key are returned to the caller.
+            for (DataRow row : rows) {
+                if (db.get(row.keyBytes()) == null)
+                    batch.put(row.keyBytes(), row.valueBytes());
+                else
+                    cantInsert.add(row);
+            }
+
+            db.write(opts, batch);
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to write data to the storage", e);
+        }
+
+        return cantInsert;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove(SearchRow key) throws StorageException {
         try {
             db.delete(key.keyBytes());
         }
         catch (RocksDBException e) {
             throw new StorageException("Failed to remove data from the storage", e);
         }
-        finally {
-            rwLock.readLock().unlock();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<DataRow> removeAll(Collection<? extends SearchRow> keys) {
+        List<DataRow> res = new ArrayList<>();
+
+        try (WriteBatch batch = new WriteBatch();
+             WriteOptions opts = new WriteOptions()) {
+
+            for (SearchRow key : keys) {
+                byte[] keyBytes = key.keyBytes();
+
+                byte[] value = db.get(keyBytes);
+
+                if (value != null) {
+                    res.add(new SimpleDataRow(keyBytes, value));
+
+                    batch.delete(keyBytes);
+                }
+            }
+
+            db.write(opts, batch);
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
         }
+
+        return res;
     }
 
     /** {@inheritDoc} */
-    @Override public void invoke(SearchRow key, InvokeClosure clo) throws StorageException {
-        rwLock.writeLock().lock();
+    @Override public Collection<DataRow> removeAllExact(Collection<? extends DataRow> keyValues) {
+        List<DataRow> res = new ArrayList<>();
+
+        try (WriteBatch batch = new WriteBatch();
+             WriteOptions opts = new WriteOptions()) {
+
+            List<byte[]> keys = new ArrayList<>();
+            List<byte[]> expectedValues = new ArrayList<>();
 
+            for (DataRow keyValue : keyValues) {
+                byte[] keyBytes = keyValue.keyBytes();
+                byte[] valueBytes = keyValue.valueBytes();
+
+                keys.add(keyBytes);
+                expectedValues.add(valueBytes);
+            }
+
+            List<byte[]> values = db.multiGetAsList(keys);
+
+            assert values.size() == expectedValues.size();
+
+            for (int i = 0; i < keys.size(); i++) {
+                byte[] key = keys.get(i);
+                byte[] expectedValue = expectedValues.get(i);
+                byte[] value = values.get(i);
+                // A key missing from the storage (null value) must never match, even when no value is expected.
+                if (value != null && Arrays.equals(value, expectedValue)) {
+                    res.add(new SimpleDataRow(key, value));
+
+                    batch.delete(key);
+                }
+            }
+
+            db.write(opts, batch);
+        }
+        catch (RocksDBException e) {
+            throw new StorageException("Failed to remove data from the storage", e);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable
+    @Override public <T> T invoke(SearchRow key, InvokeClosure<T> clo) throws StorageException {
         try {
             byte[] keyBytes = key.keyBytes();
             byte[] existingDataBytes = db.get(keyBytes);
@@ -166,13 +296,12 @@ public class RocksDbStorage implements Storage, AutoCloseable {
                 case NOOP:
                     break;
             }
+
+            return clo.result();
         }
         catch (RocksDBException e) {
             throw new StorageException("Failed to access data in the storage", e);
         }
-        finally {
-            rwLock.writeLock().unlock();
-        }
     }
 
     /** {@inheritDoc} */
@@ -181,10 +310,10 @@ public class RocksDbStorage implements Storage, AutoCloseable {
     }
 
     /** {@inheritDoc} */
-    @Override public void close() {
-        try (comparatorOptions; comparator; options) {
-            db.close();
-        }
+    @Override public void close() throws Exception {
+        // The database is closed before the options it was opened with and the comparator objects,
+        // so that nothing is released while the open database may still reference it.
+        IgniteUtils.closeAll(db, options, comparator, comparatorOptions);
     }
 
     /** Cusror wrapper over the RocksIterator object with custom filter. */
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index 6f9f25e..d005e3b 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -83,6 +83,16 @@
             <artifactId>ignite-metastorage-client</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-storage-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-storage-rocksdb</artifactId>
+        </dependency>
+
         <!-- Test dependencies -->
         <dependency>
             <groupId>org.apache.ignite</groupId>
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index cd57072..6857db8 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.distributed;
 
+import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -38,6 +39,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.command.GetCommand;
 import org.apache.ignite.internal.table.distributed.command.InsertCommand;
@@ -169,7 +171,11 @@ public class ITDistributedTableTest {
 
         List<Peer> conf = List.of(new Peer(cluster.get(0).topologyService().localMember().address()));
 
-        partSrv.startRaftGroup(grpId, new PartitionListener(), conf);
+        partSrv.startRaftGroup(
+            grpId,
+            new PartitionListener(new RocksDbStorage(dataPath.resolve("db"), ByteBuffer::compareTo)),
+            conf
+        );
 
         RaftGroupService partRaftGrp = new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200);
 
@@ -252,7 +258,11 @@ public class ITDistributedTableTest {
 
             List<Peer> conf = List.of(new Peer(partNodes.get(0).address()));
 
-            rs.startRaftGroup(grpId, new PartitionListener(), conf);
+            rs.startRaftGroup(
+                grpId,
+                new PartitionListener(new RocksDbStorage(dataPath.resolve("part" + p), ByteBuffer::compareTo)),
+                conf
+            );
 
             partMap.put(p, new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000, conf, true, 200));
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4485a6d..c99dd43 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -17,7 +17,11 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -58,6 +62,7 @@ import org.apache.ignite.internal.schema.SchemaModificationException;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.event.SchemaEvent;
 import org.apache.ignite.internal.schema.event.SchemaEventParameters;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
@@ -69,6 +74,7 @@ import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.LoggerMessageHelper;
 import org.apache.ignite.network.ClusterNode;
@@ -107,6 +113,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Affinity manager. */
     private final AffinityManager affMgr;
 
+    /** Partitions store directory. */
+    private final Path partitionsStoreDir;
+
     /** Tables. */
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
@@ -121,19 +130,22 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @param schemaMgr Schema manager.
      * @param affMgr Affinity manager.
      * @param raftMgr Raft manager.
+     * @param partitionsStoreDir Partitions store directory.
      */
     public TableManager(
         ConfigurationManager configurationMgr,
         MetaStorageManager metaStorageMgr,
         SchemaManager schemaMgr,
         AffinityManager affMgr,
-        Loza raftMgr
+        Loza raftMgr,
+        Path partitionsStoreDir
     ) {
         this.configurationMgr = configurationMgr;
         this.metaStorageMgr = metaStorageMgr;
         this.affMgr = affMgr;
         this.raftMgr = raftMgr;
         this.schemaMgr = schemaMgr;
+        this.partitionsStoreDir = partitionsStoreDir;
     }
 
     /** {@inheritDoc} */
@@ -167,11 +179,27 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
 
+        Path storageDir = partitionsStoreDir.resolve(name);
+
+        try {
+            Files.createDirectories(storageDir);
+        } catch (IOException e) {
+            throw new IgniteInternalException(
+                "Failed to create partitions store directory for " + name + ": " + e.getMessage(),
+                e
+            );
+        }
+
         for (int p = 0; p < partitions; p++) {
+            RocksDbStorage storage = new RocksDbStorage(
+                storageDir.resolve(String.valueOf(p)),
+                ByteBuffer::compareTo
+            );
+
             partitionMap.put(p, raftMgr.prepareRaftGroup(
                 raftGroupName(tblId, p),
                 assignment.get(p),
-                new PartitionListener()
+                new PartitionListener(storage)
             ));
         }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
index 298484f..8f6d662 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
@@ -68,6 +68,7 @@ public class ReplaceCommand implements WriteCommand {
      *
      * @return Binary row.
      */
+    @NotNull
     public BinaryRow getRow() {
         if (row == null)
             row = new ByteBufferRow(rowBytes);
@@ -80,6 +81,7 @@ public class ReplaceCommand implements WriteCommand {
      *
      * @return Binary row.
      */
+    @NotNull
     public BinaryRow getOldRow() {
         if (oldRow == null)
             oldRow = new ByteBufferRow(oldRowBytes);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 4f34d0d..dd37cb9 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -18,14 +18,22 @@
 package org.apache.ignite.internal.table.distributed.raft;
 
 import java.nio.file.Path;
-import java.util.Arrays;
 import java.util.Iterator;
-import java.util.Objects;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.storage.DataRow;
+import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.storage.Storage;
+import org.apache.ignite.internal.storage.basic.DeleteExactInvokeClosure;
+import org.apache.ignite.internal.storage.basic.GetAndRemoveInvokeClosure;
+import org.apache.ignite.internal.storage.basic.GetAndReplaceInvokeClosure;
+import org.apache.ignite.internal.storage.basic.InsertInvokeClosure;
+import org.apache.ignite.internal.storage.basic.ReplaceExactInvokeClosure;
+import org.apache.ignite.internal.storage.basic.SimpleDataRow;
 import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
@@ -43,6 +51,7 @@ import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
 import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
 import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.raft.client.ReadCommand;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.service.CommandClosure;
@@ -53,12 +62,17 @@ import org.jetbrains.annotations.NotNull;
  * Partition command handler.
  */
 public class PartitionListener implements RaftGroupListener {
+    /** Partition storage. */
+    private final Storage storage;
+
     /**
-     * Storage.
-     * This is a temporary solution, it will apply until persistence layer would not be implemented.
-     * TODO: IGNITE-14790.
+     * Constructor.
+     *
+     * @param storage Storage.
      */
-    private ConcurrentHashMap<KeyWrapper, BinaryRow> storage = new ConcurrentHashMap<>();
+    public PartitionListener(Storage storage) {
+        this.storage = storage;
+    }
 
     /** {@inheritDoc} */
     @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
@@ -66,21 +80,29 @@ public class PartitionListener implements RaftGroupListener {
             CommandClosure<ReadCommand> clo = iterator.next();
 
             if (clo.command() instanceof GetCommand) {
-                clo.result(new SingleRowResponse(storage.get(
-                    extractAndWrapKey(((GetCommand)clo.command()).getKeyRow())
-                )));
+                DataRow readValue = storage.read(extractAndWrapKey(((GetCommand) clo.command()).getKeyRow()));
+
+                ByteBufferRow responseRow = null;
+
+                if (readValue.hasValueBytes())
+                    responseRow = new ByteBufferRow(readValue.valueBytes());
+
+                clo.result(new SingleRowResponse(responseRow));
             }
             else if (clo.command() instanceof GetAllCommand) {
                 Set<BinaryRow> keyRows = ((GetAllCommand)clo.command()).getKeyRows();
 
                 assert keyRows != null && !keyRows.isEmpty();
 
-                final Set<BinaryRow> res = keyRows.stream()
-                    .map(this::extractAndWrapKey)
-                    .map(storage::get)
-                    .filter(Objects::nonNull)
-                    .filter(BinaryRow::hasValue)
-                    .collect(Collectors.toSet());
+                List<SearchRow> keys = keyRows.stream().map(PartitionListener::extractAndWrapKey)
+                    .collect(Collectors.toList());
+
+                List<BinaryRow> res = storage
+                    .readAll(keys)
+                    .stream()
+                    .filter(DataRow::hasValueBytes)
+                    .map(read -> new ByteBufferRow(read.valueBytes()))
+                    .collect(Collectors.toList());
 
                 clo.result(new MultiRowsResponse(res));
             }
@@ -99,41 +121,41 @@ public class PartitionListener implements RaftGroupListener {
 
                 assert row.hasValue() : "Insert command should have a value.";
 
-                BinaryRow previous = storage.putIfAbsent(extractAndWrapKey(row), row);
+                DataRow newRow = extractAndWrapKeyValue(row);
+
+                InsertInvokeClosure writeIfAbsent = new InsertInvokeClosure(newRow);
 
-                clo.result(previous == null);
+                storage.invoke(newRow, writeIfAbsent);
+
+                clo.result(writeIfAbsent.result());
             }
             else if (clo.command() instanceof DeleteCommand) {
-                BinaryRow deleted = storage.remove(
-                    extractAndWrapKey(((DeleteCommand)clo.command()).getKeyRow())
-                );
+                SearchRow newRow = extractAndWrapKey(((DeleteCommand)clo.command()).getKeyRow());
+
+                var getAndRemoveClosure = new GetAndRemoveInvokeClosure();
+
+                storage.invoke(newRow, getAndRemoveClosure);
 
-                clo.result(deleted != null);
+                clo.result(getAndRemoveClosure.result());
             }
             else if (clo.command() instanceof ReplaceCommand) {
                 ReplaceCommand cmd = ((ReplaceCommand)clo.command());
 
-                BinaryRow expected = cmd.getOldRow();
+                DataRow expected = extractAndWrapKeyValue(cmd.getOldRow());
+                DataRow newRow = extractAndWrapKeyValue(cmd.getRow());
 
-                KeyWrapper key = extractAndWrapKey(expected);
+                var replaceClosure = new ReplaceExactInvokeClosure(expected, newRow);
 
-                BinaryRow current = storage.get(key);
+                storage.invoke(expected, replaceClosure);
 
-                if ((current == null && !expected.hasValue()) ||
-                    equalValues(current, expected)) {
-                    storage.put(key, cmd.getRow());
-
-                    clo.result(true);
-                }
-                else
-                    clo.result(false);
+                clo.result(replaceClosure.result());
             }
             else if (clo.command() instanceof UpsertCommand) {
                 BinaryRow row = ((UpsertCommand)clo.command()).getRow();
 
                 assert row.hasValue() : "Upsert command should have a value.";
 
-                storage.put(extractAndWrapKey(row), row);
+                storage.write(extractAndWrapKeyValue(row));
 
                 clo.result(null);
             }
@@ -142,11 +164,14 @@ public class PartitionListener implements RaftGroupListener {
 
                 assert rows != null && !rows.isEmpty();
 
-                final Set<BinaryRow> res = rows.stream()
-                    .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
-                    .filter(Objects::nonNull)
+                List<DataRow> keyValues = rows.stream().map(PartitionListener::extractAndWrapKeyValue)
+                    .collect(Collectors.toList());
+                // The response carries the rows that insertAll reports back as not inserted (existing keys).
+                List<BinaryRow> res = storage.insertAll(keyValues).stream()
+                    .filter(DataRow::hasValueBytes)
+                    .map(skipped -> new ByteBufferRow(skipped.valueBytes()))
                     .filter(BinaryRow::hasValue)
-                    .collect(Collectors.toSet());
+                    .collect(Collectors.toList());
 
                 clo.result(new MultiRowsResponse(res));
             }
@@ -155,7 +180,7 @@ public class PartitionListener implements RaftGroupListener {
 
                 assert rows != null && !rows.isEmpty();
 
-                rows.forEach(k -> storage.put(extractAndWrapKey(k), k));
+                storage.writeAll(rows.stream().map(PartitionListener::extractAndWrapKeyValue).collect(Collectors.toList()));
 
                 clo.result(null);
             }
@@ -164,16 +189,14 @@ public class PartitionListener implements RaftGroupListener {
 
                 assert rows != null && !rows.isEmpty();
 
-                final Set<BinaryRow> res = rows.stream()
-                    .map(k -> {
-                        if (k.hasValue())
-                            return null;
-                        else
-                            return storage.remove(extractAndWrapKey(k));
-                    })
-                    .filter(Objects::nonNull)
+                List<SearchRow> keys = rows.stream().map(PartitionListener::extractAndWrapKey)
+                    .collect(Collectors.toList());
+
+                List<BinaryRow> res = storage.removeAll(keys).stream()
+                    .filter(DataRow::hasValueBytes)
+                    .map(removed -> new ByteBufferRow(removed.valueBytes()))
                     .filter(BinaryRow::hasValue)
-                    .collect(Collectors.toSet());
+                    .collect(Collectors.toList());
 
                 clo.result(new MultiRowsResponse(res));
             }
@@ -183,32 +206,27 @@ public class PartitionListener implements RaftGroupListener {
                 assert row != null;
                 assert row.hasValue();
 
-                final KeyWrapper key = extractAndWrapKey(row);
-                final BinaryRow old = storage.get(key);
+                DataRow keyValue = extractAndWrapKeyValue(row);
 
-                if (old == null || !old.hasValue())
-                    clo.result(false);
-                else
-                    clo.result(equalValues(row, old) && storage.remove(key) != null);
+                var deleteExact = new DeleteExactInvokeClosure(keyValue);
+
+                storage.invoke(keyValue, deleteExact);
+
+                clo.result(deleteExact.result());
             }
             else if (clo.command() instanceof DeleteExactAllCommand) {
                 Set<BinaryRow> rows = ((DeleteExactAllCommand)clo.command()).getRows();
 
                 assert rows != null && !rows.isEmpty();
 
-                final Set<BinaryRow> res = rows.stream()
-                    .map(k -> {
-                        final KeyWrapper key = extractAndWrapKey(k);
-                        final BinaryRow old = storage.get(key);
-
-                        if (old == null || !old.hasValue() || !equalValues(k, old))
-                            return null;
+                List<DataRow> keyValues = rows.stream().map(PartitionListener::extractAndWrapKeyValue)
+                    .collect(Collectors.toList());
 
-                        return storage.remove(key);
-                    })
-                    .filter(Objects::nonNull)
+                List<BinaryRow> res = storage.removeAllExact(keyValues).stream()
+                    .filter(DataRow::hasValueBytes)
+                    .map(removed -> new ByteBufferRow(removed.valueBytes()))
                     .filter(BinaryRow::hasValue)
-                    .collect(Collectors.toSet());
+                    .collect(Collectors.toList());
 
                 clo.result(new MultiRowsResponse(res));
             }
@@ -217,51 +235,64 @@ public class PartitionListener implements RaftGroupListener {
 
                 assert row != null;
 
-                final KeyWrapper key = extractAndWrapKey(row);
-                final BinaryRow oldRow = storage.get(key);
+                DataRow keyValue = extractAndWrapKeyValue(row);
 
-                if (oldRow == null || !oldRow.hasValue())
-                    clo.result(false);
-                else
-                    clo.result(storage.put(key, row) == oldRow);
+                var replaceIfExists = new GetAndReplaceInvokeClosure(keyValue, true);
+
+                storage.invoke(keyValue, replaceIfExists);
+
+                clo.result(replaceIfExists.result());
             }
             else if (clo.command() instanceof GetAndDeleteCommand) {
                 BinaryRow row = ((GetAndDeleteCommand)clo.command()).getKeyRow();
 
                 assert row != null;
 
-                BinaryRow oldRow = storage.remove(extractAndWrapKey(row));
+                SearchRow keyRow = extractAndWrapKey(row);
 
-                if (oldRow == null || !oldRow.hasValue())
-                    clo.result(new SingleRowResponse(null));
+                var getAndRemoveClosure = new GetAndRemoveInvokeClosure();
+
+                storage.invoke(keyRow, getAndRemoveClosure);
+
+                if (getAndRemoveClosure.result())
+                    clo.result(new SingleRowResponse(new ByteBufferRow(getAndRemoveClosure.oldRow().valueBytes())));
                 else
-                    clo.result(new SingleRowResponse(oldRow));
+                    clo.result(new SingleRowResponse(null));
             }
             else if (clo.command() instanceof GetAndReplaceCommand) {
                 BinaryRow row = ((GetAndReplaceCommand)clo.command()).getRow();
 
                 assert row != null && row.hasValue();
 
-                BinaryRow oldRow = storage.get(extractAndWrapKey(row));
+                DataRow keyValue = extractAndWrapKeyValue(row);
 
-                storage.computeIfPresent(extractAndWrapKey(row), (key, val) -> row);
+                var getAndReplace = new GetAndReplaceInvokeClosure(keyValue, true);
 
-                if (oldRow == null || !oldRow.hasValue())
-                    clo.result(new SingleRowResponse(null));
-                else
-                    clo.result(new SingleRowResponse(oldRow));
+                storage.invoke(keyValue, getAndReplace);
+
+                DataRow oldRow = getAndReplace.oldRow();
+
+                BinaryRow res = oldRow.hasValueBytes() ? new ByteBufferRow(oldRow.valueBytes()) : null;
+
+                clo.result(new SingleRowResponse(res));
             }
             else if (clo.command() instanceof GetAndUpsertCommand) {
                 BinaryRow row = ((GetAndUpsertCommand)clo.command()).getKeyRow();
 
                 assert row != null && row.hasValue();
 
-                BinaryRow oldRow = storage.put(extractAndWrapKey(row), row);
+                DataRow keyValue = extractAndWrapKeyValue(row);
 
-                if (oldRow == null || !oldRow.hasValue())
-                    clo.result(new SingleRowResponse(null));
+                var getAndReplace = new GetAndReplaceInvokeClosure(keyValue, false);
+
+                storage.invoke(keyValue, getAndReplace);
+
+                DataRow oldRow = getAndReplace.oldRow();
+
+                if (oldRow.hasValueBytes())
+                    clo.result(new SingleRowResponse(new ByteBufferRow(oldRow.valueBytes())));
                 else
-                    clo.result(new SingleRowResponse(oldRow));
+                    clo.result(new SingleRowResponse(null));
             }
             else
                 assert false : "Command was not found [cmd=" + clo.command() + ']';
@@ -281,79 +312,37 @@ public class PartitionListener implements RaftGroupListener {
 
     /** {@inheritDoc} */
     @Override public void onShutdown() {
-        // No-op.
-    }
-
-    /**
-     * Wrapper provides correct byte[] comparison.
-     */
-    private static class KeyWrapper {
-        /** Data. */
-        private final byte[] data;
-
-        /** Hash. */
-        private final int hash;
-
-        /**
-         * Constructor.
-         *
-         * @param data Wrapped data.
-         */
-        KeyWrapper(byte[] data, int hash) {
-            assert data != null;
-
-            this.data = data;
-            this.hash = hash;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            KeyWrapper wrapper = (KeyWrapper)o;
-            return Arrays.equals(data, wrapper.data);
+        try {
+            storage.close();
         }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return hash;
+        catch (Exception e) {
+            throw new IgniteInternalException("Failed to close storage: " + e.getMessage(), e);
         }
     }
 
     /**
-     * Compares two rows.
+     * Extracts a key and a value from the {@link BinaryRow} and wraps it in a {@link DataRow}.
      *
-     * @param row Row to compare.
-     * @param row2 Row to compare.
-     * @return True if these rows is equivalent, false otherwise.
+     * @param row Binary row.
+     * @return Data row.
      */
-    private boolean equalValues(BinaryRow row, BinaryRow row2) {
-        if (row == row2)
-            return true;
-
-        if (row == null || row2 == null)
-            return false;
-
-        if (row.hasValue() ^ row2.hasValue())
-            return false;
+    @NotNull private static DataRow extractAndWrapKeyValue(@NotNull BinaryRow row) {
+        byte[] key = new byte[row.keySlice().capacity()];
+        row.keySlice().get(key);
 
-        return row.valueSlice().compareTo(row2.valueSlice()) == 0;
+        return new SimpleDataRow(key, row.bytes());
     }
 
     /**
-     * Makes a wrapped key from a table row.
+     * Extracts a key from the {@link BinaryRow} and wraps it in a {@link SearchRow}.
      *
-     * @param row Row.
-     * @return Extracted key.
+     * @param row Binary row.
+     * @return Search row.
      */
-    @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
-        final byte[] bytes = new byte[row.keySlice().capacity()];
-        row.keySlice().get(bytes);
+    @NotNull private static SearchRow extractAndWrapKey(@NotNull BinaryRow row) {
+        byte[] key = new byte[row.keySlice().capacity()];
+        row.keySlice().get(key);
 
-        return new KeyWrapper(bytes, row.hash());
+        return new SimpleDataRow(key, null);
     }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index bb0d1b5..0676f11 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.table;
 
 import java.lang.reflect.Method;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -51,6 +52,8 @@ import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConver
 import org.apache.ignite.internal.schema.event.SchemaEvent;
 import org.apache.ignite.internal.schema.event.SchemaEventParameters;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
@@ -91,7 +94,7 @@ import static org.mockito.Mockito.when;
 /**
  * Tests scenarios for table manager.
  */
-@ExtendWith(MockitoExtension.class)
+@ExtendWith({MockitoExtension.class, WorkDirectoryExtension.class})
 @MockitoSettings(strictness = Strictness.LENIENT)
 public class TableManagerTest {
     /** The logger. */
@@ -137,6 +140,9 @@ public class TableManagerTest {
     @Mock(lenient = true)
     private Loza rm;
 
+    @WorkDirectory
+    private Path workDir;
+
     /** Test node. */
     private final ClusterNode node = new ClusterNode(
         UUID.randomUUID().toString(),
@@ -220,7 +226,7 @@ public class TableManagerTest {
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-14578")
     @Test
     public void testStaticTableConfigured() {
-        TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm);
+        TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm, workDir);
 
         assertEquals(1, tableManager.tables().size());
 
@@ -453,7 +459,7 @@ public class TableManagerTest {
             return null;
         }).when(am).listen(same(AffinityEvent.CALCULATED), any());
 
-        TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm);
+        TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm, workDir);
 
         TableImpl tbl2;
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 5050f76..d861eea 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table.distributed.raft;
 
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
@@ -30,6 +32,7 @@ import org.apache.ignite.internal.schema.NativeTypes;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.rocksdb.RocksDbStorage;
 import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
 import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
@@ -47,11 +50,14 @@ import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
 import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
 import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.service.CommandClosure;
 import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -63,13 +69,17 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * There are a tests for a table command listener.
- * All rows should be removed before returning form each test.
+ * Tests for the table command listener.
  */
+@ExtendWith(WorkDirectoryExtension.class)
 public class PartitionCommandListenerTest {
     /** Key count. */
     public static final int KEY_COUNT = 100;
 
+    /** Work directory for the test; the RocksDB storage files are created under it. */
+    @WorkDirectory
+    private Path dataPath;
+
     /** Schema. */
     public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
         1,
@@ -78,19 +88,18 @@ public class PartitionCommandListenerTest {
     );
 
     /** Table command listener. */
-    private static PartitionListener commandListener;
+    private PartitionListener commandListener;
 
     /**
      * Initializes a table listener before tests.
      */
-    @BeforeAll
-    public static void before() {
-        commandListener = new PartitionListener();
+    @BeforeEach
+    public void before() {
+        commandListener = new PartitionListener(new RocksDbStorage(dataPath.resolve("db"), ByteBuffer::compareTo));
     }
 
     /**
      * Inserts rows and checks them.
-     * All rows are removed before returning.
      */
     @Test
     public void testInsertCommands() {
@@ -109,7 +118,6 @@ public class PartitionCommandListenerTest {
 
     /**
      * Upserts rows and checks them.
-     * All rows are removed before returning.
      */
     @Test
     public void testUpsertValues() {
@@ -126,7 +134,6 @@ public class PartitionCommandListenerTest {
 
     /**
      * Adds rows, replaces and checks them.
-     * All rows are removed before returning.
      */
     @Test
     public void testReplaceCommand() {
@@ -149,7 +156,6 @@ public class PartitionCommandListenerTest {
 
     /**
      * The test checks PutIfExist command.
-     * All rows are removed before returning.
      */
     @Test
     public void testPutIfExistCommand() {
@@ -172,7 +178,6 @@ public class PartitionCommandListenerTest {
 
     /**
      * The test checks GetAndReplace command.
-     * All rows are removed before returning.
      */
     @Test
     public void testGetAndReplaceCommand() {
@@ -201,7 +206,6 @@ public class PartitionCommandListenerTest {
 
     /**
      * The test checks a batch upsert command.
-     * All rows are removed before returning.
      */
     @Test
     public void testUpsertRowsBatchedAndCheck() {
@@ -220,7 +224,6 @@ public class PartitionCommandListenerTest {
 
     /**
      * The test checks a batch insert command.
-     * All rows are removed before returning.
      */
     @Test
     public void testInsertRowsBatchedAndCheck() {