You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/08/30 14:43:43 UTC

[ignite-3] branch main updated: IGNITE-17196 In-memory raft group reconfiguration on node failure (#1016)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 467f36afb1 IGNITE-17196 In-memory raft group reconfiguration on node failure (#1016)
467f36afb1 is described below

commit 467f36afb16d42ad8340ebe5e702d4f5cb999d7e
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Tue Aug 30 18:43:39 2022 +0400

    IGNITE-17196 In-memory raft group reconfiguration on node failure (#1016)
---
 .../ignite/internal/affinity/AffinityUtils.java    |  57 +++-
 .../affinity/RendezvousAffinityFunction.java       |  52 ++-
 .../org/apache/ignite/network/ClusterNode.java     |   5 +-
 .../org/apache/ignite/network/NetworkAddress.java  |   5 +-
 .../internal/compute/ComputeMessageTypes.java      |   2 +-
 .../ignite/internal/thread/NamedThreadFactory.java |  12 +
 .../org/apache/ignite/internal/util/ByteUtils.java |   4 +-
 .../client/ItMetaStorageRaftGroupTest.java         |   2 +-
 .../ItMetaStorageServicePersistenceTest.java       |   2 +-
 .../client/ItMetaStorageServiceTest.java           |   4 +-
 .../metastorage/client/MetaStorageServiceImpl.java |  10 +-
 .../metastorage/client/SimpleCondition.java        |  12 +
 .../internal/metastorage/MetaStorageManager.java   |   2 +-
 .../metastorage/MetaStorageRangeCursorTest.java    |   2 +-
 .../processor/messages/MessageImplGenerator.java   |  13 +
 .../ignite/internal/network/netty/NettyUtils.java  |   6 +-
 .../java/org/apache/ignite/internal/raft/Loza.java |  20 ++
 .../raft/jraft/rpc/impl/RaftGroupServiceImpl.java  |  53 ++-
 .../rpc/impl/cli/IgniteCliRpcRequestClosure.java   |   6 +-
 .../storage/ItRebalanceDistributedTest.java        |   7 +-
 .../app/ItIgniteInMemoryNodeRestartTest.java       | 354 +++++++++++++++++++++
 .../runner/app/ItIgniteNodeRestartTest.java        |   3 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   8 +
 .../sql/engine/exec/MockedStructuresTest.java      |   4 +
 modules/table/pom.xml                              |  10 +
 .../internal/table/distributed/TableManager.java   | 265 ++++++++++++---
 .../raft/RebalanceRaftGroupEventsListener.java     | 297 ++++++++++++-----
 .../internal/table/message/HasDataRequest.java}    |  26 +-
 .../internal/table/message/HasDataResponse.java}   |  24 +-
 .../internal/table/message/TableMessageGroup.java} |  22 +-
 .../ignite/internal/utils/RebalanceUtil.java       | 278 +++++++++++++++-
 .../table/distributed/TableManagerTest.java        |  10 +-
 .../raft/RebalanceRaftGroupEventsListenerTest.java |   2 +-
 33 files changed, 1354 insertions(+), 225 deletions(-)

diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
index 557d512124..58dbbc4eeb 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityUtils.java
@@ -17,8 +17,12 @@
 
 package org.apache.ignite.internal.affinity;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.function.IntFunction;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.NotNull;
 
@@ -30,7 +34,31 @@ public class AffinityUtils {
      * Calculates affinity assignments.
      *
      * @param partitions Partitions count.
-     * @param replicas   Replicas count.
+     * @param replicas Replicas count.
+     * @param aggregator Function that creates a collection for the partition assignments.
+     * @return List nodes by partition.
+     */
+    public static <T extends Collection<ClusterNode>> List<T> calculateAssignments(
+            @NotNull Collection<ClusterNode> baselineNodes,
+            int partitions,
+            int replicas,
+            IntFunction<T> aggregator
+    ) {
+        return RendezvousAffinityFunction.assignPartitions(
+                baselineNodes,
+                partitions,
+                replicas,
+                false,
+                null,
+                aggregator
+        );
+    }
+
+    /**
+     * Calculates affinity assignments.
+     *
+     * @param partitions Partitions count.
+     * @param replicas Replicas count.
      * @return List nodes by partition.
      */
     public static List<List<ClusterNode>> calculateAssignments(
@@ -38,12 +66,35 @@ public class AffinityUtils {
             int partitions,
             int replicas
     ) {
-        return RendezvousAffinityFunction.assignPartitions(
+        return calculateAssignments(
                 baselineNodes,
                 partitions,
                 replicas,
+                ArrayList::new
+        );
+    }
+
+    /**
+     * Calculates affinity assignments for a single partition.
+     *
+     * @param baselineNodes Nodes.
+     * @param partition Partition id.
+     * @param replicas Replicas count.
+     * @return Set of nodes assigned to the partition.
+     */
+    public static Set<ClusterNode> calculateAssignmentForPartition(
+            Collection<ClusterNode> baselineNodes,
+            int partition,
+            int replicas
+    ) {
+        return RendezvousAffinityFunction.assignPartition(
+                partition,
+                new ArrayList<>(baselineNodes),
+                replicas,
+                null,
                 false,
-                null
+                null,
+                HashSet::new
         );
     }
 }
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
index fe16b07b0f..0a60caa22a 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.function.BiPredicate;
+import java.util.function.IntFunction;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -85,18 +86,24 @@ public class RendezvousAffinityFunction {
      * @param neighborhoodCache Neighborhood.
      * @param exclNeighbors     If true neighbors are excluded, false otherwise.
      * @param nodeFilter        Filter for nodes.
+     * @param aggregator        Function that creates a collection for the partition assignments.
      * @return Assignment.
      */
-    public static List<ClusterNode> assignPartition(
+    public static <T extends Collection<ClusterNode>> T assignPartition(
             int part,
             List<ClusterNode> nodes,
             int replicas,
             Map<String, Collection<ClusterNode>> neighborhoodCache,
             boolean exclNeighbors,
-            BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter
+            BiPredicate<ClusterNode, T> nodeFilter,
+            IntFunction<T> aggregator
     ) {
         if (nodes.size() <= 1) {
-            return nodes;
+            T res = aggregator.apply(1);
+
+            res.addAll(nodes);
+
+            return res;
         }
 
         IgniteBiTuple<Long, ClusterNode>[] hashArr =
@@ -118,12 +125,12 @@ public class RendezvousAffinityFunction {
 
         // REPLICATED cache case
         if (replicas == Integer.MAX_VALUE) {
-            return replicatedAssign(nodes, sortedNodes);
+            return replicatedAssign(nodes, sortedNodes, aggregator);
         }
 
         Iterator<ClusterNode> it = sortedNodes.iterator();
 
-        List<ClusterNode> res = new ArrayList<>(effectiveReplicas);
+        T res = aggregator.apply(effectiveReplicas);
 
         Collection<ClusterNode> allNeighbors = new HashSet<>();
 
@@ -188,13 +195,14 @@ public class RendezvousAffinityFunction {
      *
      * @param nodes       Topology.
      * @param sortedNodes Sorted for specified partitions nodes.
+     * @param aggregator  Function that creates a collection for the partition assignments.
      * @return Assignment.
      */
-    private static List<ClusterNode> replicatedAssign(List<ClusterNode> nodes,
-            Iterable<ClusterNode> sortedNodes) {
+    private static <T extends Collection<ClusterNode>> T replicatedAssign(List<ClusterNode> nodes,
+            Iterable<ClusterNode> sortedNodes, IntFunction<T> aggregator) {
         ClusterNode first = sortedNodes.iterator().next();
 
-        List<ClusterNode> res = new ArrayList<>(nodes.size());
+        T res = aggregator.apply(nodes.size());
 
         res.add(first);
 
@@ -238,7 +246,7 @@ public class RendezvousAffinityFunction {
      * @param currentTopologySnapshot List of topology nodes.
      * @param partitions              Number of table partitions.
      * @param replicas                Number partition replicas.
-     * @param exclNeighbors           If true neighbors are excluded fro the one partition assignment, false otherwise.
+     * @param exclNeighbors           If true neighbors are excluded from the one partition assignment, false otherwise.
      * @param nodeFilter              Filter for nodes.
      * @return List nodes by partition.
      */
@@ -248,19 +256,41 @@ public class RendezvousAffinityFunction {
             int replicas,
             boolean exclNeighbors,
             BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter
+    ) {
+        return assignPartitions(currentTopologySnapshot, partitions, replicas, exclNeighbors, nodeFilter, ArrayList::new);
+    }
+
+    /**
+     * Generates an assignment by the given parameters.
+     *
+     * @param currentTopologySnapshot List of topology nodes.
+     * @param partitions              Number of table partitions.
+     * @param replicas                Number of partition replicas.
+     * @param exclNeighbors           If true neighbors are excluded from the one partition assignment, false otherwise.
+     * @param nodeFilter              Filter for nodes.
+     * @param aggregator              Function that creates a collection for the partition assignments.
+     * @return List nodes by partition.
+     */
+    public static <T extends Collection<ClusterNode>> List<T> assignPartitions(
+            Collection<ClusterNode> currentTopologySnapshot,
+            int partitions,
+            int replicas,
+            boolean exclNeighbors,
+            BiPredicate<ClusterNode, T> nodeFilter,
+            IntFunction<T> aggregator
     ) {
         assert partitions <= MAX_PARTITIONS_COUNT : "partitions <= " + MAX_PARTITIONS_COUNT;
         assert partitions > 0 : "parts > 0";
         assert replicas > 0 : "replicas > 0";
 
-        List<List<ClusterNode>> assignments = new ArrayList<>(partitions);
+        List<T> assignments = new ArrayList<>(partitions);
 
         Map<String, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? neighbors(currentTopologySnapshot) : null;
 
         List<ClusterNode> nodes = new ArrayList<>(currentTopologySnapshot);
 
         for (int i = 0; i < partitions; i++) {
-            List<ClusterNode> partAssignment = assignPartition(i, nodes, replicas, neighborhoodCache, exclNeighbors, nodeFilter);
+            T partAssignment = assignPartition(i, nodes, replicas, neighborhoodCache, exclNeighbors, nodeFilter, aggregator);
 
             assignments.add(partAssignment);
         }
diff --git a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
index e267d292e4..f9e8088602 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.network;
 
 import java.io.Serializable;
-import java.util.Objects;
 import org.apache.ignite.internal.tostring.S;
 
 /**
@@ -90,7 +89,9 @@ public class ClusterNode implements Serializable {
     /** {@inheritDoc} */
     @Override
     public int hashCode() {
-        return Objects.hash(name, address);
+        int result = name.hashCode();
+        result = 31 * result + address.hashCode();
+        return result;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java b/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
index 5ca50da82c..fdeff7ac8a 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/NetworkAddress.java
@@ -149,7 +149,10 @@ public class NetworkAddress implements Serializable {
     /** {@inheritDoc} */
     @Override
     public int hashCode() {
-        return Objects.hash(host, port, consistentId);
+        int result = host.hashCode();
+        result = 31 * result + port;
+        result = 31 * result + (consistentId != null ? consistentId.hashCode() : 0);
+        return result;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
index 8f684ba0c2..1e5a4f54ce 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
@@ -24,7 +24,7 @@ import org.apache.ignite.network.annotations.MessageGroup;
 /**
  * Message types for the Compute module.
  */
-@MessageGroup(groupName = "ComputeMessages", groupType = 6)
+@MessageGroup(groupType = 6, groupName = "ComputeMessages")
 public class ComputeMessageTypes {
     /**
      * Type for {@link ExecuteRequest}.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
index ec9869cc4f..20b065717a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
@@ -91,4 +91,16 @@ public class NamedThreadFactory implements ThreadFactory {
     public static String threadPrefix(String nodeName, String poolName) {
         return IgniteThread.threadPrefix(nodeName, poolName);
     }
+
+    /**
+     * Creates a thread factory from the node name and the pool name.
+     *
+     * @param nodeName Node name.
+     * @param poolName Pool name.
+     * @param logger Logger.
+     * @return Thread factory.
+     */
+    public static NamedThreadFactory create(String nodeName, String poolName, IgniteLogger logger) {
+        return new NamedThreadFactory(threadPrefix(nodeName, poolName), logger);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
index abcf06e3c3..180aca3a5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ByteUtils.java
@@ -123,12 +123,12 @@ public class ByteUtils {
      * @param bytes Byte array.
      * @return Object.
      */
-    public static Object fromBytes(byte[] bytes) {
+    public static <T> T fromBytes(byte[] bytes) {
         try (
                 var bis = new ByteArrayInputStream(bytes);
                 var in = new ObjectInputStream(bis)
         ) {
-            return in.readObject();
+            return (T) in.readObject();
         } catch (IOException | ClassNotFoundException e) {
             throw new IgniteInternalException("Could not deserialize an object", e);
         }
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
index 03ff8611a8..566db7580e 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
@@ -259,7 +259,7 @@ public class ItMetaStorageRaftGroupTest {
                 .findFirst().get().value;
 
         MetaStorageService metaStorageSvc = new MetaStorageServiceImpl(
-                raftGroupServiceOfLiveServer, "some_node");
+                raftGroupServiceOfLiveServer, "some_node", "some_node");
 
         Cursor<Entry> cursor = metaStorageSvc.range(EXPECTED_RESULT_ENTRY1.key(), new ByteArray(new byte[]{4}));
 
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
index 5b2781c5f7..57972b0e6d 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServicePersistenceTest.java
@@ -65,7 +65,7 @@ public class ItMetaStorageServicePersistenceTest extends ItAbstractListenerSnaps
     /** {@inheritDoc} */
     @Override
     public void beforeFollowerStop(RaftGroupService service) throws Exception {
-        metaStorage = new MetaStorageServiceImpl(service, null);
+        metaStorage = new MetaStorageServiceImpl(service, null, null);
 
         // Put some data in the metastorage
         metaStorage.put(FIRST_KEY, FIRST_VALUE).get();
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
index fcd9dae271..ffc2c383b6 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
@@ -923,7 +923,7 @@ public class ItMetaStorageServiceTest {
         ).get(3, TimeUnit.SECONDS);
 
         try {
-            MetaStorageService metaStorageSvc2 = new MetaStorageServiceImpl(metaStorageRaftGrpSvc, NODE_ID_1);
+            MetaStorageService metaStorageSvc2 = new MetaStorageServiceImpl(metaStorageRaftGrpSvc, NODE_ID_1, NODE_ID_1);
 
             Cursor<Entry> cursorNode0 = metaStorageSvc.range(EXPECTED_RESULT_ENTRY.key(), null);
 
@@ -1078,6 +1078,6 @@ public class ItMetaStorageServiceTest {
                 executor
         ).get(3, TimeUnit.SECONDS);
 
-        return new MetaStorageServiceImpl(metaStorageRaftGrpSvc, NODE_ID_0);
+        return new MetaStorageServiceImpl(metaStorageRaftGrpSvc, NODE_ID_0, NODE_ID_0);
     }
 }
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
index eb2895c7f3..d2102beabc 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
@@ -91,16 +91,21 @@ public class MetaStorageServiceImpl implements MetaStorageService {
     /** Local node id. */
     private final String localNodeId;
 
+    /** Local node name. */
+    private final String localNodeName;
+
     /**
      * Constructor.
      *
      * @param metaStorageRaftGrpSvc Meta storage raft group service.
      * @param localNodeId           Local node id.
+     * @param localNodeName         Local node name.
      */
-    public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc, String localNodeId) {
+    public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc, String localNodeId, String localNodeName) {
         this.metaStorageRaftGrpSvc = metaStorageRaftGrpSvc;
         this.watchProcessor = new WatchProcessor();
         this.localNodeId = localNodeId;
+        this.localNodeName = localNodeName;
     }
 
     /** {@inheritDoc} */
@@ -533,6 +538,7 @@ public class MetaStorageServiceImpl implements MetaStorageService {
              * @param lsnr   The listener which receives and handles watch updates.
              */
             Watcher(Cursor<WatchEvent> cursor, WatchListener lsnr) {
+                setName("ms-watcher-" + localNodeName);
                 this.cursor = cursor;
                 this.lsnr = lsnr;
             }
@@ -554,6 +560,8 @@ public class MetaStorageServiceImpl implements MetaStorageService {
                                 watchEvt = watchEvtsIter.next();
                             } catch (Throwable e) {
                                 lsnr.onError(e);
+
+                                throw e;
                             }
 
                             assert watchEvt != null;
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/SimpleCondition.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/SimpleCondition.java
index 6382a0daee..36be2553a9 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/SimpleCondition.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/SimpleCondition.java
@@ -77,6 +77,8 @@ public final class SimpleCondition implements Condition {
          * @throws IllegalStateException In the case when the condition is already defined.
          */
         public SimpleCondition eq(long rev) {
+            assert rev > 0 : "Revision must be positive.";
+
             validate(type());
 
             type(ConditionType.REV_EQUAL);
@@ -95,6 +97,8 @@ public final class SimpleCondition implements Condition {
          * @throws IllegalStateException In the case when the condition is already defined.
          */
         public SimpleCondition ne(long rev) {
+            assert rev > 0 : "Revision must be positive.";
+
             validate(type());
 
             type(ConditionType.REV_NOT_EQUAL);
@@ -113,6 +117,8 @@ public final class SimpleCondition implements Condition {
          * @throws IllegalStateException In the case when the condition is already defined.
          */
         public SimpleCondition gt(long rev) {
+            assert rev > 0 : "Revision must be positive.";
+
             validate(type());
 
             type(ConditionType.REV_GREATER);
@@ -131,6 +137,8 @@ public final class SimpleCondition implements Condition {
          * @throws IllegalStateException In the case when the condition is already defined.
          */
         public SimpleCondition ge(long rev) {
+            assert rev > 0 : "Revision must be positive.";
+
             validate(type());
 
             type(ConditionType.REV_GREATER_OR_EQUAL);
@@ -149,6 +157,8 @@ public final class SimpleCondition implements Condition {
          * @throws IllegalStateException In the case when the condition is already defined.
          */
         public SimpleCondition lt(long rev) {
+            assert rev > 0 : "Revision must be positive.";
+
             validate(type());
 
             type(ConditionType.REV_LESS);
@@ -167,6 +177,8 @@ public final class SimpleCondition implements Condition {
          * @throws IllegalStateException In the case when the condition is already defined.
          */
         public SimpleCondition le(long rev) {
+            assert rev > 0 : "Revision must be positive.";
+
             validate(type());
 
             type(ConditionType.REV_LESS_OR_EQUAL);
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 542b3e544c..6284048e92 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -185,7 +185,7 @@ public class MetaStorageManager implements IgniteComponent {
                     RaftGroupOptions.defaults()
             );
 
-            return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id()));
+            return raftServiceFuture.thenApply(service -> new MetaStorageServiceImpl(service, thisNode.id(), thisNode.name()));
         } catch (NodeStoppingException e) {
             return CompletableFuture.failedFuture(e);
         }
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/MetaStorageRangeCursorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/MetaStorageRangeCursorTest.java
index 44d274bdfd..fb4176bf48 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/MetaStorageRangeCursorTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/MetaStorageRangeCursorTest.java
@@ -84,7 +84,7 @@ public class MetaStorageRangeCursorTest {
 
         when(raftGroupService.run(any())).thenAnswer(invocation -> runCommand(invocation.getArgument(0)));
 
-        MetaStorageService metaStorageService = new MetaStorageServiceImpl(raftGroupService, "test");
+        MetaStorageService metaStorageService = new MetaStorageServiceImpl(raftGroupService, "test", "test");
 
         checkCursor(metaStorageService.range(intToBytes(0), intToBytes(keyTo)), limit);
         checkCursor(metaStorageService.range(intToBytes(0), intToBytes(keyTo), 0), limit);
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index a8393dc84c..f5b28328fe 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -49,6 +49,8 @@ import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
 import org.apache.ignite.internal.network.processor.TypeUtils;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Marshallable;
 
@@ -105,6 +107,7 @@ public class MessageImplGenerator {
             String getterName = getter.getSimpleName().toString();
 
             FieldSpec.Builder fieldBuilder = FieldSpec.builder(getterReturnType, getterName)
+                    .addAnnotation(IgniteToStringInclude.class)
                     .addModifiers(Modifier.PRIVATE);
 
             boolean isMarshallable = getter.getAnnotation(Marshallable.class) != null;
@@ -163,6 +166,16 @@ public class MessageImplGenerator {
 
         messageImpl.addMethod(groupTypeMethod);
 
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-17591
+        MethodSpec toStringMethod = MethodSpec.methodBuilder("toString")
+                .addAnnotation(Override.class)
+                .addModifiers(Modifier.PUBLIC)
+                .returns(String.class)
+                .addStatement("return $T.toString($T.class, this)", S.class, messageImplClassName)
+                .build();
+
+        messageImpl.addMethod(toStringMethod);
+
         // message type constant and getter
         FieldSpec messageTypeField = FieldSpec.builder(short.class, "TYPE")
                 .addModifiers(Modifier.PUBLIC, Modifier.STATIC, Modifier.FINAL)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
index e8709c84b4..53747ef562 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyUtils.java
@@ -22,6 +22,8 @@ import io.netty.channel.ChannelFuture;
 import io.netty.util.concurrent.Future;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import org.jetbrains.annotations.Async.Execute;
+import org.jetbrains.annotations.Async.Schedule;
 
 /**
  * Netty utilities.
@@ -38,12 +40,12 @@ public class NettyUtils {
      * @return CompletableFuture.
      */
     public static <T, R, F extends Future<R>> CompletableFuture<T> toCompletableFuture(
-            F nettyFuture,
+            @Schedule F nettyFuture,
             Function<F, T> mapper
     ) {
         var fut = new CompletableFuture<T>();
 
-        nettyFuture.addListener((F future) -> {
+        nettyFuture.addListener((@Execute F future) -> {
             if (future.isSuccess()) {
                 fut.complete(mapper.apply(future));
             } else if (future.isCancelled()) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index eb5f0c325d..3fec3bcdaf 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -43,6 +43,8 @@ import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.client.Peer;
 import org.apache.ignite.raft.client.service.RaftGroupListener;
 import org.apache.ignite.raft.client.service.RaftGroupService;
@@ -325,6 +327,24 @@ public class Loza implements IgniteComponent {
         }
     }
 
+    /**
+     * Returns messaging service.
+     *
+     * @return Messaging service.
+     */
+    public MessagingService messagingService() {
+        return clusterNetSvc.messagingService();
+    }
+
+    /**
+     * Returns topology service.
+     *
+     * @return Topology service.
+     */
+    public TopologyService topologyService() {
+        return clusterNetSvc.topologyService();
+    }
+
     /**
      * Returns a cluster service.
      *
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index fc9ed2b9dd..880c91a612 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -46,6 +46,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
@@ -582,7 +583,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
                             LOG.warn("Recoverable error during the request type={} occurred (will be retried on the randomly selected node): ",
                                     err, req.getClass().getSimpleName());
 
-                            sendWithRetry(randomNode(), req, stopTime, fut);
+                            sendWithRetry(randomNode(peer), req, stopTime, fut);
 
                             return null;
                         }, retryDelay, TimeUnit.MILLISECONDS);
@@ -603,7 +604,17 @@ public class RaftGroupServiceImpl implements RaftGroupService {
                         resp0.errorCode() == (RaftError.EAGAIN.getNumber()) ||
                         resp0.errorCode() == (RaftError.ENOENT.getNumber())) { // Possibly a node has not been started.
                         executor.schedule(() -> {
-                            sendWithRetry(peer, req, stopTime, fut);
+                            Peer targetPeer = peer;
+
+                            if (resp0.errorCode() == RaftError.ENOENT.getNumber()) {
+                                // If changing peers or requesting a leader and something is not found
+                                // probably target peer is doing rebalancing, try another peer.
+                                if (req instanceof GetLeaderRequest || req instanceof ChangePeersAsyncRequest) {
+                                    targetPeer = randomNode(peer);
+                                }
+                            }
+
+                            sendWithRetry(targetPeer, req, stopTime, fut);
 
                             return null;
                         }, retryDelay, TimeUnit.MILLISECONDS);
@@ -614,7 +625,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
                         resp0.errorCode() == RaftError.EINTERNAL.getNumber()) {
                         if (resp0.leaderId() == null) {
                             executor.schedule(() -> {
-                                sendWithRetry(randomNode(), req, stopTime, fut);
+                                sendWithRetry(randomNode(peer), req, stopTime, fut);
 
                                 return null;
                             }, retryDelay, TimeUnit.MILLISECONDS);
@@ -679,15 +690,45 @@ public class RaftGroupServiceImpl implements RaftGroupService {
         return t instanceof TimeoutException || t instanceof IOException;
     }
 
+    /**
+     * Returns a random peer of the group without excluding any peer.
+     *
+     * @return Random peer.
+     */
+    private Peer randomNode() {
+        return randomNode((Peer) null);
+    }
+
     /**
-     * @return Random node.
+     * Returns a random peer of the group.
+     *
+     * <p>If {@code excludedPeer} is a member of the group and the group has at least one other peer, the result is
+     * guaranteed to differ from it, and each of the remaining peers is equally likely to be picked. If
+     * {@code excludedPeer} is {@code null}, not a member, or the only member, any peer may be returned.
+     *
+     * @param excludedPeer Peer to avoid (callers pass the peer that has just failed a request), or {@code null}.
+     * @return Random peer.
      */
-    private Peer randomNode() {
+    private Peer randomNode(Peer excludedPeer) {
         List<Peer> peers0 = peers;
 
         assert peers0 != null && !peers0.isEmpty();
 
-        return peers0.get(current().nextInt(peers0.size()));
+        int size = peers0.size();
+
+        int excludedIdx = excludedPeer == null ? -1 : peers0.indexOf(excludedPeer);
+
+        ThreadLocalRandom random = current();
+
+        // Nothing to avoid, or no alternative to offer: any peer will do.
+        if (excludedIdx < 0 || size == 1) {
+            return peers0.get(random.nextInt(size));
+        }
+
+        // Draw one of the (size - 1) other slots and skip over the excluded index. Unlike retrying a bounded
+        // number of times, this never returns the excluded peer and needs only a single draw.
+        int idx = random.nextInt(size - 1);
+
+        return peers0.get(idx < excludedIdx ? idx : idx + 1);
     }
 
     /**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java
index 42f8c2a883..dd8a1acc65 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/cli/IgniteCliRpcRequestClosure.java
@@ -74,9 +74,9 @@ public class IgniteCliRpcRequestClosure implements Closure {
             if (err.errorCode() == RaftError.EPERM.getNumber()) {
                 newLeader = node.getLeaderId();
 
-                delegate.sendResponse(
-                    RaftRpcFactory.DEFAULT
-                        .newResponse(newLeader.toString(), getMsgFactory(), RaftError.EPERM, err.errorMsg()));
+                String leaderId = newLeader != null ? newLeader.toString() : null;
+
+                delegate.sendResponse(RaftRpcFactory.DEFAULT.newResponse(leaderId, getMsgFactory(), RaftError.EPERM, err.errorMsg()));
                 return;
             }
         }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index b8afff7126..e064f0207a 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -335,7 +335,7 @@ public class ItRebalanceDistributedTest {
         return nodes.stream().filter(n -> n.address().equals(addr)).findFirst().get();
     }
 
-    private List<ClusterNode> getPartitionClusterNodes(int nodeNum, int partNum) {
+    private Set<ClusterNode> getPartitionClusterNodes(int nodeNum, int partNum) {
         var table = ((ExtendedTableConfiguration) nodes.get(nodeNum).clusterCfgMgr.configurationRegistry()
                 .getConfiguration(TablesConfiguration.KEY).tables().get("PUBLIC.TBL1"));
 
@@ -343,11 +343,11 @@ public class ItRebalanceDistributedTest {
             var assignments = table.assignments().value();
 
             if (assignments != null) {
-                return ((List<List<ClusterNode>>) ByteUtils.fromBytes(assignments)).get(partNum);
+                return ((List<Set<ClusterNode>>) ByteUtils.fromBytes(assignments)).get(partNum);
             }
         }
 
-        return List.of();
+        return Set.of();
     }
 
     private static class Node {
@@ -480,6 +480,7 @@ public class ItRebalanceDistributedTest {
             schemaManager = new SchemaManager(registry, tablesCfg);
 
             tableManager = new TableManager(
+                    name,
                     registry,
                     tablesCfg,
                     raftManager,
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
new file mode 100644
index 0000000000..41179fe67b
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntFunction;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryDataStorageChange;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.schema.definition.ColumnType;
+import org.apache.ignite.schema.definition.TableDefinition;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * These tests check in-memory node restart scenarios.
+ */
+public class ItIgniteInMemoryNodeRestartTest extends IgniteAbstractTest {
+    /** Default node port. */
+    private static final int DEFAULT_NODE_PORT = 3344;
+
+    /** Value producer for table data, is used to create data and check it later. */
+    private static final IntFunction<String> VALUE_PRODUCER = i -> "val " + i;
+
+    /** Prefix for full table name. */
+    private static final String SCHEMA_PREFIX = "PUBLIC.";
+
+    /** Test table name. */
+    private static final String TABLE_NAME = "Table1";
+
+    /** Nodes bootstrap configuration pattern. */
+    private static final String NODE_BOOTSTRAP_CFG = "{\n"
+            + "  network.port: {},\n"
+            + "  network.nodeFinder.netClusterNodes: {}\n"
+            + "}";
+
+    /** Cluster nodes by node index; a stopped node leaves a {@code null} slot that a restart refills. */
+    private static final List<Ignite> CLUSTER_NODES = new ArrayList<>();
+
+    /** Names of the cluster nodes, indexed like {@link #CLUSTER_NODES}; {@code null} for a stopped node. */
+    private static final List<String> CLUSTER_NODES_NAMES = new ArrayList<>();
+
+    /**
+     * Stops all started nodes.
+     */
+    @AfterEach
+    public void afterEach() throws Exception {
+        var closeables = new ArrayList<AutoCloseable>();
+
+        for (String name : CLUSTER_NODES_NAMES) {
+            if (name != null) {
+                closeables.add(() -> IgnitionManager.stop(name));
+            }
+        }
+
+        IgniteUtils.closeAll(closeables);
+
+        CLUSTER_NODES.clear();
+        CLUSTER_NODES_NAMES.clear();
+    }
+
+    /**
+     * Start node with the given parameters.
+     *
+     * <p>{@code idx} must either be the next free index (first start of a node) or the index of a node previously
+     * stopped with {@link #stopNode(int)} (restart). In the latter case the existing slot is refilled rather than a
+     * new one inserted, so the indices of the other nodes stay stable.
+     *
+     * @param idx Node index, is used to stop the node later, see {@link #stopNode(int)}.
+     * @param nodeName Node name.
+     * @param cfgString Configuration string.
+     * @param workDir Base working directory; the node's own directory is resolved from it by {@code nodeName}.
+     * @return Created node instance.
+     */
+    private static IgniteImpl startNode(int idx, String nodeName, @Nullable String cfgString, Path workDir) {
+        assertTrue(
+                idx == CLUSTER_NODES.size() || (idx < CLUSTER_NODES.size() && CLUSTER_NODES.get(idx) == null),
+                "Node index is occupied or out of range: " + idx
+        );
+
+        // Register the name before starting, so afterEach() can stop the node even if the start assertion fails.
+        setOrAppend(CLUSTER_NODES_NAMES, idx, nodeName);
+
+        CompletableFuture<Ignite> future = IgnitionManager.start(nodeName, cfgString, workDir.resolve(nodeName));
+
+        // Only the very first node of the test initializes the cluster.
+        if (CLUSTER_NODES.isEmpty()) {
+            IgnitionManager.init(nodeName, List.of(nodeName), "cluster");
+        }
+
+        assertThat(future, willCompleteSuccessfully());
+
+        Ignite ignite = future.join();
+
+        setOrAppend(CLUSTER_NODES, idx, ignite);
+
+        return (IgniteImpl) ignite;
+    }
+
+    /**
+     * Stores {@code value} at position {@code idx}: overwrites the slot if it exists (a restarted node whose slot was
+     * cleared by {@link #stopNode(int)}), otherwise appends it (a node started for the first time).
+     *
+     * @param list Target list.
+     * @param idx Position, at most {@code list.size()}.
+     * @param value Value to store.
+     * @param <T> Element type.
+     */
+    private static <T> void setOrAppend(List<T> list, int idx, T value) {
+        if (idx < list.size()) {
+            list.set(idx, value);
+        } else {
+            list.add(idx, value);
+        }
+    }
+
+    /**
+     * Start node with the given parameters.
+     *
+     * @param testInfo Test info.
+     * @param idx Node index, is used to stop the node later, see {@link #stopNode(int)}.
+     * @return Created node instance.
+     */
+    private IgniteImpl startNode(TestInfo testInfo, int idx) {
+        int port = DEFAULT_NODE_PORT + idx;
+        String nodeName = testNodeName(testInfo, port);
+        String cfgString = configurationString(idx);
+
+        // The static overload resolves the per-node directory itself, so pass the base directory here.
+        return startNode(idx, nodeName, cfgString, workDir);
+    }
+
+    /**
+     * Build a configuration string.
+     *
+     * @param idx Node index.
+     * @return Configuration string.
+     */
+    private static String configurationString(int idx) {
+        int port = DEFAULT_NODE_PORT + idx;
+
+        // The address of the first node.
+        @Language("HOCON") String connectAddr = "[localhost\":\"" + DEFAULT_NODE_PORT + "]";
+
+        return IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, port, connectAddr);
+    }
+
+    /**
+     * Stop the node with given index. Its slot is cleared (set to {@code null}), not removed, so that other node
+     * indices stay stable and the node can be restarted with the same index.
+     *
+     * @param idx Node index.
+     */
+    private static void stopNode(int idx) {
+        Ignite node = CLUSTER_NODES.get(idx);
+
+        if (node != null) {
+            IgnitionManager.stop(node.name());
+
+            CLUSTER_NODES.set(idx, null);
+            CLUSTER_NODES_NAMES.set(idx, null);
+        }
+    }
+
+    /**
+     * Restarts an in-memory node that is not a leader of the table's partition.
+     */
+    @Test
+    public void inMemoryNodeRestartNotLeader(TestInfo testInfo) throws Exception {
+        // Start three nodes, the first one is going to be CMG and MetaStorage leader.
+        IgniteImpl ignite = startNode(testInfo, 0);
+        startNode(testInfo, 1);
+        startNode(testInfo, 2);
+
+        // Create a table with replica on every node.
+        createTableWithData(ignite, TABLE_NAME, 3, 1);
+
+        String tableName = SCHEMA_PREFIX + TABLE_NAME;
+
+        TableImpl table = (TableImpl) ignite.tables().table(tableName);
+        String tableId = table.tableId().toString();
+
+        // Find the leader of the table's partition group.
+        RaftGroupService raftGroupService = table.internalTable().partitionRaftGroupService(0);
+        IgniteBiTuple<Peer, Long> leaderWithTerm = raftGroupService.refreshAndGetLeaderWithTerm().join();
+        NetworkAddress leaderAddress = leaderWithTerm.get1().address();
+
+        // Find the index of any node that is not a leader of the partition group.
+        int idxToStop = IntStream.range(1, 3)
+                .filter(idx -> !leaderAddress.equals(ignite(idx).node().address()))
+                .findFirst().getAsInt();
+
+        // Restart the node.
+        stopNode(idxToStop);
+
+        IgniteImpl restartingNode = startNode(testInfo, idxToStop);
+
+        Loza loza = restartingNode.raftManager();
+
+        // Check that it restarts.
+        assertTrue(IgniteTestUtils.waitForCondition(
+                () -> loza.startedGroups().stream().anyMatch(grpName -> grpName.contains(tableId)),
+                TimeUnit.SECONDS.toMillis(10)
+        ));
+
+        // Check the data rebalanced correctly.
+        checkTableWithData(restartingNode, TABLE_NAME);
+    }
+
+    /**
+     * Restarts multiple nodes so the majority is lost.
+     */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17586")
+    @Test
+    public void inMemoryNodeRestartNoMajority(TestInfo testInfo) throws Exception {
+        // Start three nodes, the first one is going to be CMG and MetaStorage leader.
+        IgniteImpl ignite0 = startNode(testInfo, 0);
+        startNode(testInfo, 1);
+        startNode(testInfo, 2);
+
+        // Create a table with replica on every node.
+        createTableWithData(ignite0, TABLE_NAME, 3, 1);
+
+        String tableName = SCHEMA_PREFIX + TABLE_NAME;
+
+        TableImpl table = (TableImpl) ignite0.tables().table(tableName);
+        String tableId = table.tableId().toString();
+
+        // Lose the majority.
+        stopNode(1);
+        stopNode(2);
+
+        IgniteImpl restartingNode = startNode(testInfo, 1);
+
+        Loza loza = restartingNode.raftManager();
+
+        // Check that it restarts.
+        assertTrue(IgniteTestUtils.waitForCondition(
+                () -> loza.startedGroups().stream().anyMatch(grpName -> grpName.contains(tableId)),
+                TimeUnit.SECONDS.toMillis(10)
+        ));
+
+        // Check the data rebalanced correctly.
+        checkTableWithData(restartingNode, TABLE_NAME);
+    }
+
+    /**
+     * Restarts all the nodes with the partition.
+     */
+    @Test
+    public void inMemoryNodeFullPartitionRestart(TestInfo testInfo) throws Exception {
+        // Start three nodes, the first one is going to be CMG and MetaStorage leader.
+        IgniteImpl ignite0 = startNode(testInfo, 0);
+        startNode(testInfo, 1);
+        startNode(testInfo, 2);
+
+        // Create a table with replicas on every node.
+        createTableWithData(ignite0, TABLE_NAME, 3, 1);
+
+        String tableName = SCHEMA_PREFIX + TABLE_NAME;
+
+        TableImpl table = (TableImpl) ignite0.tables().table(tableName);
+        String tableId = table.tableId().toString();
+
+        stopNode(0);
+        stopNode(1);
+        stopNode(2);
+
+        startNode(testInfo, 0);
+        startNode(testInfo, 1);
+        startNode(testInfo, 2);
+
+        // Check that full partition restart happens.
+        for (int i = 0; i < 3; i++) {
+            Loza loza = ignite(i).raftManager();
+
+            assertTrue(IgniteTestUtils.waitForCondition(
+                    () -> loza.startedGroups().stream().anyMatch(grpName -> grpName.contains(tableId)),
+                    TimeUnit.SECONDS.toMillis(10)
+            ));
+        }
+    }
+
+    /**
+     * Checks the table exists and validates all data in it.
+     *
+     * @param ignite Ignite.
+     * @param name Table name.
+     */
+    private static void checkTableWithData(Ignite ignite, String name) {
+        Table table = ignite.tables().table(SCHEMA_PREFIX + name);
+
+        assertNotNull(table);
+
+        for (int i = 0; i < 100; i++) {
+            Tuple row = table.keyValueView().get(null, Tuple.create().set("id", i));
+
+            // Fail with a clear message instead of an NPE when a row was lost.
+            assertNotNull(row, "Missing row with id=" + i);
+
+            assertEquals(VALUE_PRODUCER.apply(i), row.stringValue("name"));
+        }
+    }
+
+    /**
+     * Creates a table and load data to it.
+     *
+     * @param ignite Ignite.
+     * @param name Table name.
+     * @param replicas Replica factor.
+     * @param partitions Partitions count.
+     */
+    private static void createTableWithData(Ignite ignite, String name, int replicas, int partitions) {
+        TableDefinition scmTbl1 = SchemaBuilders.tableBuilder("PUBLIC", name).columns(
+                SchemaBuilders.column("id", ColumnType.INT32).build(),
+                SchemaBuilders.column("name", ColumnType.string()).asNullable(true).build()
+        ).withPrimaryKey(
+                SchemaBuilders.primaryKey()
+                        .withColumns("id")
+                        .build()
+        ).build();
+
+        Table table = ignite.tables().createTable(
+                scmTbl1.canonicalName(),
+                tbl -> convert(scmTbl1, tbl).changeReplicas(replicas).changePartitions(partitions)
+                        .changeDataStorage(dsc -> dsc.convert(VolatilePageMemoryDataStorageChange.class))
+        );
+
+        for (int i = 0; i < 100; i++) {
+            Tuple key = Tuple.create().set("id", i);
+            Tuple val = Tuple.create().set("name", VALUE_PRODUCER.apply(i));
+
+            table.keyValueView().put(null, key, val);
+        }
+    }
+
+    /**
+     * Returns the node started with the given index, or {@code null} if it is currently stopped.
+     *
+     * @param idx Node index.
+     * @return Node instance.
+     */
+    private static IgniteImpl ignite(int idx) {
+        return (IgniteImpl) CLUSTER_NODES.get(idx);
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 347eb6493b..bbfe2ec788 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.storage.DataStorageModules;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableTxManagerImpl;
+import org.apache.ignite.internal.table.message.TableMessagesSerializationRegistryInitializer;
 import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
@@ -210,6 +211,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
         CmgMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
         RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
         TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+        TableMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
 
         var clusterLocalConfiguration = new ClusterLocalConfiguration(name, serializationRegistry);
 
@@ -270,6 +272,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
         SchemaManager schemaManager = new SchemaManager(registry, tblCfg);
 
         TableManager tableManager = new TableManager(
+                name,
                 registry,
                 tblCfg,
                 raftMgr,
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 7c805fd7ea..fc7f98adcb 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.storage.DataStorageModule;
 import org.apache.ignite.internal.storage.DataStorageModules;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.TableTxManagerImpl;
+import org.apache.ignite.internal.table.message.TableMessagesSerializationRegistryInitializer;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
@@ -244,6 +245,7 @@ public class IgniteImpl implements Ignite {
         SqlQueryMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
         TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
         ComputeMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+        TableMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
 
         var clusterLocalConfiguration = new ClusterLocalConfiguration(name, serializationRegistry);
 
@@ -330,6 +332,7 @@ public class IgniteImpl implements Ignite {
         volatileLogStorageFactoryCreator = new VolatileLogStorageFactoryCreator(workDir.resolve("volatile-log-spillout"));
 
         distributedTblMgr = new TableManager(
+                name,
                 registry,
                 clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY),
                 raftMgr,
@@ -701,6 +704,11 @@ public class IgniteImpl implements Ignite {
         return partitionsStore;
     }
 
+    /**
+     * Returns the Raft manager of this node. Exposed for tests only.
+     *
+     * @return Raft manager.
+     */
+    @TestOnly
+    public Loza raftManager() {
+        return this.raftMgr;
+    }
+
     @TestOnly
     public ClusterNode node() {
         return clusterSvc.topologyService().localMember();
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index ef840a005c..417c22bba4 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -236,6 +236,9 @@ public class MockedStructuresTest extends IgniteAbstractTest {
     /** Inner initialisation. */
     @BeforeEach
     void before() throws Exception {
+        when(rm.messagingService()).thenReturn(mock(MessagingService.class));
+        when(rm.topologyService()).thenReturn(mock(TopologyService.class));
+
         revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
             function.apply(0L).join();
 
@@ -773,6 +776,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
 
     private TableManager createTableManager() {
         TableManager tableManager = new TableManager(
+                "",
                 revisionUpdater,
                 tblsCfg,
                 rm,
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index e0e052432e..46ce848876 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -235,6 +235,11 @@
                         <artifactId>ignite-configuration-annotation-processor</artifactId>
                         <version>${project.version}</version>
                     </dependency>
+                    <dependency>
+                        <groupId>org.apache.ignite</groupId>
+                        <artifactId>ignite-network-annotation-processor</artifactId>
+                        <version>${project.version}</version>
+                    </dependency>
                 </dependencies>
                 <configuration>
                     <annotationProcessorPaths>
@@ -243,6 +248,11 @@
                             <artifactId>ignite-configuration-annotation-processor</artifactId>
                             <version>${project.version}</version>
                         </path>
+                        <path>
+                            <groupId>org.apache.ignite</groupId>
+                            <artifactId>ignite-network-annotation-processor</artifactId>
+                            <version>${project.version}</version>
+                        </path>
                     </annotationProcessorPaths>
                 </configuration>
             </plugin>
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b64e99d4d1..32bb02cf2f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -25,6 +25,7 @@ import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.ge
 import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.apache.ignite.internal.utils.RebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX;
 import static org.apache.ignite.internal.utils.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
 import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
 import static org.apache.ignite.internal.utils.RebalanceUtil.extractPartitionNumber;
@@ -36,8 +37,10 @@ import static org.apache.ignite.internal.utils.RebalanceUtil.updatePendingAssign
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -106,11 +109,17 @@ import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
 import org.apache.ignite.internal.table.distributed.storage.VersionedRowStore;
 import org.apache.ignite.internal.table.event.TableEvent;
 import org.apache.ignite.internal.table.event.TableEventParameters;
+import org.apache.ignite.internal.table.message.HasDataRequest;
+import org.apache.ignite.internal.table.message.HasDataRequestBuilder;
+import org.apache.ignite.internal.table.message.HasDataResponse;
+import org.apache.ignite.internal.table.message.TableMessageGroup;
+import org.apache.ignite.internal.table.message.TableMessagesFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteObjectName;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.utils.RebalanceUtil;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteException;
@@ -122,6 +131,7 @@ import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.lang.TableAlreadyExistsException;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.raft.client.Peer;
@@ -146,6 +156,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Timeout to complete the tablesByIdVv on revision update. */
     private static final long TABLES_COMPLETE_TIMEOUT = 120;
 
+    private static final long QUERY_DATA_NODES_COUNT_TIMEOUT = TimeUnit.SECONDS.toMillis(3);
+
     /** The logger. */
     private static final IgniteLogger LOG = Loggers.forClass(TableManager.class);
 
@@ -218,9 +230,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     /** Rebalance scheduler pool size. */
     private static final int REBALANCE_SCHEDULER_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
 
+    private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
+
     /**
      * Creates a new table manager.
      *
+     * @param nodeName Node name.
      * @param registry Registry for versioned values.
      * @param tablesCfg Tables configuration.
      * @param raftMgr Raft manager.
@@ -232,6 +247,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      *                                         tables.
      */
     public TableManager(
+            String nodeName,
             Consumer<Function<Long, CompletableFuture<?>>> registry,
             TablesConfiguration tablesCfg,
             Loza raftMgr,
@@ -312,7 +328,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         });
 
         rebalanceScheduler = new ScheduledThreadPoolExecutor(REBALANCE_SCHEDULER_POOL_SIZE,
-                new NamedThreadFactory("rebalance-scheduler", LOG));
+                NamedThreadFactory.create(nodeName, "rebalance-scheduler", LOG));
 
         ioExecutor = new ThreadPoolExecutor(
                 Math.min(Utils.cpus() * 3, 25),
@@ -320,7 +336,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 100,
                 TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(),
-                new NamedThreadFactory("tableManager-io", LOG));
+                NamedThreadFactory.create(nodeName, "tableManager-io", LOG));
     }
 
     /** {@inheritDoc} */
@@ -365,6 +381,47 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 return completedFuture(false);
             }
         });
+
+        addMessageHandler(raftMgr.messagingService());
+    }
+
+    /**
+     * Adds a table manager message handler.
+     *
+     * @param messagingService Messaging service.
+     */
+    private void addMessageHandler(MessagingService messagingService) {
+        messagingService.addMessageHandler(TableMessageGroup.class, (message, senderAddr, correlationId) -> {
+            if (message instanceof HasDataRequest) {
+                // This message queries if a node has any data for a specific partition of a table
+                assert correlationId != null;
+
+                HasDataRequest msg = (HasDataRequest) message;
+
+                UUID tableId = msg.tableId();
+                int partitionId = msg.partitionId();
+
+                boolean contains = false;
+
+                TableImpl table = tablesByIdVv.latest().get(tableId);
+
+                if (table != null) {
+                    MvTableStorage storage = table.internalTable().storage();
+
+                    MvPartitionStorage mvPartition = storage.getMvPartition(partitionId);
+
+                    // If node's recovery process is incomplete (no partition storage), then we consider this node's
+                    // partition storage empty.
+                    if (mvPartition != null) {
+                        // If applied index of a storage is greater than 0,
+                        // then there is data.
+                        contains = mvPartition.lastAppliedIndex() > 0;
+                    }
+                }
+
+                messagingService.respond(senderAddr, TABLE_MESSAGES_FACTORY.hasDataResponse().result(contains).build(), correlationId);
+            }
+        });
     }
 
     /**
@@ -423,7 +480,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                     ctx.storageRevision(),
                     ctx.oldValue().name(),
                     ((ExtendedTableView) ctx.oldValue()).id(),
-                    (List<List<ClusterNode>>) ByteUtils.fromBytes(((ExtendedTableView) ctx.oldValue()).assignments())
+                    ByteUtils.fromBytes(((ExtendedTableView) ctx.oldValue()).assignments())
             );
         } finally {
             busyLock.leaveBusy();
@@ -459,9 +516,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 for (int i = 0; i < partCnt; i++) {
                     String partId = partitionRaftGroupName(((ExtendedTableConfiguration) tblCfg).id().value(), i);
 
-                    futures[i] = updatePendingAssignmentsKeys(
-                            tblCfg.name().value(), partId, baselineMgr.nodes(),
-                            partCnt, newReplicas,
+                    futures[i] = updatePendingAssignmentsKeys(tblCfg.name().value(), partId, baselineMgr.nodes(), newReplicas,
                             replicasCtx.storageRevision(), metaStorageMgr, i);
                 }
 
@@ -506,10 +561,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         long causalityToken = assignmentsCtx.storageRevision();
 
-        List<List<ClusterNode>> oldAssignments = assignmentsCtx.oldValue() == null ? null :
-                (List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentsCtx.oldValue());
+        List<Set<ClusterNode>> oldAssignments = assignmentsCtx.oldValue() == null ? null : ByteUtils.fromBytes(assignmentsCtx.oldValue());
 
-        List<List<ClusterNode>> newAssignments = (List<List<ClusterNode>>) ByteUtils.fromBytes(assignmentsCtx.newValue());
+        List<Set<ClusterNode>> newAssignments = ByteUtils.fromBytes(assignmentsCtx.newValue());
 
         // Empty assignments might be a valid case if tables are created from within cluster init HOCON
         // configuration, which is not supported now.
@@ -519,16 +573,21 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
 
+        List<Set<ClusterNode>> assignmentsLatest = ByteUtils.fromBytes(directProxy(tblCfg.assignments()).value());
+
+        TopologyService topologyService = raftMgr.topologyService();
+        ClusterNode localMember = topologyService.localMember();
+
         // TODO: IGNITE-15554 Add logic for assignment recalculation in case of partitions or replicas changes
         // TODO: Until IGNITE-15554 is implemented it's safe to iterate over partitions and replicas cause there will
         // TODO: be exact same amount of partitions and replicas for both old and new assignments
         for (int i = 0; i < partitions; i++) {
             int partId = i;
 
-            List<ClusterNode> oldPartAssignment = oldAssignments == null ? Collections.emptyList() :
+            Set<ClusterNode> oldPartAssignment = oldAssignments == null ? Collections.emptySet() :
                     oldAssignments.get(partId);
 
-            List<ClusterNode> newPartAssignment = newAssignments.get(partId);
+            Set<ClusterNode> newPartAssignment = newAssignments.get(partId);
 
             // Create new raft nodes according to new assignments.
             tablesByIdVv.update(causalityToken, (tablesById, e) -> {
@@ -538,13 +597,16 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                 InternalTable internalTbl = tablesById.get(tblId).internalTable();
 
+                MvTableStorage storage = internalTbl.storage();
+                boolean isInMemory = storage.isVolatile();
+
                 // TODO: IGNITE-17197 Remove assert after the ticket is resolved.
                 assert internalTbl.storage() instanceof MvTableStorage :
                         "Only multi version storages are supported. Current storage is a " + internalTbl.storage().getClass().getName();
 
                 // start new nodes, only if it is table creation
                 // other cases will be covered by rebalance logic
-                List<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? newPartAssignment : Collections.emptyList();
+                Set<ClusterNode> nodes = (oldPartAssignment.isEmpty()) ? newPartAssignment : Collections.emptySet();
 
                 String grpId = partitionRaftGroupName(tblId, partId);
 
@@ -552,33 +614,72 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                 if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
                     startGroupFut = CompletableFuture
-                            .supplyAsync(
-                                    () -> internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
+                            .supplyAsync(() -> internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
                             .thenComposeAsync((partitionStorage) -> {
-                                RaftGroupOptions groupOptions = groupOptionsForPartition(internalTbl, tblCfg, partitionStorage,
-                                        newPartAssignment);
-
-                                try {
-                                    raftMgr.startRaftGroupNode(
-                                            grpId,
-                                            newPartAssignment,
-                                            new PartitionListener(tblId, new VersionedRowStore(partitionStorage, txManager)),
-                                            new RebalanceRaftGroupEventsListener(
-                                                    metaStorageMgr,
-                                                    tablesCfg.tables().get(tablesById.get(tblId).name()),
-                                                    grpId,
-                                                    partId,
-                                                    busyLock,
-                                                    movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
-                                                    rebalanceScheduler
-                                            ),
-                                            groupOptions
-                                    );
-
-                                    return CompletableFuture.completedFuture(null);
-                                } catch (NodeStoppingException ex) {
-                                    return CompletableFuture.failedFuture(ex);
+                                boolean hasData = partitionStorage.lastAppliedIndex() > 0;
+
+                                CompletableFuture<Boolean> fut;
+
+                                if (isInMemory || !hasData) {
+                                    Set<ClusterNode> partAssignments = assignmentsLatest.get(partId);
+
+                                    fut = queryDataNodesCount(tblId, partId, partAssignments).thenApply(dataNodesCount -> {
+                                        boolean fullPartitionRestart = dataNodesCount == 0;
+
+                                        if (fullPartitionRestart) {
+                                            return true;
+                                        }
+
+                                        boolean majorityAvailable = dataNodesCount >= (partAssignments.size() / 2) + 1;
+
+                                        if (majorityAvailable) {
+                                            RebalanceUtil.startPeerRemoval(partitionRaftGroupName(tblId, partId), localMember,
+                                                    metaStorageMgr);
+
+                                            return false;
+                                        } else {
+                                            // No majority and not a full partition restart - need to restart nodes
+                                            // with current partition.
+                                            String msg = "Unable to start partition " + partId + ". Majority not available.";
+
+                                            throw new IgniteInternalException(msg);
+                                        }
+                                    });
+                                } else {
+                                    fut = CompletableFuture.completedFuture(true);
                                 }
+
+                                return fut.thenComposeAsync(startGroup -> {
+                                    if (!startGroup) {
+                                        return CompletableFuture.completedFuture(null);
+                                    }
+
+                                    RaftGroupOptions groupOptions = groupOptionsForPartition(internalTbl, tblCfg, partitionStorage,
+                                            newPartAssignment);
+
+                                    try {
+                                        raftMgr.startRaftGroupNode(
+                                                grpId,
+                                                newPartAssignment,
+                                                new PartitionListener(tblId, new VersionedRowStore(partitionStorage, txManager)),
+                                                new RebalanceRaftGroupEventsListener(
+                                                        metaStorageMgr,
+                                                        tablesCfg.tables().get(tablesById.get(tblId).name()),
+                                                        grpId,
+                                                        partId,
+                                                        busyLock,
+                                                        movePartition(() -> internalTbl.partitionRaftGroupService(partId)),
+                                                        this::calculateAssignments,
+                                                        rebalanceScheduler
+                                                ),
+                                                groupOptions
+                                        );
+
+                                        return CompletableFuture.completedFuture(null);
+                                    } catch (NodeStoppingException ex) {
+                                        return CompletableFuture.failedFuture(ex);
+                                    }
+                                }, ioExecutor);
                             }, ioExecutor);
                 }
 
@@ -606,11 +707,37 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         CompletableFuture.allOf(futures).join();
     }
 
+    /**
+     * Calculates the quantity of the data nodes for the partition of the table.
+     *
+     * @param tblId Table id.
+     * @param partId Partition id.
+     * @param partAssignments Partition assignments.
+     * @return A future that will hold the quantity of data nodes.
+     */
+    private CompletableFuture<Long> queryDataNodesCount(UUID tblId, int partId, Set<ClusterNode> partAssignments) {
+        HasDataRequestBuilder requestBuilder = TABLE_MESSAGES_FACTORY.hasDataRequest().tableId(tblId).partitionId(partId);
+
+        //noinspection unchecked
+        CompletableFuture<Boolean>[] requestFutures = partAssignments.stream().map(node -> {
+            HasDataRequest request = requestBuilder.build();
+
+            return raftMgr.messagingService().invoke(node, request, QUERY_DATA_NODES_COUNT_TIMEOUT).thenApply(response -> {
+                assert response instanceof HasDataResponse : response;
+
+                return ((HasDataResponse) response).result();
+            }).exceptionally(unused -> false);
+        }).toArray(CompletableFuture[]::new);
+
+        return CompletableFuture.allOf(requestFutures)
+                .thenApply(unused -> Arrays.stream(requestFutures).filter(CompletableFuture::join).count());
+    }
+
     private RaftGroupOptions groupOptionsForPartition(
             InternalTable internalTbl,
             ExtendedTableConfiguration tableConfig,
             MvPartitionStorage partitionStorage,
-            List<ClusterNode> peers
+            Set<ClusterNode> peers
     ) {
         RaftGroupOptions raftGroupOptions;
 
@@ -767,7 +894,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @param tblId          Table id.
      * @param assignment     Affinity assignment.
      */
-    private void dropTableLocally(long causalityToken, String name, UUID tblId, List<List<ClusterNode>> assignment) {
+    private void dropTableLocally(long causalityToken, String name, UUID tblId, List<Set<ClusterNode>> assignment) {
         try {
             int partitions = assignment.size();
 
@@ -805,6 +932,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
         }
     }
 
+    private Set<ClusterNode> calculateAssignments(TableConfiguration tableCfg, int partNum) {
+        return AffinityUtils.calculateAssignmentForPartition(baselineMgr.nodes(), partNum, tableCfg.value().replicas());
+    }
+
     /**
      * Compounds a RAFT group unique name.
      *
@@ -881,7 +1012,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                         extConfCh.changeAssignments(ByteUtils.toBytes(AffinityUtils.calculateAssignments(
                                         baselineMgr.nodes(),
                                         tableChange.partitions(),
-                                        tableChange.replicas())))
+                                        tableChange.replicas(),
+                                        HashSet::new)))
                                 // Table schema preparation.
                                 .changeSchemas(schemasCh -> schemasCh.create(
                                         String.valueOf(INITIAL_SCHEMA_VERSION),
@@ -1420,7 +1552,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                     String partId = partitionRaftGroupName(tblId, part);
 
                     // Assignments of the pending rebalance that we received through the meta storage watch mechanism.
-                    List<ClusterNode> newPeers = ((List<ClusterNode>) ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value()));
+                    Set<ClusterNode> newPeers = ByteUtils.fromBytes(pendingAssignmentsWatchEvent.value());
 
                     var pendingAssignments = metaStorageMgr.get(pendingPartAssignmentsKey(partId)).join();
 
@@ -1440,12 +1572,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                     byte[] stableAssignments = metaStorageMgr.get(stablePartAssignmentsKey(partId),
                             pendingAssignmentsWatchEvent.revision()).join().value();
 
-                    List<ClusterNode> assignments = stableAssignments == null
+                    Set<ClusterNode> assignments = stableAssignments == null
                             // This is for the case when the first rebalance occurs.
-                            ? ((List<List<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(part)
-                            : (List<ClusterNode>) ByteUtils.fromBytes(stableAssignments);
+                            ? ((List<Set<ClusterNode>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(part)
+                            : ByteUtils.fromBytes(stableAssignments);
 
-                    ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember();
+                    ClusterNode localMember = raftMgr.topologyService().localMember();
 
                     var deltaPeers = newPeers.stream()
                             .filter(p -> !assignments.contains(p))
@@ -1478,6 +1610,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                                     part,
                                     busyLock,
                                     movePartition(() -> tbl.internalTable().partitionRaftGroupService(part)),
+                                    TableManager.this::calculateAssignments,
                                     rebalanceScheduler
                             );
 
@@ -1547,17 +1680,17 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                     String partId = partitionRaftGroupName(tblId, part);
 
-                    var stableAssignments = (List<ClusterNode>) ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
+                    Set<ClusterNode> stableAssignments = ByteUtils.fromBytes(stableAssignmentsWatchEvent.value());
 
                     byte[] pendingFromMetastorage = metaStorageMgr.get(pendingPartAssignmentsKey(partId),
                             stableAssignmentsWatchEvent.revision()).join().value();
 
-                    List<ClusterNode> pendingAssignments = pendingFromMetastorage == null
-                            ? Collections.emptyList()
-                            : (List<ClusterNode>) ByteUtils.fromBytes(pendingFromMetastorage);
+                    Set<ClusterNode> pendingAssignments = pendingFromMetastorage == null
+                            ? Collections.emptySet()
+                            : ByteUtils.fromBytes(pendingFromMetastorage);
 
                     try {
-                        ClusterNode localMember = raftMgr.server().clusterService().topologyService().localMember();
+                        ClusterNode localMember = raftMgr.topologyService().localMember();
 
                         if (!stableAssignments.contains(localMember) && !pendingAssignments.contains(localMember)) {
                             raftMgr.stopRaftGroup(partId);
@@ -1577,6 +1710,38 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 LOG.warn("Unable to process stable assignments event", e);
             }
         });
+
+        metaStorageMgr.registerWatchByPrefix(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX), new WatchListener() {
+            @Override
+            public boolean onUpdate(@NotNull WatchEvent evt) {
+                ByteArray key = evt.entryEvent().newEntry().key();
+
+                int partitionNumber = extractPartitionNumber(key);
+                UUID tblId = extractTableId(key, ASSIGNMENTS_SWITCH_REDUCE_PREFIX);
+
+                String partitionId = partitionRaftGroupName(tblId, partitionNumber);
+
+                TableImpl tbl = tablesByIdVv.latest().get(tblId);
+
+                TableConfiguration tblCfg = tablesCfg.tables().get(tbl.name());
+
+                RebalanceUtil.handleReduceChanged(
+                        metaStorageMgr,
+                        baselineMgr.nodes(),
+                        tblCfg.value().replicas(),
+                        partitionNumber,
+                        partitionId,
+                        evt
+                );
+
+                return true;
+            }
+
+            @Override
+            public void onError(@NotNull Throwable e) {
+                LOG.warn("Unable to process switch reduce event", e);
+            }
+        });
     }
 
     /**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
index e2e8c2f871..54179102d4 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java
@@ -17,18 +17,25 @@
 
 package org.apache.ignite.internal.table.distributed.raft;
 
+import static org.apache.ignite.internal.metastorage.client.CompoundCondition.and;
 import static org.apache.ignite.internal.metastorage.client.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.client.Conditions.revision;
 import static org.apache.ignite.internal.metastorage.client.Operations.ops;
 import static org.apache.ignite.internal.metastorage.client.Operations.put;
 import static org.apache.ignite.internal.metastorage.client.Operations.remove;
+import static org.apache.ignite.internal.utils.RebalanceUtil.intersect;
 import static org.apache.ignite.internal.utils.RebalanceUtil.pendingPartAssignmentsKey;
 import static org.apache.ignite.internal.utils.RebalanceUtil.plannedPartAssignmentsKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.readClusterNodes;
+import static org.apache.ignite.internal.utils.RebalanceUtil.resolveClusterNodes;
 import static org.apache.ignite.internal.utils.RebalanceUtil.stablePartAssignmentsKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.subtract;
+import static org.apache.ignite.internal.utils.RebalanceUtil.switchAppendKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.switchReduceKey;
+import static org.apache.ignite.internal.utils.RebalanceUtil.union;
 import static org.apache.ignite.raft.jraft.core.NodeImpl.LEADER_STEPPED_DOWN;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,6 +43,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
@@ -43,13 +51,15 @@ import org.apache.ignite.internal.configuration.schema.ExtendedTableChange;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.Condition;
 import org.apache.ignite.internal.metastorage.client.Entry;
 import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.SimpleCondition;
+import org.apache.ignite.internal.metastorage.client.Update;
 import org.apache.ignite.internal.raft.server.RaftGroupEventsListener;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.client.Peer;
@@ -65,6 +75,36 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
     /** Ignite logger. */
     private static final IgniteLogger LOG = Loggers.forClass(RebalanceRaftGroupEventsListener.class);
 
+    /** Number of retrying of the current rebalance in case of errors. */
+    private static final int REBALANCE_RETRY_THRESHOLD = 10;
+
+    /** Delay between unsuccessful trial of a rebalance and a new trial, ms. */
+    public static final int REBALANCE_RETRY_DELAY_MS = 200;
+
+    /** Success code for the MetaStorage switch append assignments change. */
+    private static final int SWITCH_APPEND_SUCCESS = 1;
+
+    /** Success code for the MetaStorage switch reduce assignments change. */
+    private static final int SWITCH_REDUCE_SUCCESS = 2;
+
+    /** Success code for the MetaStorage pending rebalance change. */
+    private static final int SCHEDULE_PENDING_REBALANCE_SUCCESS = 3;
+
+    /** Success code for the MetaStorage stable assignments change. */
+    private static final int FINISH_REBALANCE_SUCCESS = 4;
+
+    /** Failure code for the MetaStorage switch append assignments change. */
+    private static final int SWITCH_APPEND_FAIL = -SWITCH_APPEND_SUCCESS;
+
+    /** Failure code for the MetaStorage switch reduce assignments change. */
+    private static final int SWITCH_REDUCE_FAIL = -SWITCH_REDUCE_SUCCESS;
+
+    /** Failure code for the MetaStorage pending rebalance change. */
+    private static final int SCHEDULE_PENDING_REBALANCE_FAIL = -SCHEDULE_PENDING_REBALANCE_SUCCESS;
+
+    /** Failure code for the MetaStorage stable assignments change. */
+    private static final int FINISH_REBALANCE_FAIL = -FINISH_REBALANCE_SUCCESS;
+
     /** Meta storage manager. */
     private final MetaStorageManager metaStorageMgr;
 
@@ -89,11 +129,8 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
     /** Attempts to retry the current rebalance in case of errors. */
     private final AtomicInteger rebalanceAttempts =  new AtomicInteger(0);
 
-    /** Number of retrying of the current rebalance in case of errors. */
-    private static final int REBALANCE_RETRY_THRESHOLD = 10;
-
-    /** Delay between unsuccessful trial of a rebalance and a new trial, ms. */
-    public static final int REBALANCE_RETRY_DELAY_MS = 200;
+    /** Function that calculates assignments for table's partition. */
+    private final BiFunction<TableConfiguration, Integer, Set<ClusterNode>> calculateAssignmentsFn;
 
     /**
      * Constructs new listener.
@@ -102,6 +139,9 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
      * @param tblConfiguration Table configuration.
      * @param partId Partition id.
      * @param partNum Partition number.
+     * @param busyLock Busy lock.
+     * @param movePartitionFn Function that moves partition between nodes.
+     * @param calculateAssignmentsFn Function that calculates assignments for table's partition.
      * @param rebalanceScheduler Executor for scheduling rebalance retries.
      */
     public RebalanceRaftGroupEventsListener(
@@ -111,6 +151,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
             int partNum,
             IgniteSpinBusyLock busyLock,
             BiFunction<List<Peer>, Long, CompletableFuture<Void>> movePartitionFn,
+            BiFunction<TableConfiguration, Integer, Set<ClusterNode>> calculateAssignmentsFn,
             ScheduledExecutorService rebalanceScheduler) {
         this.metaStorageMgr = metaStorageMgr;
         this.tblConfiguration = tblConfiguration;
@@ -118,6 +159,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
         this.partNum = partNum;
         this.busyLock = busyLock;
         this.movePartitionFn = movePartitionFn;
+        this.calculateAssignmentsFn = calculateAssignmentsFn;
         this.rebalanceScheduler = rebalanceScheduler;
     }
 
@@ -140,7 +182,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
                     Entry pendingEntry = metaStorageMgr.get(pendingPartAssignmentsKey(partId)).get();
 
                     if (!pendingEntry.empty()) {
-                        List<ClusterNode> pendingNodes = (List<ClusterNode>) ByteUtils.fromBytes(pendingEntry.value());
+                        Set<ClusterNode> pendingNodes = ByteUtils.fromBytes(pendingEntry.value());
 
                         LOG.info("New leader elected. Going to reconfigure peers [group={}, partition={}, table={}, peers={}]",
                                 partId, partNum, tblConfiguration.name().value(), pendingNodes);
@@ -252,92 +294,193 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
      */
     private void doOnNewPeersConfigurationApplied(List<PeerId> peers) {
         try {
-            Map<ByteArray, Entry> keys = metaStorageMgr.getAll(
+            ByteArray pendingPartAssignmentsKey = pendingPartAssignmentsKey(partId);
+            ByteArray stablePartAssignmentsKey = stablePartAssignmentsKey(partId);
+            ByteArray plannedPartAssignmentsKey = plannedPartAssignmentsKey(partId);
+            ByteArray switchReduceKey = switchReduceKey(partId);
+            ByteArray switchAppendKey = switchAppendKey(partId);
+
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
+            Map<ByteArray, Entry> values = metaStorageMgr.getAll(
                     Set.of(
-                            plannedPartAssignmentsKey(partId),
-                            pendingPartAssignmentsKey(partId),
-                            stablePartAssignmentsKey(partId))).get();
+                            plannedPartAssignmentsKey,
+                            pendingPartAssignmentsKey,
+                            stablePartAssignmentsKey,
+                            switchReduceKey,
+                            switchAppendKey
+                    )
+            ).get();
 
-            Entry plannedEntry = keys.get(plannedPartAssignmentsKey(partId));
+            Entry stableEntry = values.get(stablePartAssignmentsKey);
+            Entry pendingEntry = values.get(pendingPartAssignmentsKey);
+            Entry plannedEntry = values.get(plannedPartAssignmentsKey);
+            Entry switchReduceEntry = values.get(switchReduceKey);
+            Entry switchAppendEntry = values.get(switchAppendKey);
 
-            List<ClusterNode> appliedPeers = resolveClusterNodes(peers,
-                    keys.get(pendingPartAssignmentsKey(partId)).value(), keys.get(stablePartAssignmentsKey(partId)).value());
+            Set<ClusterNode> calculatedAssignments = calculateAssignmentsFn.apply(tblConfiguration, partNum);
 
-            tblConfiguration.change(ch -> {
-                List<List<ClusterNode>> assignments =
-                        (List<List<ClusterNode>>) ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
-                assignments.set(partNum, appliedPeers);
-                ((ExtendedTableChange) ch).changeAssignments(ByteUtils.toBytes(assignments));
-            }).get();
-
-            if (plannedEntry.value() != null) {
-                if (!metaStorageMgr.invoke(If.iif(
-                        revision(plannedPartAssignmentsKey(partId)).eq(plannedEntry.revision()),
-                        ops(
-                                put(stablePartAssignmentsKey(partId), ByteUtils.toBytes(appliedPeers)),
-                                put(pendingPartAssignmentsKey(partId), plannedEntry.value()),
-                                remove(plannedPartAssignmentsKey(partId)))
-                                .yield(true),
-                        ops().yield(false))).get().getAsBoolean()) {
-                    LOG.info("Planned key changed while trying to update rebalance information. Going to retry"
-                                    + " [key={}, partition={}, table={}, appliedPeers={}]",
-                            plannedPartAssignmentsKey(partId), partNum, tblConfiguration.name(), appliedPeers);
+            Set<ClusterNode> stable = resolveClusterNodes(peers, pendingEntry.value(), stableEntry.value());
 
-                    doOnNewPeersConfigurationApplied(peers);
-                }
+            Set<ClusterNode> retrievedSwitchReduce = readClusterNodes(switchReduceEntry);
+            Set<ClusterNode> retrievedSwitchAppend = readClusterNodes(switchAppendEntry);
+            Set<ClusterNode> retrievedStable = readClusterNodes(stableEntry);
 
-                LOG.info("Rebalance finished. Going to schedule next rebalance [partition={}, table={}, appliedPeers={}, plannedPeers={}]",
-                        partNum, tblConfiguration.name().value(), appliedPeers, ByteUtils.fromBytes(plannedEntry.value()));
-            } else {
-                if (!metaStorageMgr.invoke(If.iif(
-                        notExists(plannedPartAssignmentsKey(partId)),
-                        ops(put(stablePartAssignmentsKey(partId), ByteUtils.toBytes(appliedPeers)),
-                                remove(pendingPartAssignmentsKey(partId))).yield(true),
-                        ops().yield(false))).get().getAsBoolean()) {
-                    LOG.info("Planned key changed while trying to update rebalance information. Going to retry"
-                                    + " [key={}, partition={}, table={}, appliedPeers={}]",
-                            plannedPartAssignmentsKey(partId), partNum, tblConfiguration.name(), appliedPeers);
+            // Were reduced
+            Set<ClusterNode> reducedNodes = subtract(retrievedSwitchReduce, stable);
 
-                    doOnNewPeersConfigurationApplied(peers);
-                }
+            // Were added
+            Set<ClusterNode> addedNodes = subtract(stable, retrievedStable);
 
-                LOG.info("Rebalance finished [partition={}, table={}, appliedPeers={}]",
-                        partNum, tblConfiguration.name().value(), appliedPeers);
-            }
+            // For further reduction
+            Set<ClusterNode> calculatedSwitchReduce = subtract(retrievedSwitchReduce, reducedNodes);
 
-            rebalanceAttempts.set(0);
-        } catch (InterruptedException | ExecutionException e) {
-            // TODO: IGNITE-14693
-            LOG.warn("Unable to commit partition configuration to metastore [table = {}, partition = {}]",
-                    e, tblConfiguration.name(), partNum);
-        }
-    }
+            // For further addition
+            Set<ClusterNode> calculatedSwitchAppend = union(retrievedSwitchAppend, reducedNodes);
+            calculatedSwitchAppend = subtract(calculatedSwitchAppend, addedNodes);
+            calculatedSwitchAppend = intersect(calculatedAssignments, calculatedSwitchAppend);
 
-    private static List<ClusterNode> resolveClusterNodes(
-            List<PeerId> peers, byte[] pendingAssignments, byte[] stableAssignments) {
-        Map<NetworkAddress, ClusterNode> resolveRegistry = new HashMap<>();
+            Set<ClusterNode> calculatedPendingReduction = subtract(stable, retrievedSwitchReduce);
 
-        if (pendingAssignments != null) {
-            ((List<ClusterNode>) ByteUtils.fromBytes(pendingAssignments)).forEach(n -> resolveRegistry.put(n.address(), n));
-        }
+            Set<ClusterNode> calculatedPendingAddition = union(stable, reducedNodes);
+            calculatedPendingAddition = intersect(calculatedAssignments, calculatedPendingAddition);
 
-        if (stableAssignments != null) {
-            ((List<ClusterNode>) ByteUtils.fromBytes(stableAssignments)).forEach(n -> resolveRegistry.put(n.address(), n));
-        }
+            // eq(revision(assignments.stable), retrievedAssignmentsStable.revision)
+            SimpleCondition con1 = stableEntry.empty()
+                    ? notExists(stablePartAssignmentsKey) :
+                    revision(stablePartAssignmentsKey).eq(stableEntry.revision());
+
+            // eq(revision(assignments.pending), retrievedAssignmentsPending.revision)
+            SimpleCondition con2 = revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
 
-        List<ClusterNode> resolvedNodes = new ArrayList<>(peers.size());
+            // eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision)
+            SimpleCondition con3 = switchReduceEntry.empty()
+                    ? notExists(switchReduceKey) : revision(switchReduceKey).eq(switchReduceEntry.revision());
 
-        for (PeerId p : peers) {
-            var addr = NetworkAddress.from(p.getEndpoint().getIp() + ":" + p.getEndpoint().getPort());
+            // eq(revision(assignments.switch.append), retrievedAssignmentsSwitchAppend.revision)
+            SimpleCondition con4 = switchAppendEntry.empty()
+                    ? notExists(switchAppendKey) : revision(switchAppendKey).eq(switchAppendEntry.revision());
 
-            if (resolveRegistry.containsKey(addr)) {
-                resolvedNodes.add(resolveRegistry.get(addr));
+            // All conditions combined with AND operator.
+            Condition retryPreconditions = and(con1, and(con2, and(con3, con4)));
+
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
+            tblConfiguration.change(ch -> {
+                List<Set<ClusterNode>> assignments = ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
+                assignments.set(partNum, stable);
+                ((ExtendedTableChange) ch).changeAssignments(ByteUtils.toBytes(assignments));
+            }).get(10, TimeUnit.SECONDS);
+
+            Update successCase;
+            Update failCase;
+
+            byte[] stableByteArray = ByteUtils.toBytes(stable);
+            byte[] additionByteArray = ByteUtils.toBytes(calculatedPendingAddition);
+            byte[] reductionByteArray = ByteUtils.toBytes(calculatedPendingReduction);
+            byte[] switchReduceByteArray = ByteUtils.toBytes(calculatedSwitchReduce);
+            byte[] switchAppendByteArray = ByteUtils.toBytes(calculatedSwitchAppend);
+
+            if (!calculatedSwitchAppend.isEmpty()) {
+                successCase = ops(
+                        put(stablePartAssignmentsKey, stableByteArray),
+                        put(pendingPartAssignmentsKey, additionByteArray),
+                        put(switchReduceKey, switchReduceByteArray),
+                        put(switchAppendKey, switchAppendByteArray)
+                ).yield(SWITCH_APPEND_SUCCESS);
+                failCase = ops().yield(SWITCH_APPEND_FAIL);
+            } else if (!calculatedSwitchReduce.isEmpty()) {
+                successCase = ops(
+                        put(stablePartAssignmentsKey, stableByteArray),
+                        put(pendingPartAssignmentsKey, reductionByteArray),
+                        put(switchReduceKey, switchReduceByteArray),
+                        put(switchAppendKey, switchAppendByteArray)
+                ).yield(SWITCH_REDUCE_SUCCESS);
+                failCase = ops().yield(SWITCH_REDUCE_FAIL);
             } else {
-                throw new IgniteInternalException("Can't find appropriate cluster node for raft group peer: " + p);
+                Condition con5;
+                if (plannedEntry.value() != null) {
+                    // eq(revision(partition.assignments.planned), plannedEntry.revision)
+                    con5 = revision(plannedPartAssignmentsKey).eq(plannedEntry.revision());
+
+                    successCase = ops(
+                            put(stablePartAssignmentsKey, ByteUtils.toBytes(stable)),
+                            put(pendingPartAssignmentsKey, plannedEntry.value()),
+                            remove(plannedPartAssignmentsKey)
+                    ).yield(SCHEDULE_PENDING_REBALANCE_SUCCESS);
+
+                    failCase = ops().yield(SCHEDULE_PENDING_REBALANCE_FAIL);
+                } else {
+                    // notExists(partition.assignments.planned)
+                    con5 = notExists(plannedPartAssignmentsKey);
+
+                    successCase = ops(
+                            put(stablePartAssignmentsKey, ByteUtils.toBytes(stable)),
+                            remove(pendingPartAssignmentsKey)
+                    ).yield(FINISH_REBALANCE_SUCCESS);
+
+                    failCase = ops().yield(FINISH_REBALANCE_FAIL);
+                }
+
+                retryPreconditions = and(retryPreconditions, con5);
             }
-        }
 
-        return resolvedNodes;
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-17592 Remove synchronous wait
+            int res = metaStorageMgr.invoke(If.iif(retryPreconditions, successCase, failCase)).get().getAsInt();
+
+            if (res < 0) {
+                switch (res) {
+                    case SWITCH_APPEND_FAIL:
+                        LOG.info("Rebalance keys changed while trying to update rebalance pending addition information. Going to retry"
+                                + " [partition={}, table={}, appliedPeers={}]", partNum, tblConfiguration.name(), stable);
+                        break;
+                    case SWITCH_REDUCE_FAIL:
+                        LOG.info("Rebalance keys changed while trying to update rebalance pending reduce information. Going to retry"
+                                + " [partition={}, table={}, appliedPeers={}]", partNum, tblConfiguration.name(), stable);
+                        break;
+                    case SCHEDULE_PENDING_REBALANCE_FAIL:
+                    case FINISH_REBALANCE_FAIL:
+                        LOG.info("Rebalance keys changed while trying to update rebalance information. Going to retry"
+                                + " [partition={}, table={}, appliedPeers={}]", partNum, tblConfiguration.name(), stable);
+                        break;
+                    default:
+                        assert false : res;
+                        break;
+                }
+
+                doOnNewPeersConfigurationApplied(peers);
+                return;
+            }
+
+            switch (res) {
+                case SWITCH_APPEND_SUCCESS:
+                    LOG.info("Rebalance finished. Going to schedule next rebalance with addition"
+                            + " [partition={}, table={}, appliedPeers={}, plannedPeers={}]",
+                            partNum, tblConfiguration.name().value(), stable, calculatedPendingAddition);
+                    break;
+                case SWITCH_REDUCE_SUCCESS:
+                    LOG.info("Rebalance finished. Going to schedule next rebalance with reduction"
+                            + " [partition={}, table={}, appliedPeers={}, plannedPeers={}]",
+                            partNum, tblConfiguration.name().value(), stable, calculatedPendingReduction);
+                    break;
+                case SCHEDULE_PENDING_REBALANCE_SUCCESS:
+                    LOG.info("Rebalance finished. Going to schedule next rebalance [partition={}, table={}, appliedPeers={}, "
+                            + "plannedPeers={}]",
+                            partNum, tblConfiguration.name().value(), stable, ByteUtils.fromBytes(plannedEntry.value()));
+                    break;
+                case FINISH_REBALANCE_SUCCESS:
+                    LOG.info("Rebalance finished [partition={}, table={}, appliedPeers={}]",
+                            partNum, tblConfiguration.name().value(), stable);
+                    break;
+                default:
+                    assert false : res;
+                    break;
+            }
+
+            rebalanceAttempts.set(0);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            // TODO: IGNITE-14693
+            LOG.warn("Unable to commit partition configuration to metastore [table = {}, partition = {}]",
+                    e, tblConfiguration.name(), partNum);
+        }
     }
 
     /**
@@ -346,7 +489,7 @@ public class RebalanceRaftGroupEventsListener implements RaftGroupEventsListener
      * @param nodes List of cluster nodes to transform.
      * @return List of transformed peers.
      */
-    private static List<Peer> clusterNodesToPeers(List<ClusterNode> nodes) {
+    private static List<Peer> clusterNodesToPeers(Set<ClusterNode> nodes) {
         List<Peer> peers = new ArrayList<>(nodes.size());
 
         for (ClusterNode node : nodes) {
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java b/modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataRequest.java
similarity index 58%
copy from modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataRequest.java
index 8f684ba0c2..913319b97a 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataRequest.java
@@ -15,24 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.compute;
+package org.apache.ignite.internal.table.message;
 
-import org.apache.ignite.internal.compute.message.ExecuteRequest;
-import org.apache.ignite.internal.compute.message.ExecuteResponse;
-import org.apache.ignite.network.annotations.MessageGroup;
+import java.util.UUID;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Message types for the Compute module.
+ * A message that queries a node whether it has data for the partition of a table.
  */
-@MessageGroup(groupName = "ComputeMessages", groupType = 6)
-public class ComputeMessageTypes {
-    /**
-     * Type for {@link ExecuteRequest}.
-     */
-    public static final short EXECUTE_REQUEST = 0;
+@Transferable(TableMessageGroup.HAS_DATA_REQUEST)
+public interface HasDataRequest extends NetworkMessage {
+    /** ID of the table. */
+    UUID tableId();
 
-    /**
-     * Type for {@link ExecuteResponse}.
-     */
-    public static final short EXECUTE_RESPONSE = 1;
+    /** ID of the partition. */
+    int partitionId();
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java b/modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataResponse.java
similarity index 58%
copy from modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataResponse.java
index 8f684ba0c2..5ecb678c35 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/message/HasDataResponse.java
@@ -15,24 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.compute;
+package org.apache.ignite.internal.table.message;
 
-import org.apache.ignite.internal.compute.message.ExecuteRequest;
-import org.apache.ignite.internal.compute.message.ExecuteResponse;
-import org.apache.ignite.network.annotations.MessageGroup;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Message types for the Compute module.
+ * A response to the {@link HasDataRequest}.
  */
-@MessageGroup(groupName = "ComputeMessages", groupType = 6)
-public class ComputeMessageTypes {
-    /**
-     * Type for {@link ExecuteRequest}.
-     */
-    public static final short EXECUTE_REQUEST = 0;
-
-    /**
-     * Type for {@link ExecuteResponse}.
-     */
-    public static final short EXECUTE_RESPONSE = 1;
+@Transferable(TableMessageGroup.HAS_DATA_RESPONSE)
+public interface HasDataResponse extends NetworkMessage {
+    /** {@code true} if a node has data for a partition of a table, {@code false} otherwise. */
+    boolean result();
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java b/modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
similarity index 62%
copy from modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
index 8f684ba0c2..8b9888617e 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeMessageTypes.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/message/TableMessageGroup.java
@@ -15,24 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.compute;
+package org.apache.ignite.internal.table.message;
 
-import org.apache.ignite.internal.compute.message.ExecuteRequest;
-import org.apache.ignite.internal.compute.message.ExecuteResponse;
 import org.apache.ignite.network.annotations.MessageGroup;
 
 /**
- * Message types for the Compute module.
+ * Table module message group.
  */
-@MessageGroup(groupName = "ComputeMessages", groupType = 6)
-public class ComputeMessageTypes {
-    /**
-     * Type for {@link ExecuteRequest}.
-     */
-    public static final short EXECUTE_REQUEST = 0;
+@MessageGroup(groupType = 8, groupName = "TableMessages")
+public class TableMessageGroup {
+    /** {@link HasDataRequest}. */
+    static final int HAS_DATA_REQUEST = 0;
 
-    /**
-     * Type for {@link ExecuteResponse}.
-     */
-    public static final short EXECUTE_RESPONSE = 1;
+    /** {@link HasDataResponse}. */
+    static final int HAS_DATA_RESPONSE = 1;
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
index 9f0f7db412..8478648356 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java
@@ -20,22 +20,38 @@ package org.apache.ignite.internal.utils;
 import static org.apache.ignite.internal.metastorage.client.CompoundCondition.and;
 import static org.apache.ignite.internal.metastorage.client.CompoundCondition.or;
 import static org.apache.ignite.internal.metastorage.client.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.client.Conditions.revision;
 import static org.apache.ignite.internal.metastorage.client.Conditions.value;
+import static org.apache.ignite.internal.metastorage.client.If.iif;
 import static org.apache.ignite.internal.metastorage.client.Operations.ops;
 import static org.apache.ignite.internal.metastorage.client.Operations.put;
 import static org.apache.ignite.internal.metastorage.client.Operations.remove;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.affinity.AffinityUtils;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.Conditions;
+import org.apache.ignite.internal.metastorage.client.Entry;
 import org.apache.ignite.internal.metastorage.client.If;
+import org.apache.ignite.internal.metastorage.client.Operations;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.jraft.entity.PeerId;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -72,7 +88,6 @@ public class RebalanceUtil {
      * @param tableName Table name.
      * @param partId Unique identifier of a partition.
      * @param baselineNodes Nodes in baseline.
-     * @param partitions Number of partitions in a table.
      * @param replicas Number of replicas for a table.
      * @param revision Revision of Meta Storage that is specific for the assignment update.
      * @param metaStorageMgr Meta Storage manager.
@@ -80,7 +95,7 @@ public class RebalanceUtil {
      */
     public static @NotNull CompletableFuture<Void> updatePendingAssignmentsKeys(
             String tableName, String partId, Collection<ClusterNode> baselineNodes,
-            int partitions, int replicas, long revision, MetaStorageManager metaStorageMgr, int partNum) {
+            int replicas, long revision, MetaStorageManager metaStorageMgr, int partNum) {
         ByteArray partChangeTriggerKey = partChangeTriggerKey(partId);
 
         ByteArray partAssignmentsPendingKey = pendingPartAssignmentsKey(partId);
@@ -89,8 +104,9 @@ public class RebalanceUtil {
 
         ByteArray partAssignmentsStableKey = stablePartAssignmentsKey(partId);
 
-        byte[] partAssignmentsBytes = ByteUtils.toBytes(
-                AffinityUtils.calculateAssignments(baselineNodes, partitions, replicas).get(partNum));
+        Set<ClusterNode> partAssignments = AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+
+        byte[] partAssignmentsBytes = ByteUtils.toBytes(partAssignments);
 
         //    if empty(partition.change.trigger.revision) || partition.change.trigger.revision < event.revision:
         //        if empty(partition.assignments.pending) && partition.assignments.stable != calcPartAssighments():
@@ -104,13 +120,13 @@ public class RebalanceUtil {
         //                remove(partition.assignments.planned)
         //    else:
         //        skip
-        var iif = If.iif(or(notExists(partChangeTriggerKey), value(partChangeTriggerKey).lt(ByteUtils.longToBytes(revision))),
-                If.iif(and(notExists(partAssignmentsPendingKey), value(partAssignmentsStableKey).ne(partAssignmentsBytes)),
+        var iif = iif(or(notExists(partChangeTriggerKey), value(partChangeTriggerKey).lt(ByteUtils.longToBytes(revision))),
+                iif(and(notExists(partAssignmentsPendingKey), value(partAssignmentsStableKey).ne(partAssignmentsBytes)),
                         ops(
                                 put(partAssignmentsPendingKey, partAssignmentsBytes),
                                 put(partChangeTriggerKey, ByteUtils.longToBytes(revision))
                         ).yield(PENDING_KEY_UPDATED),
-                        If.iif(value(partAssignmentsPendingKey).ne(partAssignmentsBytes),
+                        iif(value(partAssignmentsPendingKey).ne(partAssignmentsBytes),
                                 ops(
                                         put(partAssignmentsPlannedKey, partAssignmentsBytes),
                                         put(partChangeTriggerKey, ByteUtils.longToBytes(revision))
@@ -157,6 +173,12 @@ public class RebalanceUtil {
     /** Key prefix for stable assignments. */
     public static final String STABLE_ASSIGNMENTS_PREFIX = "assignments.stable.";
 
+    /** Key prefix for switch reduce assignments. */
+    public static final String ASSIGNMENTS_SWITCH_REDUCE_PREFIX = "assignments.switch.reduce.";
+
+    /** Key prefix for switch append assignments. */
+    public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX = "assignments.switch.append.";
+
     /**
      * Key that is needed for the rebalance algorithm.
      *
@@ -202,13 +224,46 @@ public class RebalanceUtil {
     }
 
     /**
-     * Extract table id from pending key of partition.
+     * Key that is needed for the rebalance algorithm.
+     *
+     * @param partId Unique identifier of a partition.
+     * @return Key for a partition.
+     * @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
+     */
+    public static ByteArray switchReduceKey(String partId) {
+        return new ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX + partId);
+    }
+
+    /**
+     * Key that is needed for the rebalance algorithm.
+     *
+     * @param partId Unique identifier of a partition.
+     * @return Key for a partition.
+     * @see <a href="https://github.com/apache/ignite-3/blob/main/modules/table/tech-notes/rebalance.md">Rebalance documentation</a>
+     */
+    public static ByteArray switchAppendKey(String partId) {
+        return new ByteArray(ASSIGNMENTS_SWITCH_APPEND_PREFIX + partId);
+    }
+
+    /**
+     * Extract table id from a metastorage key of partition.
+     *
+     * @param key Key.
+     * @return Table id.
+     */
+    public static UUID extractTableId(ByteArray key) {
+        return extractTableId(key, "");
+    }
+
+    /**
+     * Extract table id from a metastorage key of partition.
      *
      * @param key Key.
+     * @param prefix Key prefix.
      * @return Table id.
      */
     public static UUID extractTableId(ByteArray key, String prefix) {
-        var strKey = key.toString();
+        String strKey = key.toString();
 
         return UUID.fromString(strKey.substring(prefix.length(), strKey.indexOf("_part_")));
     }
@@ -235,4 +290,209 @@ public class RebalanceUtil {
         // As long as we don't have a general failure handler, we assume that all errors are recoverable.
         return true;
     }
+
+    /**
+     * Starts the process of removing peer from raft group if that peer has in-memory storage or if its
+     * storage has been cleared.
+     *
+     * <p>The node is added to the set stored under {@link #switchReduceKey(String)} using a conditional metastorage
+     * invoke: the put is applied only if the key is still at the revision that was read (or is still absent when
+     * nothing was read). If the condition fails because of a concurrent update, the whole read-modify-write is
+     * retried.
+     *
+     * @param partId Partition's raft group id.
+     * @param clusterNode Cluster node to be removed from peers.
+     * @param metaStorageMgr MetaStorage manager.
+     * @return Completable future that signifies the completion of this operation.
+     */
+    public static CompletableFuture<Void> startPeerRemoval(String partId, ClusterNode clusterNode, MetaStorageManager metaStorageMgr) {
+        ByteArray key = switchReduceKey(partId);
+
+        return metaStorageMgr.get(key)
+                .thenCompose(retrievedAssignmentsSwitchReduce -> {
+                    byte[] prevValue = retrievedAssignmentsSwitchReduce.value();
+
+                    if (prevValue != null) {
+                        Set<ClusterNode> prev = ByteUtils.fromBytes(prevValue);
+
+                        prev.add(clusterNode);
+
+                        return metaStorageMgr.invoke(
+                                revision(key).eq(retrievedAssignmentsSwitchReduce.revision()),
+                                Operations.put(key, ByteUtils.toBytes(prev)),
+                                Operations.noop()
+                        );
+                    } else {
+                        // A mutable set is stored on purpose: readers deserialize it and add to it in place.
+                        Set<ClusterNode> newValue = new HashSet<>();
+
+                        newValue.add(clusterNode);
+
+                        return metaStorageMgr.invoke(
+                                Conditions.notExists(key),
+                                Operations.put(key, ByteUtils.toBytes(newValue)),
+                                Operations.noop()
+                        );
+                    }
+                }).thenCompose(res -> {
+                    if (!res) {
+                        // The key changed between the read and the conditional write: re-read and retry.
+                        return startPeerRemoval(partId, clusterNode, metaStorageMgr);
+                    }
+
+                    return CompletableFuture.completedFuture(null);
+                });
+    }
+
+    /**
+     * Handles a change of the assignments switch reduce key by updating pending assignments if there is no
+     * rebalance in progress. If there is a rebalance in progress, then new assignments will be applied when the
+     * rebalance finishes.
+     *
+     * <p>Pending assignments are the partition's assignments calculated over {@code baselineNodes} minus the nodes
+     * of the switch reduce set. If the event carries no value or an empty switch reduce set, nothing is done.
+     *
+     * @param metaStorageMgr MetaStorage manager.
+     * @param baselineNodes Baseline nodes.
+     * @param replicas Replicas count.
+     * @param partNum Number of the partition.
+     * @param partId Partition's raft group id.
+     * @param event Assignments switch reduce change event.
+     * @return Completable future that signifies the completion of this operation.
+     */
+    public static CompletableFuture<Void> handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<ClusterNode> baselineNodes,
+            int replicas, int partNum, String partId, WatchEvent event) {
+        Entry entry = event.entryEvent().newEntry();
+        byte[] eventData = entry.value();
+
+        // No value (e.g. the key was removed): there are no nodes to exclude.
+        if (eventData == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        Set<ClusterNode> switchReduce = ByteUtils.fromBytes(eventData);
+
+        // Nothing to reduce: skip calculating and serializing assignments altogether.
+        if (switchReduce.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        Set<ClusterNode> assignments = AffinityUtils.calculateAssignmentForPartition(baselineNodes, partNum, replicas);
+
+        Set<ClusterNode> pendingAssignments = subtract(assignments, switchReduce);
+
+        byte[] pendingByteArray = ByteUtils.toBytes(pendingAssignments);
+        byte[] assignmentsByteArray = ByteUtils.toBytes(assignments);
+
+        ByteArray pendingKey = pendingPartAssignmentsKey(partId);
+        ByteArray stableKey = stablePartAssignmentsKey(partId);
+        ByteArray changeTriggerKey = partChangeTriggerKey(partId);
+
+        // Writes below only happen if the change trigger is absent or older than this event's revision.
+        byte[] rev = ByteUtils.longToBytes(entry.revision());
+
+        // Here is what happens in the MetaStorage:
+        // if ((notExists(changeTriggerKey) || value(changeTriggerKey) < revision) && (notExists(pendingKey) && notExists(stableKey))) {
+        //     put(pendingKey, pending)
+        //     put(stableKey, assignments)
+        //     put(changeTriggerKey, revision)
+        // } else if ((notExists(changeTriggerKey) || value(changeTriggerKey) < revision) && (notExists(pendingKey))) {
+        //     put(pendingKey, pending)
+        //     put(changeTriggerKey, revision)
+        // }
+
+        If resultingOperation = iif(
+                and(
+                        or(notExists(changeTriggerKey), value(changeTriggerKey).lt(rev)),
+                        and(notExists(pendingKey), notExists(stableKey))
+                ),
+                ops(
+                        put(pendingKey, pendingByteArray),
+                        put(stableKey, assignmentsByteArray),
+                        put(changeTriggerKey, rev)
+                ).yield(),
+                iif(
+                        and(
+                                or(notExists(changeTriggerKey), value(changeTriggerKey).lt(rev)),
+                                notExists(pendingKey)
+                        ),
+                        ops(
+                                put(pendingKey, pendingByteArray),
+                                put(changeTriggerKey, rev)
+                        ).yield(),
+                        ops().yield()
+                )
+        );
+
+        return metaStorageMgr.invoke(resultingOperation).thenApply(unused -> null);
+    }
+
+    /**
+     * Maps raft group peers to cluster nodes, looking the nodes up in the pending and stable assignments.
+     * A peer is matched to a node by its network address ({@code ip:port}); if both assignments contain a node with
+     * the same address, the node from the stable assignments wins.
+     *
+     * @param peers List of peers.
+     * @param pendingAssignments Serialized set of pending assignments, or {@code null} if there are none.
+     * @param stableAssignments Serialized set of stable assignments, or {@code null} if there are none.
+     * @return Resolved cluster nodes.
+     * @throws IgniteInternalException If the address of some peer is found in neither pending nor stable assignments.
+     */
+    public static Set<ClusterNode> resolveClusterNodes(List<PeerId> peers, byte[] pendingAssignments, byte[] stableAssignments) {
+        Map<NetworkAddress, ClusterNode> nodesByAddress = new HashMap<>();
+
+        // Pending first, stable second, so that stable entries overwrite pending ones for the same address.
+        for (byte[] serialized : new byte[][] {pendingAssignments, stableAssignments}) {
+            if (serialized == null) {
+                continue;
+            }
+
+            Set<ClusterNode> nodes = ByteUtils.fromBytes(serialized);
+
+            for (ClusterNode node : nodes) {
+                nodesByAddress.put(node.address(), node);
+            }
+        }
+
+        Set<ClusterNode> result = new HashSet<>(peers.size());
+
+        for (PeerId peer : peers) {
+            NetworkAddress address = NetworkAddress.from(peer.getEndpoint().getIp() + ":" + peer.getEndpoint().getPort());
+
+            ClusterNode node = nodesByAddress.get(address);
+
+            if (node == null) {
+                throw new IgniteInternalException("Can't find appropriate cluster node for raft group peer: " + peer);
+            }
+
+            result.add(node);
+        }
+
+        return result;
+    }
+
+    /**
+     * Reads a set of cluster nodes from a MetaStorage entry.
+     *
+     * @param entry MetaStorage entry.
+     * @return Deserialized set of cluster nodes, or an empty set if the entry is empty or carries no value
+     *     (presumably a tombstone of a removed key).
+     */
+    public static Set<ClusterNode> readClusterNodes(Entry entry) {
+        // A non-empty entry may still have a null value; deserializing null would fail instead of meaning "no nodes".
+        if (entry.empty() || entry.value() == null) {
+            return Collections.emptySet();
+        }
+
+        return ByteUtils.fromBytes(entry.value());
+    }
+
+    /**
+     * Computes the set difference of two node sets. Neither argument is modified.
+     *
+     * @param minuend Nodes to start from.
+     * @param subtrahend Nodes to exclude.
+     * @return New set with every node of {@code minuend} that is not contained in {@code subtrahend}.
+     */
+    public static Set<ClusterNode> subtract(Set<ClusterNode> minuend, Set<ClusterNode> subtrahend) {
+        Set<ClusterNode> difference = new HashSet<>();
+
+        for (ClusterNode node : minuend) {
+            if (!subtrahend.contains(node)) {
+                difference.add(node);
+            }
+        }
+
+        return difference;
+    }
+
+    /**
+     * Computes the union of two node sets. Neither argument is modified.
+     *
+     * @param op1 First operand.
+     * @param op2 Second operand.
+     * @return New set with every node contained in {@code op1} or {@code op2}.
+     */
+    public static Set<ClusterNode> union(Set<ClusterNode> op1, Set<ClusterNode> op2) {
+        Set<ClusterNode> combined = new HashSet<>(op1);
+        combined.addAll(op2);
+        return combined;
+    }
+
+    /**
+     * Computes the intersection of two node sets. Neither argument is modified.
+     *
+     * @param op1 First operand.
+     * @param op2 Second operand.
+     * @return New set with every node of {@code op1} that is also contained in {@code op2}.
+     */
+    public static Set<ClusterNode> intersect(Set<ClusterNode> op1, Set<ClusterNode> op2) {
+        Set<ClusterNode> common = new HashSet<>();
+
+        for (ClusterNode node : op1) {
+            if (op2.contains(node)) {
+                common.add(node);
+            }
+        }
+
+        return common;
+    }
 }
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index f694cc0ea8..1f111133a8 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -46,7 +46,9 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -248,6 +250,9 @@ public class TableManagerTest extends IgniteAbstractTest {
     /** Before all test scenarios. */
     @BeforeEach
     void before() {
+        when(rm.messagingService()).thenReturn(mock(MessagingService.class));
+        when(rm.topologyService()).thenReturn(mock(TopologyService.class));
+
         revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
             function.apply(0L).join();
 
@@ -305,10 +310,10 @@ public class TableManagerTest extends IgniteAbstractTest {
 
                 var extConfCh = ((ExtendedTableChange) tableChange);
 
-                ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
+                ArrayList<Set<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
 
                 for (int part = 0; part < PARTITIONS; part++) {
-                    assignment.add(new ArrayList<>(Collections.singleton(node)));
+                    assignment.add(new HashSet<>(Collections.singleton(node)));
                 }
 
                 extConfCh.changeAssignments(ByteUtils.toBytes(assignment))
@@ -761,6 +766,7 @@ public class TableManagerTest extends IgniteAbstractTest {
      */
     private TableManager createTableManager(CompletableFuture<TableManager> tblManagerFut, boolean waitingSqlSchema) {
         TableManager tableManager = new TableManager(
+                "test",
                 revisionUpdater,
                 tblsCfg,
                 rm,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
index 7a15981de5..02f47b9e29 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListenerTest.java
@@ -53,7 +53,7 @@ class RebalanceRaftGroupEventsListenerTest {
     void testOnReconfigurationErrorCalledFromResetWithNullStatus() throws Exception {
         IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-        RaftGroupEventsListener spy = Mockito.spy(new RebalanceRaftGroupEventsListener(null, null, null, 0, busyLock, null, null));
+        RaftGroupEventsListener spy = Mockito.spy(new RebalanceRaftGroupEventsListener(null, null, null, 0, busyLock, null, null, null));
 
         NodeImpl node = Mockito.mock(NodeImpl.class);