You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/06/30 08:10:09 UTC

[ignite-3] branch main updated: IGNITE-17198 Minimal implementation of pure in-memory storage (with in-memory RAFT)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new fa6d23e8d IGNITE-17198 Minimal implementation of pure in-memory storage (with in-memory RAFT)
fa6d23e8d is described below

commit fa6d23e8d62b6bf076f63f0a5a9998368833bb84
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Jun 30 12:10:04 2022 +0400

    IGNITE-17198 Minimal implementation of pure in-memory storage (with in-memory RAFT)
---
 .../management/raft/ItCmgRaftServiceTest.java      |   4 +-
 .../management/ClusterManagementGroupManager.java  |  16 +-
 .../client/ItMetaStorageRaftGroupTest.java         |   7 +-
 .../client/ItMetaStorageServiceTest.java           |   3 +-
 .../internal/metastorage/MetaStorageManager.java   |   4 +-
 .../apache/ignite/internal/raft/ItLozaTest.java    |   3 +-
 .../internal/raft/ItRaftGroupServiceTest.java      |   4 +-
 .../service/ItAbstractListenerSnapshotTest.java    |   4 +-
 .../raft/server/ItJraftCounterServerTest.java      |  23 +--
 .../raft/server/ItSimpleCounterServerTest.java     |   9 +-
 .../java/org/apache/ignite/internal/raft/Loza.java |  39 ++--
 .../internal/raft/server/RaftGroupOptions.java     |  78 ++++++++
 .../ignite/internal/raft/server/RaftServer.java    |  13 +-
 .../internal/raft/server/impl/JraftServerImpl.java |  30 ++-
 .../ignite/raft/jraft/JRaftServiceFactory.java     |  10 +-
 .../jraft/core/DefaultJRaftServiceFactory.java     |  12 --
 ...ctory.java => VolatileJRaftServiceFactory.java} |  42 ++---
 .../raft/jraft/storage/VolatileStorage.java}       |  13 +-
 .../raft/jraft/storage/impl/LocalLogStorage.java   |   5 +-
 .../storage/impl/VolatileRaftMetaStorage.java      |  75 ++++++++
 .../org/apache/ignite/internal/raft/LozaTest.java  |  11 +-
 .../internal/raft/server/impl/RaftServerImpl.java  |  13 +-
 .../raft/jraft/core/TestJRaftServiceFactory.java   |   2 +-
 ...y.java => VolatileJRaftServiceFactoryTest.java} |  27 ++-
 .../jraft/storage/impl/LocalLogStorageTest.java    |   2 +-
 .../raft/jraft/storage/impl/LogManagerTest.java    |   2 +-
 .../storage/impl/VolatileRaftMetaStorageTest.java  |  90 +++++++++
 modules/runner/pom.xml                             |   6 +
 .../internal/AbstractClusterIntegrationTest.java   |   9 +-
 .../ignite/internal/compute/ItComputeTest.java     |   7 -
 .../inmemory/ItRaftStorageVolatilenessTest.java    | 209 +++++++++++++++++++++
 .../sql/engine/exec/MockedStructuresTest.java      |   4 +-
 .../internal/storage/engine/TableStorage.java      |   7 +
 .../chm/TestConcurrentHashMapTableStorage.java     |   5 +
 .../pagememory/AbstractPageMemoryTableStorage.java |  60 +++++-
 .../PersistentPageMemoryTableStorage.java          |   5 +
 .../pagememory/VolatilePageMemoryTableStorage.java |   5 +
 .../storage/rocksdb/RocksDbTableStorage.java       |   5 +
 .../storage/rocksdb/RocksDbTableStorageTest.java   |   5 +
 .../distributed/ItInternalTableScanTest.java       |   4 +-
 .../distributed/ItTxDistributedTestSingleNode.java |   4 +-
 .../internal/table/distributed/TableManager.java   |  18 +-
 .../table/distributed/TableManagerTest.java        |   6 +-
 parent/pom.xml                                     |  13 ++
 44 files changed, 777 insertions(+), 136 deletions(-)

diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 534566ad2..451ed3ea5 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.cluster.management.raft;
 
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
@@ -90,7 +91,8 @@ public class ItCmgRaftServiceTest {
             CompletableFuture<RaftGroupService> raftService = raftManager.prepareRaftGroup(
                     TEST_GROUP,
                     List.copyOf(clusterService.topologyService().allMembers()),
-                    () -> new CmgRaftGroupListener(raftStorage)
+                    () -> new CmgRaftGroupListener(raftStorage),
+                    defaults()
             );
 
             assertThat(raftService, willCompleteSuccessfully());
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index c26653f8b..522a0b9fd 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyComm
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -480,11 +481,16 @@ public class ClusterManagementGroupManager implements IgniteComponent {
 
         try {
             return raftManager
-                    .prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, () -> {
-                        clusterStateStorage.start();
-
-                        return new CmgRaftGroupListener(clusterStateStorage);
-                    })
+                    .prepareRaftGroup(
+                            CMG_RAFT_GROUP_NAME,
+                            nodes,
+                            () -> {
+                                clusterStateStorage.start();
+
+                                return new CmgRaftGroupListener(clusterStateStorage);
+                            },
+                            RaftGroupOptions.defaults()
+                    )
                     .thenApply(service -> new CmgRaftService(service, clusterService));
         } catch (NodeStoppingException e) {
             return failedFuture(e);
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
index f7a1712c3..74b94bc7c 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.metastorage.client;
 
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
 import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -324,11 +325,11 @@ public class ItMetaStorageRaftGroupTest {
 
         metaStorageRaftSrv3.start();
 
-        metaStorageRaftSrv1.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+        metaStorageRaftSrv1.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
 
-        metaStorageRaftSrv2.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+        metaStorageRaftSrv2.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
 
-        metaStorageRaftSrv3.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+        metaStorageRaftSrv3.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
 
         metaStorageRaftGrpSvc1 = RaftGroupServiceImpl.start(
                 METASTORAGE_RAFT_GROUP_NAME,
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
index 95464010b..0139583d4 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
@@ -28,6 +28,7 @@ import static org.apache.ignite.internal.metastorage.client.ItMetaStorageService
 import static org.apache.ignite.internal.metastorage.client.Operations.ops;
 import static org.apache.ignite.internal.metastorage.client.Operations.put;
 import static org.apache.ignite.internal.metastorage.client.Operations.remove;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
 import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -1063,7 +1064,7 @@ public class ItMetaStorageServiceTest {
 
         metaStorageRaftSrv.start();
 
-        metaStorageRaftSrv.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers);
+        metaStorageRaftSrv.startRaftGroup(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageListener(mockStorage), peers, defaults());
 
         metaStorageRaftGrpSvc = RaftGroupServiceImpl.start(
                 METASTORAGE_RAFT_GROUP_NAME,
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 39a65a356..c76cbe5c5 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.metastorage.watch.AggregatedWatch;
 import org.apache.ignite.internal.metastorage.watch.KeyCriterion;
 import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -177,7 +178,8 @@ public class MetaStorageManager implements IgniteComponent {
             CompletableFuture<RaftGroupService> raftServiceFuture = raftMgr.prepareRaftGroup(
                     METASTORAGE_RAFT_GROUP_NAME,
                     metastorageNodes,
-                    () -> new MetaStorageListener(storage)
+                    () -> new MetaStorageListener(storage),
+                    RaftGroupOptions.defaults()
             );
 
             return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id()));
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index 13012af8c..1e26ea10c 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.raft;
 
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.mockito.ArgumentMatchers.any;
@@ -66,7 +67,7 @@ public class ItLozaTest {
      */
     private RaftGroupService startClient(String groupId, ClusterNode node, Loza loza) throws Exception {
         return loza.prepareRaftGroup(groupId,
-                List.of(node), () -> mock(RaftGroupListener.class)
+                List.of(node), () -> mock(RaftGroupListener.class), defaults()
         ).get(10, TimeUnit.SECONDS);
     }
 
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
index cb72190bc..735ce9a4b 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItRaftGroupServiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.raft;
 
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForCondition;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
 import static org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
@@ -100,7 +101,8 @@ public class ItRaftGroupServiceTest {
             CompletableFuture<RaftGroupService> raftGroupServiceFuture = raftSrvs.get(i).prepareRaftGroup(
                     RAFT_GROUP_NAME,
                     nodes,
-                    () -> mock(RaftGroupListener.class)
+                    () -> mock(RaftGroupListener.class),
+                    defaults()
             );
 
             svcFutures[i] = raftGroupServiceFuture;
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
index 8066e5535..92038256d 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.client.service;
 
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -386,7 +387,8 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener
         server.startRaftGroup(
                 raftGroupId(),
                 createListener(service, listenerPersistencePath),
-                INITIAL_CONF
+                INITIAL_CONF,
+                defaults()
         );
 
         return server;
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index cd36eb2e1..fae72c1a5 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.raft.server;
 import static java.util.Comparator.comparing;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static org.apache.ignite.raft.jraft.core.State.STATE_ERROR;
 import static org.apache.ignite.raft.jraft.core.State.STATE_LEADER;
 import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
@@ -266,8 +267,8 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
     private void startCluster() throws Exception {
         for (int i = 0; i < 3; i++) {
             startServer(i, raftServer -> {
-                raftServer.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
-                raftServer.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
+                raftServer.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF, defaults());
+                raftServer.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF, defaults());
             }, opts -> {});
         }
 
@@ -281,7 +282,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
     @Test
     public void testDisruptorThreadsCount() {
         startServer(0, raftServer -> {
-            raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF);
+            raftServer.startRaftGroup("test_raft_group", listenerFactory.get(), INITIAL_CONF, defaults());
         }, opts -> {});
 
         Set<Thread> threads = getAllDisruptorCurrentThreads();
@@ -294,7 +295,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
 
         servers.forEach(srv -> {
             for (int i = 0; i < 10; i++) {
-                srv.startRaftGroup("test_raft_group_" + i, listenerFactory.get(), INITIAL_CONF);
+                srv.startRaftGroup("test_raft_group_" + i, listenerFactory.get(), INITIAL_CONF, defaults());
             }
         });
 
@@ -673,9 +674,9 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
                     @Override public void run() {
                         String grp = "counter" + finalI;
 
-                        srv0.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
-                        srv1.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
-                        srv2.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF);
+                        srv0.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF, defaults());
+                        srv1.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF, defaults());
+                        srv2.startRaftGroup(grp, listenerFactory.get(), INITIAL_CONF, defaults());
                     }
                 }));
             }
@@ -803,8 +804,8 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
         }
 
         var svc2 = startServer(stopIdx, r -> {
-            r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
-            r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
+            r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF, defaults());
+            r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF, defaults());
         }, opts -> {});
 
         waitForCondition(() -> validateStateMachine(sum(20), svc2, COUNTER_GROUP_0), 5_000);
@@ -818,8 +819,8 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
         svc2.stop();
 
         var svc3 = startServer(stopIdx, r -> {
-            r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF);
-            r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF);
+            r.startRaftGroup(COUNTER_GROUP_0, listenerFactory.get(), INITIAL_CONF, defaults());
+            r.startRaftGroup(COUNTER_GROUP_1, listenerFactory.get(), INITIAL_CONF, defaults());
         }, opts -> {});
 
         waitForCondition(() -> validateStateMachine(sum(20), svc3, COUNTER_GROUP_0), 5_000);
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
index 14f343def..46da74d31 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItSimpleCounterServerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.raft.server;
 
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static org.apache.ignite.raft.jraft.test.TestUtils.waitForTopology;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -106,8 +107,12 @@ class ItSimpleCounterServerTest extends RaftServerAbstractTest {
 
         ClusterNode serverNode = server.clusterService().topologyService().localMember();
 
-        assertTrue(server.startRaftGroup(COUNTER_GROUP_ID_0, new CounterListener(), List.of(new Peer(serverNode.address()))));
-        assertTrue(server.startRaftGroup(COUNTER_GROUP_ID_1, new CounterListener(), List.of(new Peer(serverNode.address()))));
+        assertTrue(
+                server.startRaftGroup(COUNTER_GROUP_ID_0, new CounterListener(), List.of(new Peer(serverNode.address())), defaults())
+        );
+        assertTrue(
+                server.startRaftGroup(COUNTER_GROUP_ID_1, new CounterListener(), List.of(new Peer(serverNode.address())), defaults())
+        );
 
         ClusterService clientNode1 = clusterService(PORT + 1, List.of(addr), true);
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index d32e4d338..af3a5ed10 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -30,6 +30,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -152,20 +153,22 @@ public class Loza implements IgniteComponent {
      * @param groupId      Raft group id.
      * @param nodes        Raft group nodes.
      * @param lsnrSupplier Raft group listener supplier.
+     * @param groupOptions Options to apply to the group.
      * @return Future representing pending completion of the operation.
      * @throws NodeStoppingException If node stopping intention was detected.
      */
     public CompletableFuture<RaftGroupService> prepareRaftGroup(
             String groupId,
             List<ClusterNode> nodes,
-            Supplier<RaftGroupListener> lsnrSupplier
+            Supplier<RaftGroupListener> lsnrSupplier,
+            RaftGroupOptions groupOptions
     ) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
 
         try {
-            return prepareRaftGroupInternal(groupId, nodes, lsnrSupplier, () -> RaftGroupEventsListener.noopLsnr);
+            return prepareRaftGroupInternal(groupId, nodes, lsnrSupplier, () -> RaftGroupEventsListener.noopLsnr, groupOptions);
         } finally {
             busyLock.leaveBusy();
         }
@@ -178,10 +181,16 @@ public class Loza implements IgniteComponent {
      * @param nodes                   Raft group nodes.
      * @param lsnrSupplier            Raft group listener supplier.
      * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
+     * @param groupOptions            Options to apply to the group.
      * @return Future representing pending completion of the operation.
      */
-    private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(String groupId, List<ClusterNode> nodes,
-            Supplier<RaftGroupListener> lsnrSupplier, Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier) {
+    private CompletableFuture<RaftGroupService> prepareRaftGroupInternal(
+            String groupId,
+            List<ClusterNode> nodes,
+            Supplier<RaftGroupListener> lsnrSupplier,
+            Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
+            RaftGroupOptions groupOptions
+    ) {
         assert !nodes.isEmpty();
 
         List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -193,7 +202,7 @@ public class Loza implements IgniteComponent {
         if (hasLocalRaft) {
             LOG.info("Start new raft node for group={} with initial peers={}", groupId, peers);
 
-            if (!raftServer.startRaftGroup(groupId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
+            if (!raftServer.startRaftGroup(groupId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers, groupOptions)) {
                 throw new IgniteInternalException(IgniteStringFormatter.format(
                         "Raft group on the node is already started [node={}, raftGrp={}]",
                         locNodeName,
@@ -223,6 +232,7 @@ public class Loza implements IgniteComponent {
      * @param deltaNodes              New raft group nodes.
      * @param lsnrSupplier            Raft group listener supplier.
      * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
+     * @param groupOptions            Options to apply to the group.
      * @throws NodeStoppingException If node stopping intention was detected.
      */
     public void startRaftGroupNode(
@@ -230,7 +240,9 @@ public class Loza implements IgniteComponent {
             Collection<ClusterNode> nodes,
             Collection<ClusterNode> deltaNodes,
             Supplier<RaftGroupListener> lsnrSupplier,
-            Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier) throws NodeStoppingException {
+            Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
+            RaftGroupOptions groupOptions
+    ) throws NodeStoppingException {
         assert !nodes.isEmpty();
 
         if (!busyLock.enterBusy()) {
@@ -245,7 +257,7 @@ public class Loza implements IgniteComponent {
             if (deltaNodes.stream().anyMatch(n -> locNodeName.equals(n.name()))) {
                 LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
 
-                if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
+                if (!raftServer.startRaftGroup(grpId, raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers, groupOptions)) {
                     throw new IgniteInternalException(IgniteStringFormatter.format(
                             "Raft group on the node is already started [node={}, raftGrp={}]",
                             locNodeName,
@@ -267,6 +279,7 @@ public class Loza implements IgniteComponent {
      * @param deltaNodes              New raft group nodes.
      * @param lsnrSupplier            Raft group listener supplier.
      * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
+     * @param groupOptions            Options to apply to the group.
      * @return Future representing pending completion of the operation.
      * @throws NodeStoppingException If node stopping intention was detected.
      */
@@ -275,14 +288,15 @@ public class Loza implements IgniteComponent {
             Collection<ClusterNode> nodes,
             Collection<ClusterNode> deltaNodes,
             Supplier<RaftGroupListener> lsnrSupplier,
-            Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier
+            Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
+            RaftGroupOptions groupOptions
     ) throws NodeStoppingException {
         if (!busyLock.enterBusy()) {
             throw new NodeStoppingException();
         }
 
         try {
-            return updateRaftGroupInternal(grpId, nodes, deltaNodes, lsnrSupplier, raftGrpEvtsLsnrSupplier);
+            return updateRaftGroupInternal(grpId, nodes, deltaNodes, lsnrSupplier, raftGrpEvtsLsnrSupplier, groupOptions);
         } finally {
             busyLock.leaveBusy();
         }
@@ -296,6 +310,7 @@ public class Loza implements IgniteComponent {
      * @param deltaNodes              New raft group nodes.
      * @param lsnrSupplier            Raft group listener supplier.
      * @param raftGrpEvtsLsnrSupplier Raft group events listener supplier.
+     * @param groupOptions            Options to apply to the group.
      * @return Future representing pending completion of the operation.
      */
     private CompletableFuture<RaftGroupService> updateRaftGroupInternal(
@@ -303,7 +318,9 @@ public class Loza implements IgniteComponent {
             Collection<ClusterNode> nodes,
             Collection<ClusterNode> deltaNodes,
             Supplier<RaftGroupListener> lsnrSupplier,
-            Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier) {
+            Supplier<RaftGroupEventsListener> raftGrpEvtsLsnrSupplier,
+            RaftGroupOptions groupOptions
+    ) {
         assert !nodes.isEmpty();
 
         List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -313,7 +330,7 @@ public class Loza implements IgniteComponent {
         if (deltaNodes.stream().anyMatch(n -> locNodeName.equals(n.name()))) {
             LOG.info("Start new raft node for group={} with initial peers={}", grpId, peers);
 
-            if (!raftServer.startRaftGroup(grpId,  raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers)) {
+            if (!raftServer.startRaftGroup(grpId,  raftGrpEvtsLsnrSupplier.get(), lsnrSupplier.get(), peers, groupOptions)) {
                 throw new IgniteInternalException(IgniteStringFormatter.format(
                         "Raft group on the node is already started [node={}, raftGrp={}]",
                         locNodeName,
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
new file mode 100644
index 000000000..4880e3188
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.server;
+
+/**
+ * Options specific to a Raft group that is being started.
+ */
+public class RaftGroupOptions {
+    /** Whether volatile stores should be used for the corresponding Raft Group. Classic Raft uses persistent ones. */
+    private final boolean volatileStores;
+
+    /**
+     * Returns default options as defined by classic Raft (so stores are persistent).
+     *
+     * @return Default options.
+     */
+    public static RaftGroupOptions defaults() {
+        return forPersistentStores();
+    }
+
+    /**
+     * Returns options with persistent Raft stores.
+     *
+     * @return Options with persistent Raft stores.
+     */
+    public static RaftGroupOptions forPersistentStores() {
+        return new RaftGroupOptions(false);
+    }
+
+    /**
+     * Returns options with volatile Raft stores.
+     *
+     * @return Options with volatile Raft stores.
+     */
+    public static RaftGroupOptions forVolatileStores() {
+        return new RaftGroupOptions(true);
+    }
+
+    /**
+     * Creates options derived from table configuration.
+     *
+     * @param isVolatile Whether the table is configured as volatile (in-memory) or not.
+     * @return Options derived from table configuration.
+     */
+    public static RaftGroupOptions forTable(boolean isVolatile) {
+        return isVolatile ? forVolatileStores() : forPersistentStores();
+    }
+
+    private RaftGroupOptions(boolean volatileStores) {
+        this.volatileStores = volatileStores;
+    }
+
+    /**
+     * Returns {@code true} if the Raft group should store its metadata and logs in volatile storages, or {@code false}
+     * if these storages should be persistent.
+     *
+     * @return {@code true} if the Raft group should store its metadata and logs in volatile storages, or {@code false}
+     *     if these storages should be persistent.
+     */
+    public boolean volatileStores() {
+        return volatileStores;
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index 3e795d73f..d57690e7a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -43,9 +43,10 @@ public interface RaftServer extends IgniteComponent {
      * @param groupId     Group id.
      * @param lsnr        The listener.
      * @param initialConf Inititial group configuration.
+     * @param groupOptions Options to apply to the group.
      * @return {@code True} if a group was successfully started, {@code False} when the group with given name is already exists.
      */
-    boolean startRaftGroup(String groupId, RaftGroupListener lsnr, List<Peer> initialConf);
+    boolean startRaftGroup(String groupId, RaftGroupListener lsnr, List<Peer> initialConf, RaftGroupOptions groupOptions);
 
     /**
      * Starts a raft group bound to this cluster node.
@@ -54,10 +55,16 @@ public interface RaftServer extends IgniteComponent {
      * @param evLsnr      Listener for group membership and other events.
      * @param lsnr        Listener for state machine events.
      * @param initialConf Inititial group configuration.
+     * @param groupOptions Options to apply to the group.
      * @return {@code True} if a group was successfully started, {@code False} when the group with given name is already exists.
      */
-    boolean startRaftGroup(String groupId, RaftGroupEventsListener evLsnr,
-            RaftGroupListener lsnr, List<Peer> initialConf);
+    boolean startRaftGroup(
+            String groupId,
+            RaftGroupEventsListener evLsnr,
+            RaftGroupListener lsnr,
+            List<Peer> initialConf,
+            RaftGroupOptions groupOptions
+    );
 
     /**
      * Synchronously stops a raft group if any.
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index d15693164..438f96729 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -57,6 +58,7 @@ import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
 import org.apache.ignite.raft.jraft.core.NodeImpl;
 import org.apache.ignite.raft.jraft.core.ReadOnlyServiceImpl;
 import org.apache.ignite.raft.jraft.core.StateMachineAdapter;
+import org.apache.ignite.raft.jraft.core.VolatileJRaftServiceFactory;
 import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor;
 import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.apache.ignite.raft.jraft.error.RaftError;
@@ -71,7 +73,6 @@ import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotWriter;
 import org.apache.ignite.raft.jraft.util.ExecutorServiceHelper;
 import org.apache.ignite.raft.jraft.util.ExponentialBackoffTimeoutStrategy;
 import org.apache.ignite.raft.jraft.util.JDKMarshaller;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -316,14 +317,24 @@ public class JraftServerImpl implements RaftServer {
 
     /** {@inheritDoc} */
     @Override
-    public synchronized boolean startRaftGroup(String groupId, RaftGroupListener lsnr, @Nullable List<Peer> initialConf) {
-        return startRaftGroup(groupId, RaftGroupEventsListener.noopLsnr, lsnr, initialConf);
+    public synchronized boolean startRaftGroup(
+            String groupId,
+            RaftGroupListener lsnr,
+            @Nullable List<Peer> initialConf,
+            RaftGroupOptions groupOptions
+    ) {
+        return startRaftGroup(groupId, RaftGroupEventsListener.noopLsnr, lsnr, initialConf, groupOptions);
     }
 
     /** {@inheritDoc} */
     @Override
-    public synchronized boolean startRaftGroup(String groupId, @NotNull RaftGroupEventsListener evLsnr,
-            RaftGroupListener lsnr, @Nullable List<Peer> initialConf) {
+    public synchronized boolean startRaftGroup(
+            String groupId,
+            RaftGroupEventsListener evLsnr,
+            RaftGroupListener lsnr,
+            @Nullable List<Peer> initialConf,
+            RaftGroupOptions groupOptions
+    ) {
         if (groups.containsKey(groupId)) {
             return false;
         }
@@ -331,6 +342,7 @@ public class JraftServerImpl implements RaftServer {
         // Thread pools are shared by all raft groups.
         NodeOptions nodeOptions = opts.copy();
 
+        // TODO: IGNITE-17083 - Do not create paths for volatile stores at all when we get rid of snapshot storage on FS.
         Path serverDataPath = getServerDataPath(groupId);
 
         try {
@@ -339,13 +351,19 @@ public class JraftServerImpl implements RaftServer {
             throw new IgniteInternalException(e);
         }
 
-        nodeOptions.setRaftMetaUri(serverDataPath.resolve("meta").toString());
+        if (!groupOptions.volatileStores()) {
+            nodeOptions.setRaftMetaUri(serverDataPath.resolve("meta").toString());
+        }
         nodeOptions.setSnapshotUri(serverDataPath.resolve("snapshot").toString());
 
         nodeOptions.setFsm(new DelegatingStateMachine(lsnr));
 
         nodeOptions.setRaftGrpEvtsLsnr(evLsnr);
 
+        if (groupOptions.volatileStores()) {
+            nodeOptions.setServiceFactory(new VolatileJRaftServiceFactory());
+        }
+
         if (initialConf != null) {
             List<PeerId> mapped = initialConf.stream().map(PeerId::fromPeer).collect(Collectors.toList());
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftServiceFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftServiceFactory.java
index 2ebf6c260..5ec6b4efc 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftServiceFactory.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftServiceFactory.java
@@ -17,11 +17,13 @@
 package org.apache.ignite.raft.jraft;
 
 import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
 import org.apache.ignite.raft.jraft.option.NodeOptions;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
 import org.apache.ignite.raft.jraft.storage.RaftMetaStorage;
 import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
+import org.apache.ignite.raft.jraft.util.timer.DefaultRaftTimerFactory;
 import org.apache.ignite.raft.jraft.util.timer.RaftTimerFactory;
 
 /**
@@ -60,12 +62,16 @@ public interface JRaftServiceFactory {
      *
      * @return a codec factory to create encoder/decoder for raft log entry.
      */
-    LogEntryCodecFactory createLogEntryCodecFactory();
+    default LogEntryCodecFactory createLogEntryCodecFactory() {
+        return LogEntryV1CodecFactory.getInstance();
+    }
 
     /**
      * Creates raft timer factory.
      *
      * @return The factory.
      */
-    RaftTimerFactory createRaftTimerFactory();
+    default RaftTimerFactory createRaftTimerFactory() {
+        return new DefaultRaftTimerFactory();
+    }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java
index 39aa02b70..c6c71bcf5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java
@@ -17,8 +17,6 @@
 package org.apache.ignite.raft.jraft.core;
 
 import org.apache.ignite.raft.jraft.JRaftServiceFactory;
-import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
-import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
 import org.apache.ignite.raft.jraft.storage.LogStorageFactory;
@@ -28,8 +26,6 @@ import org.apache.ignite.raft.jraft.storage.impl.LocalRaftMetaStorage;
 import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage;
 import org.apache.ignite.raft.jraft.util.Requires;
 import org.apache.ignite.raft.jraft.util.StringUtils;
-import org.apache.ignite.raft.jraft.util.timer.DefaultRaftTimerFactory;
-import org.apache.ignite.raft.jraft.util.timer.RaftTimerFactory;
 
 /**
  * The default factory for JRaft services.
@@ -57,12 +53,4 @@ public class DefaultJRaftServiceFactory implements JRaftServiceFactory {
         Requires.requireTrue(!StringUtils.isBlank(uri), "Blank raft meta storage uri.");
         return new LocalRaftMetaStorage(uri, raftOptions);
     }
-
-    @Override public LogEntryCodecFactory createLogEntryCodecFactory() {
-        return LogEntryV1CodecFactory.getInstance();
-    }
-
-    @Override public RaftTimerFactory createRaftTimerFactory() {
-        return new DefaultRaftTimerFactory();
-    }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactory.java
similarity index 51%
copy from modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java
copy to modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactory.java
index 39aa02b70..6eb22616e 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/DefaultJRaftServiceFactory.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactory.java
@@ -17,52 +17,38 @@
 package org.apache.ignite.raft.jraft.core;
 
 import org.apache.ignite.raft.jraft.JRaftServiceFactory;
-import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory;
-import org.apache.ignite.raft.jraft.entity.codec.v1.LogEntryV1CodecFactory;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
-import org.apache.ignite.raft.jraft.storage.LogStorageFactory;
 import org.apache.ignite.raft.jraft.storage.RaftMetaStorage;
 import org.apache.ignite.raft.jraft.storage.SnapshotStorage;
-import org.apache.ignite.raft.jraft.storage.impl.LocalRaftMetaStorage;
+import org.apache.ignite.raft.jraft.storage.impl.LocalLogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.VolatileRaftMetaStorage;
 import org.apache.ignite.raft.jraft.storage.snapshot.local.LocalSnapshotStorage;
 import org.apache.ignite.raft.jraft.util.Requires;
 import org.apache.ignite.raft.jraft.util.StringUtils;
-import org.apache.ignite.raft.jraft.util.timer.DefaultRaftTimerFactory;
-import org.apache.ignite.raft.jraft.util.timer.RaftTimerFactory;
 
 /**
- * The default factory for JRaft services.
+ * The factory for JRaft services producing volatile stores. Useful for Raft groups hosting partitions of in-memory tables.
  */
-public class DefaultJRaftServiceFactory implements JRaftServiceFactory {
-
-    private final LogStorageFactory logStorageFactory;
-
-    public DefaultJRaftServiceFactory(LogStorageFactory factory) {
-        logStorageFactory = factory;
-    }
-
-    @Override public LogStorage createLogStorage(final String groupId, final RaftOptions raftOptions) {
+public class VolatileJRaftServiceFactory implements JRaftServiceFactory {
+    @Override
+    public LogStorage createLogStorage(final String groupId, final RaftOptions raftOptions) {
         Requires.requireTrue(StringUtils.isNotBlank(groupId), "Blank group id.");
 
-        return logStorageFactory.getLogStorage(groupId, raftOptions);
+        return new LocalLogStorage(raftOptions);
     }
 
-    @Override public SnapshotStorage createSnapshotStorage(final String uri, final RaftOptions raftOptions) {
+    @Override
+    public SnapshotStorage createSnapshotStorage(final String uri, final RaftOptions raftOptions) {
         Requires.requireTrue(!StringUtils.isBlank(uri), "Blank snapshot storage uri.");
-        return new LocalSnapshotStorage(uri, raftOptions);
-    }
 
-    @Override public RaftMetaStorage createRaftMetaStorage(final String uri, final RaftOptions raftOptions) {
-        Requires.requireTrue(!StringUtils.isBlank(uri), "Blank raft meta storage uri.");
-        return new LocalRaftMetaStorage(uri, raftOptions);
-    }
+        // TODO: IGNITE-17083 - return an in-memory store here (or get rid of SnapshotStorage)
 
-    @Override public LogEntryCodecFactory createLogEntryCodecFactory() {
-        return LogEntryV1CodecFactory.getInstance();
+        return new LocalSnapshotStorage(uri, raftOptions);
     }
 
-    @Override public RaftTimerFactory createRaftTimerFactory() {
-        return new DefaultRaftTimerFactory();
+    @Override
+    public RaftMetaStorage createRaftMetaStorage(final String uri, final RaftOptions raftOptions) {
+        return new VolatileRaftMetaStorage();
     }
 }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/VolatileStorage.java
similarity index 68%
copy from modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java
copy to modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/VolatileStorage.java
index f689eb94c..215742468 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/VolatileStorage.java
@@ -14,14 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.raft.jraft.storage.impl;
 
-import org.apache.ignite.raft.jraft.option.RaftOptions;
-import org.apache.ignite.raft.jraft.storage.LogStorage;
+package org.apache.ignite.raft.jraft.storage;
 
-public class LocalLogStorageTest extends BaseLogStorageTest {
-    @Override
-    protected LogStorage newLogStorage() {
-        return new LocalLogStorage(this.path.toString(), new RaftOptions());
-    }
+/**
+ * Marker interface for a storage that is volatile (i.e. it keeps its data in memory).
+ */
+public interface VolatileStorage {
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorage.java
index 7531b8718..1695036b0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorage.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorage.java
@@ -32,13 +32,14 @@ import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
 import org.apache.ignite.raft.jraft.option.LogStorageOptions;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.VolatileStorage;
 import org.apache.ignite.raft.jraft.util.Describer;
 import org.apache.ignite.raft.jraft.util.Requires;
 
 /**
  * Stores log in heap.
  */
-public class LocalLogStorage implements LogStorage, Describer {
+public class LocalLogStorage implements LogStorage, Describer, VolatileStorage {
     private static final IgniteLogger LOG = IgniteLogger.forClass(LocalLogStorage.class);
 
     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -56,7 +57,7 @@ public class LocalLogStorage implements LogStorage, Describer {
 
     private volatile boolean initialized = false;
 
-    public LocalLogStorage(final String path, final RaftOptions raftOptions) {
+    public LocalLogStorage(final RaftOptions raftOptions) {
         super();
     }
 
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorage.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorage.java
new file mode 100644
index 000000000..90564fc9f
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.option.RaftMetaStorageOptions;
+import org.apache.ignite.raft.jraft.storage.RaftMetaStorage;
+import org.apache.ignite.raft.jraft.storage.VolatileStorage;
+
+/**
+ * Volatile (in-memory) implementation of {@link RaftMetaStorage}. Used for Raft groups storing partition data of
+ * volatile storages.
+ */
+public class VolatileRaftMetaStorage implements RaftMetaStorage, VolatileStorage {
+    private volatile long term;
+
+    private volatile PeerId votedFor = PeerId.emptyPeer();
+
+    @Override
+    public boolean init(RaftMetaStorageOptions opts) {
+        return true;
+    }
+
+    @Override
+    public void shutdown() {
+        // no-op
+    }
+
+    @Override
+    public boolean setTerm(long term) {
+        this.term = term;
+
+        return true;
+    }
+
+    @Override
+    public long getTerm() {
+        return term;
+    }
+
+    @Override
+    public boolean setVotedFor(PeerId peerId) {
+        this.votedFor = peerId;
+
+        return true;
+    }
+
+    @Override
+    public PeerId getVotedFor() {
+        return votedFor;
+    }
+
+    @Override
+    public boolean setTermAndVotedFor(long term, PeerId peerId) {
+        this.term = term;
+        this.votedFor = peerId;
+
+        return true;
+    }
+}
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
index 26857cafd..b13450a3c 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/LozaTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.raft;
 
+import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.List;
@@ -77,8 +78,14 @@ public class LozaTest extends IgniteAbstractTest {
 
         Supplier<RaftGroupListener> lsnrSupplier = () -> null;
 
-        assertThrows(NodeStoppingException.class, () -> loza.updateRaftGroup(raftGroupId, nodes, newNodes, lsnrSupplier, () -> null));
+        assertThrows(
+                NodeStoppingException.class,
+                () -> loza.updateRaftGroup(raftGroupId, nodes, newNodes, lsnrSupplier, () -> null, defaults())
+        );
         assertThrows(NodeStoppingException.class, () -> loza.stopRaftGroup(raftGroupId));
-        assertThrows(NodeStoppingException.class, () -> loza.prepareRaftGroup(raftGroupId, nodes, lsnrSupplier));
+        assertThrows(
+                NodeStoppingException.class,
+                () -> loza.prepareRaftGroup(raftGroupId, nodes, lsnrSupplier, defaults())
+        );
     }
 }
diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
index 296fb0157..a6b92248e 100644
--- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/server/impl/RaftServerImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
 import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.IgniteStringFormatter;
@@ -173,7 +174,7 @@ public class RaftServerImpl implements RaftServer {
     /** {@inheritDoc} */
     @Override
     public synchronized boolean startRaftGroup(String groupId, RaftGroupListener lsnr,
-            List<Peer> initialConf) {
+            List<Peer> initialConf, RaftGroupOptions groupOptions) {
         if (listeners.containsKey(groupId)) {
             return false;
         }
@@ -185,8 +186,14 @@ public class RaftServerImpl implements RaftServer {
 
     /** {@inheritDoc} */
     @Override
-    public boolean startRaftGroup(String groupId, RaftGroupEventsListener evLsnr, RaftGroupListener lsnr, List<Peer> initialConf) {
-        return startRaftGroup(groupId, lsnr, initialConf);
+    public boolean startRaftGroup(
+            String groupId,
+            RaftGroupEventsListener evLsnr,
+            RaftGroupListener lsnr,
+            List<Peer> initialConf,
+            RaftGroupOptions groupOptions
+    ) {
+        return startRaftGroup(groupId, lsnr, initialConf, groupOptions);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java
index 96ab9938a..4766cd419 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java
@@ -27,7 +27,7 @@ public class TestJRaftServiceFactory extends DefaultJRaftServiceFactory {
 
     @Override
     public LogStorage createLogStorage(final String groupId, final RaftOptions raftOptions) {
-        return new LocalLogStorage(null, raftOptions);
+        return new LocalLogStorage(raftOptions);
     }
 
 }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactoryTest.java
similarity index 55%
copy from modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java
copy to modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactoryTest.java
index 96ab9938a..da74083e8 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestJRaftServiceFactory.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/VolatileJRaftServiceFactoryTest.java
@@ -14,20 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.ignite.raft.jraft.core;
 
 import org.apache.ignite.raft.jraft.option.RaftOptions;
-import org.apache.ignite.raft.jraft.storage.LogStorage;
-import org.apache.ignite.raft.jraft.storage.impl.LocalLogStorage;
+import org.apache.ignite.raft.jraft.storage.VolatileStorage;
+import org.junit.jupiter.api.Test;
 
-public class TestJRaftServiceFactory extends DefaultJRaftServiceFactory {
-    public TestJRaftServiceFactory() {
-        super(null);
-    }
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 
-    @Override
-    public LogStorage createLogStorage(final String groupId, final RaftOptions raftOptions) {
-        return new LocalLogStorage(null, raftOptions);
+class VolatileJRaftServiceFactoryTest {
+    private final VolatileJRaftServiceFactory serviceFactory = new VolatileJRaftServiceFactory();
+
+    @Test
+    void producesVolatileMetaStorage() {
+        assertThat(serviceFactory.createRaftMetaStorage("test", new RaftOptions()), is(instanceOf(VolatileStorage.class)));
     }
 
-}
+    @Test
+    void producesVolatileLogStorage() {
+        assertThat(serviceFactory.createLogStorage("test", new RaftOptions()), is(instanceOf(VolatileStorage.class)));
+    }
+}
\ No newline at end of file
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java
index f689eb94c..e183a982e 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LocalLogStorageTest.java
@@ -22,6 +22,6 @@ import org.apache.ignite.raft.jraft.storage.LogStorage;
 public class LocalLogStorageTest extends BaseLogStorageTest {
     @Override
     protected LogStorage newLogStorage() {
-        return new LocalLogStorage(this.path.toString(), new RaftOptions());
+        return new LocalLogStorage(new RaftOptions());
     }
 }
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index 0ab6d0ba7..022fbabdd 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -107,7 +107,7 @@ public class LogManagerTest extends BaseStorageTest {
     }
 
     protected LogStorage newLogStorage(final RaftOptions raftOptions) {
-        return new LocalLogStorage(this.path.toString(), raftOptions);
+        return new LocalLogStorage(raftOptions);
     }
 
     @AfterEach
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorageTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorageTest.java
new file mode 100644
index 000000000..208818a22
--- /dev/null
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/VolatileRaftMetaStorageTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.option.RaftMetaStorageOptions;
+import org.apache.ignite.raft.jraft.storage.VolatileStorage;
+import org.junit.jupiter.api.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class VolatileRaftMetaStorageTest {
+    private final VolatileRaftMetaStorage storage = new VolatileRaftMetaStorage();
+
+    private final PeerId peerId = new PeerId("1.2.3.4", 4000);
+
+    @Test
+    void initReturnsTrue() {
+        assertTrue(storage.init(new RaftMetaStorageOptions()));
+    }
+
+    @Test
+    void shutdownDoesNothing() {
+        assertDoesNotThrow(storage::shutdown);
+    }
+
+    @Test
+    void returnsDefaultValuesWhenNothingSet() {
+        assertThat(storage.getTerm(), is(0L));
+        assertThat(storage.getVotedFor(), is(PeerId.emptyPeer()));
+    }
+
+    @Test
+    void setTermReturnsTrue() {
+        assertTrue(storage.setTerm(42));
+    }
+
+    @Test
+    void setsAndReturnsTerm() {
+        storage.setTerm(3);
+
+        assertThat(storage.getTerm(), is(3L));
+    }
+
+    @Test
+    void setVotedForReturnsTrue() {
+        assertTrue(storage.setVotedFor(peerId));
+    }
+
+    @Test
+    void setsAndReturnsPeer() {
+        storage.setVotedFor(peerId);
+
+        assertThat(storage.getVotedFor(), is(sameInstance(peerId)));
+    }
+
+    @Test
+    void setsTermAndVotedForTogether() {
+        storage.setTermAndVotedFor(4, peerId);
+
+        assertThat(storage.getTerm(), is(4L));
+        assertThat(storage.getVotedFor(), is(sameInstance(peerId)));
+    }
+
+    @Test
+    void isInstanceOfVolatileStorage() {
+        assertThat(storage, is(instanceOf(VolatileStorage.class)));
+    }
+}
\ No newline at end of file
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index f591a2e05..7ea282796 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -132,6 +132,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>ca.seinesoftware</groupId>
+            <artifactId>hamcrest-path</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-api</artifactId>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
index 256f9e5b1..675d57556 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -68,7 +69,7 @@ public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractT
 
     /** Work directory. */
     @WorkDirectory
-    private static Path WORK_DIR;
+    protected static Path WORK_DIR;
 
     /**
      * Before all.
@@ -153,4 +154,10 @@ public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractT
     protected final IgniteImpl node(int index) {
         return (IgniteImpl) clusterNodes.get(index);
     }
+
+    /** Runs {@code sql} in the {@code PUBLIC} schema through node 0 and collects the first returned cursor. */
+    protected List<List<Object>> executeSql(String sql, Object... args) {
+        var firstCursor = node(0).queryEngine().queryAsync("PUBLIC", sql, args).get(0).join();
+        return getAllFromCursor(firstCursor);
+    }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
index a2dc8f327..2857a6938 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.compute;
 
 import static java.util.stream.Collectors.joining;
 import static java.util.stream.Collectors.toList;
-import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.aMapWithSize;
@@ -279,12 +278,6 @@ class ItComputeTest extends AbstractClusterIntegrationTest {
         assertThat(actualNodeName, in(allNodeNames()));
     }
 
-    private List<List<Object>> executeSql(String sql, Object... args) {
-        return getAllFromCursor(
-                node(0).queryEngine().queryAsync("PUBLIC", sql, args).get(0).join()
-        );
-    }
-
     private static class ConcatJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilenessTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilenessTest.java
new file mode 100644
index 000000000..e187435e6
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilenessTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.inmemory;
+
+import static ca.seinesoftware.hamcrest.path.PathMatcher.exists;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.rocksdb.RocksDB.DEFAULT_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.AbstractClusterIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.junit.jupiter.api.Test;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+
+/**
+ * Tests for making sure that RAFT groups corresponding to partition stores of in-memory tables use volatile
+ * storages for storing RAFT meta and RAFT log, while RAFT groups of persistent tables keep both on disk.
+ */
+class ItRaftStorageVolatilenessTest extends AbstractClusterIntegrationTest {
+    private static final String TABLE_NAME = "test";
+
+    @Override
+    protected int nodes() {
+        return 1;
+    }
+
+    @Test
+    void raftMetaStorageIsVolatileForVolatilePartitions() {
+        createInMemoryTable();
+
+        // Vacuously true when no partition directory exists at all: then there is no 'meta' directory either.
+        assertThat(partitionRaftMetaPaths(node(0)), everyItem(not(exists())));
+    }
+
+    @Test
+    void raftLogStorageIsVolatileForVolatilePartitions() throws Exception {
+        createInMemoryTable();
+
+        assertOnEveryRaftLogFamily(false);
+    }
+
+    @Test
+    void raftMetaStorageIsPersistentForPersistentPartitions() {
+        createPersistentTable();
+
+        List<Path> metaPaths = partitionRaftMetaPaths(node(0));
+
+        // everyItem() also accepts an empty list, so first make sure that partition directories were found at all.
+        assertThat(metaPaths.isEmpty(), is(false));
+        assertThat(metaPaths, everyItem(exists()));
+    }
+
+    @Test
+    void raftLogStorageIsPersistentForPersistentPartitions() throws Exception {
+        createPersistentTable();
+
+        assertOnEveryRaftLogFamily(true);
+    }
+
+    @Test
+    void inMemoryTableWorks() {
+        createInMemoryTable();
+
+        executeSql("INSERT INTO " + TABLE_NAME + "(k, v) VALUES (1, 101)");
+
+        List<List<Object>> tuples = executeSql("SELECT k, v FROM " + TABLE_NAME);
+
+        assertThat(tuples, equalTo(List.of(List.of(1, 101))));
+    }
+
+    private void createInMemoryTable() {
+        executeSql("CREATE TABLE " + TABLE_NAME + " (k int, v int, CONSTRAINT PK PRIMARY KEY (k)) ENGINE aimem");
+    }
+
+    private void createPersistentTable() {
+        executeSql("CREATE TABLE " + TABLE_NAME + " (k int, v int, CONSTRAINT PK PRIMARY KEY (k)) ENGINE rocksdb");
+    }
+
+    /**
+     * Returns paths for 'meta' directories corresponding to Raft meta storages for partitions of the test table.
+     *
+     * @param ignite Ignite instance.
+     * @return Paths for 'meta' directories corresponding to Raft meta storages for partitions of the test table.
+     */
+    private List<Path> partitionRaftMetaPaths(IgniteImpl ignite) {
+        // Resolved once up front, so the table is not looked up again for every directory entry.
+        String partitionPrefix = testTablePartitionPrefix(ignite);
+        try (Stream<Path> paths = Files.list(WORK_DIR.resolve(ignite.name()))) {
+            return paths
+                    .filter(path -> path.getFileName().toString().startsWith(partitionPrefix))
+                    .map(path -> path.resolve("meta"))
+                    .collect(toList());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private String testTablePartitionPrefix(IgniteImpl ignite) {
+        return testTableId(ignite) + "_part_";
+    }
+
+    private UUID testTableId(IgniteImpl ignite) {
+        TableManager tables = (TableManager) ignite.tables();
+        return tables.tableImpl("PUBLIC." + TABLE_NAME).tableId();
+    }
+
+    /**
+     * Stops node 0, opens its RAFT log RocksDB directly and runs the matching assertion on every column family.
+     *
+     * @param partitionDataExpected Whether log entries of the test table partitions must be present.
+     * @throws Exception If stopping the node or opening the database fails.
+     */
+    private void assertOnEveryRaftLogFamily(boolean partitionDataExpected) throws Exception {
+        IgniteImpl ignite = node(0);
+        String nodeName = ignite.name();
+        String tablePartitionPrefix = testTablePartitionPrefix(ignite);
+
+        ignite.close();
+
+        Path logRocksDbDir = WORK_DIR.resolve(nodeName).resolve("log");
+
+        List<ColumnFamilyDescriptor> cfDescriptors = List.of(
+                // Column family to store configuration log entry.
+                new ColumnFamilyDescriptor("Configuration".getBytes(UTF_8)),
+                // Default column family to store user data log entry.
+                new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY)
+        );
+
+        List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+
+        try (RocksDB db = RocksDB.open(logRocksDbDir.toString(), cfDescriptors, cfHandles)) {
+            try {
+                for (ColumnFamilyHandle cfHandle : cfHandles) {
+                    if (partitionDataExpected) {
+                        assertThatFamilyHasDataForPartition(db, tablePartitionPrefix, cfHandle);
+                    } else {
+                        assertThatFamilyHasNoDataForPartition(db, tablePartitionPrefix, cfHandle);
+                    }
+                }
+            } finally {
+                // Column family handles have to be released before the database that owns them is closed.
+                cfHandles.forEach(ColumnFamilyHandle::close);
+            }
+        }
+    }
+
+    private void assertThatFamilyHasNoDataForPartition(RocksDB db, String tablePartitionPrefix, ColumnFamilyHandle cfHandle) {
+        try (
+                // The bound is a resource of its own: ReadOptions only references the slice and never closes it.
+                Slice lowerBound = new Slice(tablePartitionPrefix.getBytes(UTF_8));
+                ReadOptions readOptions = new ReadOptions().setIterateLowerBound(lowerBound);
+                RocksIterator iterator = db.newIterator(cfHandle, readOptions)
+        ) {
+            iterator.seekToFirst();
+
+            if (iterator.isValid()) {
+                assertThat(new String(iterator.key(), UTF_8), not(startsWith(tablePartitionPrefix)));
+            }
+        }
+    }
+
+    private void assertThatFamilyHasDataForPartition(RocksDB db, String tablePartitionPrefix, ColumnFamilyHandle cfHandle) {
+        try (
+                Slice lowerBound = new Slice(tablePartitionPrefix.getBytes(UTF_8));
+                ReadOptions readOptions = new ReadOptions().setIterateLowerBound(lowerBound);
+                RocksIterator iterator = db.newIterator(cfHandle, readOptions)
+        ) {
+            iterator.seekToFirst();
+
+            assertThat(iterator.isValid(), is(true));
+
+            assertThat(new String(iterator.key(), UTF_8), startsWith(tablePartitionPrefix));
+        }
+    }
+}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 5bf3f7485..7c43644c0 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -628,7 +628,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
      * @return Table manager.
      */
     private TableManager mockManagers() throws NodeStoppingException {
-        when(rm.prepareRaftGroup(any(), any(), any())).thenAnswer(mock -> {
+        when(rm.prepareRaftGroup(any(), any(), any(), any())).thenAnswer(mock -> {
             RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
 
             when(raftGrpSrvcMock.leader()).thenReturn(new Peer(new NetworkAddress("localhost", 47500)));
@@ -636,7 +636,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
             return completedFuture(raftGrpSrvcMock);
         });
 
-        when(rm.updateRaftGroup(any(), any(), any(), any(), any())).thenAnswer(mock -> {
+        when(rm.updateRaftGroup(any(), any(), any(), any(), any(), any())).thenAnswer(mock -> {
             RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
 
             when(raftGrpSrvcMock.leader()).thenReturn(new Peer(new NetworkAddress("localhost", 47500)));
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
index cb6b9dd37..7ab3ec975 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
@@ -78,6 +78,13 @@ public interface TableStorage {
      */
     void dropIndex(String indexName);
 
+    /**
+     * Returns {@code true} if this storage is volatile (i.e. stores its data in memory), or {@code false} if it's persistent.
+     *
+     * @return {@code true} if this storage is volatile (i.e. stores its data in memory), or {@code false} if it's persistent.
+     */
+    boolean isVolatile();
+
     /**
      * Returns the table configuration.
      */
diff --git a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java
index d2c4eecbd..42c8705dd 100644
--- a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java
+++ b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapTableStorage.java
@@ -85,6 +85,11 @@ public class TestConcurrentHashMapTableStorage implements TableStorage {
         throw new UnsupportedOperationException("Not supported yet");
     }
 
+    @Override
+    public boolean isVolatile() {
+        return true; // NOTE(review): presumably in-memory only, judging by the class name - confirm
+    }
+
     /** {@inheritDoc} */
     @Override
     public TableConfiguration configuration() {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 6dff5ce59..058421613 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -20,13 +20,16 @@ package org.apache.ignite.internal.storage.pagememory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableView;
 import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.PartitionStorage;
 import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.storage.pagememory.mv.PageMemoryMvPartitionStorage;
@@ -39,7 +42,7 @@ import org.jetbrains.annotations.TestOnly;
  * Abstract table storage implementation based on {@link PageMemory}.
  */
 // TODO: IGNITE-16642 Support indexes.
-public abstract class AbstractPageMemoryTableStorage implements TableStorage {
+public abstract class AbstractPageMemoryTableStorage implements TableStorage, MvTableStorage {
     protected final TableConfiguration tableCfg;
 
     /** List of objects to be closed on the {@link #stop}. */
@@ -49,6 +52,8 @@ public abstract class AbstractPageMemoryTableStorage implements TableStorage {
 
     protected volatile AtomicReferenceArray<PartitionStorage> partitions;
 
+    protected volatile AtomicReferenceArray<MvPartitionStorage> mvPartitions;
+
     /**
      * Constructor.
      *
@@ -71,6 +76,8 @@ public abstract class AbstractPageMemoryTableStorage implements TableStorage {
 
         partitions = new AtomicReferenceArray<>(tableView.partitions());
 
+        mvPartitions = new AtomicReferenceArray<>(tableView.partitions());
+
         started = true;
     }
 
@@ -147,6 +154,57 @@ public abstract class AbstractPageMemoryTableStorage implements TableStorage {
      */
     protected abstract VolatilePageMemoryPartitionStorage createPartitionStorage(int partId) throws StorageException;
 
+    /** {@inheritDoc} */
+    @Override
+    public MvPartitionStorage getOrCreateMvPartition(int partitionId) throws StorageException {
+        MvPartitionStorage existing = getMvPartition(partitionId);
+
+        if (existing == null) {
+            // NOTE(review): look-up and publication are two separate steps; confirm that callers never race on one id.
+            MvPartitionStorage created = createMvPartitionStorage(partitionId);
+            mvPartitions.set(partitionId, created);
+
+            return created;
+        }
+
+        return existing;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public MvPartitionStorage getMvPartition(int partitionId) {
+        assert started : "Storage has not started yet";
+
+        if (0 <= partitionId && partitionId < mvPartitions.length()) {
+            return mvPartitions.get(partitionId);
+        }
+        // Out of range: report the offending id together with the configured number of partitions.
+        throw new IllegalArgumentException(S.toString(
+                "Unable to access partition with id outside of configured range",
+                "table", tableCfg.name().value(), false,
+                "partitionId", partitionId, false,
+                "partitions", mvPartitions.length(), false
+        ));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<?> destroyPartition(int partitionId) throws StorageException {
+        assert started : "Storage has not started yet";
+
+        MvPartitionStorage partition = getMvPartition(partitionId);
+
+        if (partition != null) {
+            mvPartitions.set(partitionId, null);
+            // For now only the slot is cleared; the storage object itself is not destroyed.
+            // TODO: IGNITE-17197 Actually destroy the partition.
+            //partition.destroy();
+        }
+
+        // TODO: IGNITE-17197 Convert this to true async code.
+        return CompletableFuture.completedFuture(null);
+    }
+
     /**
      * This API is not yet ready. But we need to test mv storages anyways.
      */
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 4f5b6e70a..3d9e57d47 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -56,6 +56,11 @@ class PersistentPageMemoryTableStorage extends AbstractPageMemoryTableStorage {
         this.dataRegion = dataRegion;
     }
 
+    @Override
+    public boolean isVolatile() {
+        return false; // this is the persistent flavour of the page-memory table storage
+    }
+
     /** {@inheritDoc} */
     @Override
     public void start() throws StorageException {
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
index 503c5e6da..9cffb7c37 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/VolatilePageMemoryTableStorage.java
@@ -60,6 +60,11 @@ class VolatilePageMemoryTableStorage extends AbstractPageMemoryTableStorage {
         );
     }
 
+    @Override
+    public boolean isVolatile() {
+        return true; // this is the volatile flavour of the page-memory table storage
+    }
+
     /** {@inheritDoc} */
     @Override
     public void destroy() throws StorageException {
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 8315fb3a4..8a94317ca 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -312,6 +312,11 @@ class RocksDbTableStorage implements TableStorage, MvTableStorage {
         });
     }
 
+    @Override
+    public boolean isVolatile() {
+        return false; // pinned by RocksDbTableStorageTest#storageAdvertisesItIsPersistent
+    }
+
     /**
      * Checks that a passed partition id is within the proper bounds.
      *
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
index 49362bda4..23c86dec5 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
@@ -298,4 +298,9 @@ public class RocksDbTableStorageTest {
         assertThat(partitionStorage1.read(overwriteData), is(testData));
         assertThat(partitionStorage2.read(overwriteData), is(testData));
     }
+
+    @Test
+    void storageAdvertisesItIsPersistent() {
+        assertThat(storage.isVolatile(), is(false)); // TableManager feeds this flag into RaftGroupOptions.forTable(...)
+    }
 }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
index 70be355f2..b9d0fd0eb 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.server.RaftServer;
 import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -160,7 +161,8 @@ public class ItInternalTableScanTest {
         raftSrv.startRaftGroup(
                 grpName,
                 new PartitionListener(tblId, new VersionedRowStore(mockStorage, txManager)),
-                conf
+                conf,
+                RaftGroupOptions.defaults()
         );
 
         executor = new ScheduledThreadPoolExecutor(20, new NamedThreadFactory(Loza.CLIENT_POOL_NAME));
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index d5e46ab37..84bd2f393 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -36,6 +36,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.storage.basic.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.TableStorage;
@@ -275,7 +276,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
                         grpId,
                         partNodes,
                         () -> new PartitionListener(tblId,
-                                new VersionedRowStore(new TestMvPartitionStorage(List.of(), 0), txManagers.get(node)))
+                                new VersionedRowStore(new TestMvPartitionStorage(List.of(), 0), txManagers.get(node))),
+                        RaftGroupOptions.defaults()
                 );
             }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ff07d7806..23f99d201 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.metastorage.client.WatchEvent;
 import org.apache.ignite.internal.metastorage.client.WatchListener;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.schema.SchemaUtils;
@@ -480,7 +481,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                     partId,
                                     busyLock,
                                     movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
-                                    rebalanceScheduler)
+                                    rebalanceScheduler),
+                            groupOptionsForInternalTable(internalTbl)
                     ).thenAccept(
                             updatedRaftGroupService -> ((InternalTableImpl) internalTbl)
                                     .updateInternalTableRaftGroupService(partId, updatedRaftGroupService)
@@ -500,6 +502,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         CompletableFuture.allOf(futures).join();
     }
 
+    private RaftGroupOptions groupOptionsForInternalTable(InternalTable internalTbl) {
+        return RaftGroupOptions.forTable(internalTbl.storage().isVolatile()); // options follow the storage's volatility
+    }
+
     /** {@inheritDoc} */
     @Override
     public void stop() {
@@ -1291,8 +1297,14 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                         + "check if current node={} should start new raft group node for partition rebalance.",
                                 pendingAssignmentsWatchEvent.key(), part, tbl.name(), localMember.address());
 
-                        raftMgr.startRaftGroupNode(partId, assignments, deltaPeers, raftGrpLsnrSupplier,
-                                raftGrpEvtsLsnrSupplier);
+                        raftMgr.startRaftGroupNode(
+                                partId,
+                                assignments,
+                                deltaPeers,
+                                raftGrpLsnrSupplier,
+                                raftGrpEvtsLsnrSupplier,
+                                groupOptionsForInternalTable(tbl.internalTable())
+                        );
                     } catch (NodeStoppingException e) {
                         // no-op
                     }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 568e2e049..06cd02022 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -270,7 +270,7 @@ public class TableManagerTest extends IgniteAbstractTest {
      */
     @Test
     public void testPreconfiguredTable() throws Exception {
-        when(rm.updateRaftGroup(any(), any(), any(), any(), any())).thenAnswer(mock ->
+        when(rm.updateRaftGroup(any(), any(), any(), any(), any(), any())).thenAnswer(mock ->
                 CompletableFuture.completedFuture(mock(RaftGroupService.class)));
 
         TableManager tableManager = createTableManager(tblManagerFut, false);
@@ -446,7 +446,7 @@ public class TableManagerTest extends IgniteAbstractTest {
 
         mockManagersAndCreateTable(scmTbl, tblManagerFut);
 
-        verify(rm, times(PARTITIONS)).updateRaftGroup(anyString(), any(), any(), any(), any());
+        verify(rm, times(PARTITIONS)).updateRaftGroup(anyString(), any(), any(), any(), any(), any());
 
         TableManager tableManager = tblManagerFut.join();
 
@@ -555,7 +555,7 @@ public class TableManagerTest extends IgniteAbstractTest {
             CompletableFuture<TableManager> tblManagerFut,
             Phaser phaser
     ) throws Exception {
-        when(rm.updateRaftGroup(any(), any(), any(), any(), any())).thenAnswer(mock -> {
+        when(rm.updateRaftGroup(any(), any(), any(), any(), any(), any())).thenAnswer(mock -> {
             RaftGroupService raftGrpSrvcMock = mock(RaftGroupService.class);
 
             when(raftGrpSrvcMock.leader()).thenReturn(new Peer(new NetworkAddress("localhost", 47500)));
diff --git a/parent/pom.xml b/parent/pom.xml
index d69aea3a3..4da425afd 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -81,6 +81,7 @@
         <typesafe.version>1.4.1</typesafe.version>
         <hamcrest.version>2.2</hamcrest.version>
         <hamcrest.optional.version>2.0.0</hamcrest.optional.version>
+        <hamcrest.path.version>1.0.1</hamcrest.path.version>
         <scalecube.version>2.6.12</scalecube.version>
         <calcite.version>1.30.0</calcite.version>
         <immutables.version>2.8.8</immutables.version>
@@ -806,6 +807,18 @@
                 </exclusions>
             </dependency>
 
+            <dependency>
+                <groupId>ca.seinesoftware</groupId>
+                <artifactId>hamcrest-path</artifactId>
+                <version>${hamcrest.path.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.hamcrest</groupId>
+                        <artifactId>hamcrest-core</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
             <dependency>
                 <groupId>io.scalecube</groupId>
                 <artifactId>scalecube-cluster</artifactId>