You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/02/14 12:43:23 UTC

[ignite-3] branch main updated: IGNITE-18554 Remove MetaStorage learners on topology events (#1542)

This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new dc56fe75cf IGNITE-18554 Remove MetaStorage learners on topology events (#1542)
dc56fe75cf is described below

commit dc56fe75cf4c29b608caa40b74cd249afe29fb9d
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Tue Feb 14 14:43:17 2023 +0200

    IGNITE-18554 Remove MetaStorage learners on topology events (#1542)
---
 .../cluster/management/ItClusterManagerTest.java   |   2 +
 .../management/ClusterManagementGroupManager.java  |  23 +-
 .../management/raft/CmgRaftGroupListener.java      |   8 +-
 .../cluster/management/raft/ValidationManager.java |  34 +-
 modules/metastorage/build.gradle                   |   2 +
 .../impl/ItMetaStorageManagerImplTest.java         |  13 +-
 .../impl/ItMetaStorageMultipleNodesTest.java       | 345 +++++++++++++++++++++
 .../metastorage/impl/ItMetaStorageServiceTest.java |  35 ++-
 .../metastorage/impl/ItMetaStorageWatchTest.java   |  96 +++---
 .../internal/metastorage/impl/EntryImpl.java       |   3 +
 .../metastorage/impl/MetaStorageManagerImpl.java   | 133 +-------
 .../impl/MetaStorageRaftGroupEventsListener.java   | 230 ++++++++++++++
 .../raft/client/TopologyAwareRaftGroupService.java |   4 +-
 .../internal/raft/RaftGroupEventsListener.java     |  18 +-
 .../internal/raft/service/RaftGroupService.java    |   8 +-
 .../ignite/internal/raft/RaftGroupServiceImpl.java |   4 +-
 .../ItDistributedConfigurationPropertiesTest.java  |   6 +-
 .../ItDistributedConfigurationStorageTest.java     |   6 +-
 .../storage/ItRebalanceDistributedTest.java        |   6 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   6 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   1 +
 21 files changed, 743 insertions(+), 240 deletions(-)

diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index aa252d828a..e295b7af8c 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -139,6 +140,7 @@ public class ItClusterManagerTest extends BaseItClusterManagementTest {
     /**
      * Tests a scenario when a node is restarted.
      */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-18795")
     @Test
     void testNodeRestart(TestInfo testInfo) throws Exception {
         startCluster(2, testInfo);
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 32b827434d..d3f58bc4e8 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -58,10 +58,8 @@ import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
-import org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.Status;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -522,7 +520,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
                             new RaftNodeId(CmgGroupId.INSTANCE, serverPeer),
                             raftConfiguration,
                             new CmgRaftGroupListener(clusterStateStorage, logicalTopology, this::onLogicalTopologyChanged),
-                            createCmgRaftGroupEventsListener()
+                            this::onElectedAsLeader
                     )
                     .thenApply(service -> new CmgRaftService(service, clusterService, logicalTopology));
         } catch (Exception e) {
@@ -549,25 +547,6 @@ public class ClusterManagementGroupManager implements IgniteComponent {
         }
     }
 
-    private RaftGroupEventsListener createCmgRaftGroupEventsListener() {
-        return new RaftGroupEventsListener() {
-            @Override
-            public void onLeaderElected(long term) {
-                ClusterManagementGroupManager.this.onElectedAsLeader(term);
-            }
-
-            @Override
-            public void onNewPeersConfigurationApplied(PeersAndLearners configuration) {
-                // No-op.
-            }
-
-            @Override
-            public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {
-                // No-op.
-            }
-        };
-    }
-
     /**
      * Starts the CMG Raft service using the given {@code state} and persists it to the local storage.
      */
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 3500713e15..945625b555 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -185,11 +185,13 @@ public class CmgRaftGroupListener implements RaftGroupListener {
         ClusterNode node = command.node().asClusterNode();
 
         if (validationManager.isNodeValidated(node)) {
-            logicalTopology.putNode(node);
+            validationManager.completeValidation(node);
 
-            LOG.info("Node added to the logical topology [node={}]", node.name());
+            logicalTopology.putNode(node);
 
-            validationManager.completeValidation(node);
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Node added to the logical topology [node={}]", node.name());
+            }
 
             return null;
         } else {
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
index 3fbc42e1a3..8b822c437e 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
@@ -17,10 +17,11 @@
 
 package org.apache.ignite.internal.cluster.management.raft;
 
+import static java.util.stream.Collectors.toSet;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.Set;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
 import org.apache.ignite.internal.cluster.management.ClusterState;
 import org.apache.ignite.internal.cluster.management.ClusterTag;
 import org.apache.ignite.internal.cluster.management.raft.commands.InitCmgStateCommand;
@@ -90,7 +91,7 @@ class ValidationManager {
     }
 
     /**
-     * Validates a given node and issues a validation token.
+     * Validates a given node and saves it in the set of validated nodes.
      *
      * @return {@code null} in case of successful validation or a {@link ValidationErrorResponse} otherwise.
      */
@@ -134,16 +135,17 @@ class ValidationManager {
     void removeValidatedNodes(Collection<ClusterNode> nodes) {
         Set<String> validatedNodeIds = storage.getValidatedNodes().stream()
                 .map(ClusterNode::id)
-                // Using a sorted collection to have a stable notification order.
-                .collect(Collectors.toCollection(TreeSet::new));
+                .collect(toSet());
 
-        nodes.forEach(node -> {
-            if (validatedNodeIds.contains(node.id())) {
-                storage.removeValidatedNode(node);
+        // Using a sorted stream to have a stable notification order.
+        nodes.stream()
+                .filter(node -> validatedNodeIds.contains(node.id()))
+                .sorted(Comparator.comparing(ClusterNode::id))
+                .forEach(node -> {
+                    storage.removeValidatedNode(node);
 
-                logicalTopology.onNodeInvalidated(node);
-            }
-        });
+                    logicalTopology.onNodeInvalidated(node);
+                });
     }
 
     /**
@@ -152,6 +154,16 @@ class ValidationManager {
      * @param node Node that wishes to join the logical topology.
      */
     void completeValidation(ClusterNode node) {
+        // Remove all other versions of this node, if they were validated at some point, but not removed from the physical topology.
+        storage.getValidatedNodes().stream()
+                .filter(n -> n.name().equals(node.name()) && !n.id().equals(node.id()))
+                .sorted(Comparator.comparing(ClusterNode::id))
+                .forEach(nodeVersion -> {
+                    storage.removeValidatedNode(nodeVersion);
+
+                    logicalTopology.onNodeInvalidated(nodeVersion);
+                });
+
         storage.removeValidatedNode(node);
     }
 }
diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle
index 7202db5c39..197562dae0 100644
--- a/modules/metastorage/build.gradle
+++ b/modules/metastorage/build.gradle
@@ -52,6 +52,8 @@ dependencies {
     integrationTestImplementation testFixtures(project(':ignite-raft'))
     integrationTestImplementation testFixtures(project(':ignite-configuration'))
     integrationTestImplementation testFixtures(project(':ignite-vault'))
+    integrationTestImplementation testFixtures(project(':ignite-metastorage'))
+    integrationTestImplementation testFixtures(project(':ignite-cluster-management'))
 
     testFixturesImplementation project(':ignite-core')
     testFixturesImplementation project(':ignite-rocksdb-common')
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index 9941d8b5cb..c2411ee654 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -33,12 +33,12 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -52,8 +52,7 @@ import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
 import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultEntry;
 import org.apache.ignite.internal.vault.VaultManager;
@@ -72,9 +71,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 /**
  * Integration tests for {@link MetaStorageManagerImpl}.
  */
-@ExtendWith(WorkDirectoryExtension.class)
 @ExtendWith(ConfigurationExtension.class)
-public class ItMetaStorageManagerImplTest {
+public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
     private VaultManager vaultManager;
 
     private ClusterService clusterService;
@@ -86,8 +84,7 @@ public class ItMetaStorageManagerImplTest {
     private MetaStorageManagerImpl metaStorageManager;
 
     @BeforeEach
-    void setUp(TestInfo testInfo, @WorkDirectory Path workDir, @InjectConfiguration RaftConfiguration raftConfiguration)
-            throws NodeStoppingException {
+    void setUp(TestInfo testInfo, @InjectConfiguration RaftConfiguration raftConfiguration) throws NodeStoppingException {
         var addr = new NetworkAddress("localhost", 10_000);
 
         clusterService = clusterService(testInfo, addr.port(), new StaticNodeFinder(List.of(addr)));
@@ -107,6 +104,7 @@ public class ItMetaStorageManagerImplTest {
                 vaultManager,
                 clusterService,
                 cmgManager,
+                mock(LogicalTopologyService.class),
                 raftManager,
                 storage
         );
@@ -266,6 +264,7 @@ public class ItMetaStorageManagerImplTest {
                 vaultManager,
                 clusterService,
                 cmgManager,
+                mock(LogicalTopologyService.class),
                 raftManager,
                 storage
         );
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
new file mode 100644
index 0000000000..f7e1787f0c
--- /dev/null
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.DefaultMessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for scenarios when Meta Storage nodes join and leave a cluster.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest {
+    @InjectConfiguration
+    private static RaftConfiguration raftConfiguration;
+
+    @InjectConfiguration("mock.failoverTimeout=0")
+    private static ClusterManagementConfiguration cmgConfiguration;
+
+    private static class Node {
+        private final VaultManager vaultManager;
+
+        private final ClusterService clusterService;
+
+        private final RaftManager raftManager;
+
+        private final ClusterStateStorage clusterStateStorage = new TestClusterStateStorage();
+
+        private final ClusterManagementGroupManager cmgManager;
+
+        private final MetaStorageManagerImpl metaStorageManager;
+
+        Node(ClusterService clusterService, Path dataPath) {
+            this.clusterService = clusterService;
+
+            this.vaultManager = new VaultManager(new InMemoryVaultService());
+
+            Path basePath = dataPath.resolve(name());
+
+            this.raftManager = new Loza(
+                    clusterService,
+                    raftConfiguration,
+                    basePath.resolve("raft"),
+                    new HybridClockImpl()
+            );
+
+            var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+            this.cmgManager = new ClusterManagementGroupManager(
+                    vaultManager,
+                    clusterService,
+                    raftManager,
+                    clusterStateStorage,
+                    logicalTopology,
+                    cmgConfiguration
+            );
+
+            this.metaStorageManager = new MetaStorageManagerImpl(
+                    vaultManager,
+                    clusterService,
+                    cmgManager,
+                    new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
+                    raftManager,
+                    new SimpleInMemoryKeyValueStorage(name())
+            );
+        }
+
+        void start() throws NodeStoppingException {
+            List<IgniteComponent> components =
+                    List.of(vaultManager, clusterService, raftManager, clusterStateStorage, cmgManager, metaStorageManager);
+
+            components.forEach(IgniteComponent::start);
+
+            metaStorageManager.deployWatches();
+        }
+
+        String name() {
+            return clusterService.localConfiguration().getName();
+        }
+
+        void stop() throws Exception {
+            List<IgniteComponent> components =
+                    List.of(metaStorageManager, cmgManager, raftManager, clusterStateStorage, clusterService, vaultManager);
+
+            Stream<AutoCloseable> beforeNodeStop = components.stream().map(c -> c::beforeNodeStop);
+
+            Stream<AutoCloseable> nodeStop = components.stream().map(c -> c::stop);
+
+            IgniteUtils.closeAll(Stream.concat(beforeNodeStop, nodeStop));
+        }
+
+        CompletableFuture<Set<String>> getMetaStorageLearners() {
+            return metaStorageManager
+                    .metaStorageServiceFuture()
+                    .thenApply(MetaStorageServiceImpl::raftGroupService)
+                    .thenCompose(service -> service.refreshMembers(false).thenApply(v -> service.learners()))
+                    .thenApply(learners -> learners.stream().map(Peer::consistentId).collect(toSet()));
+        }
+
+        void startDroppingMessagesTo(Node recipient, Class<? extends NetworkMessage> msgType) {
+            ((DefaultMessagingService) clusterService.messagingService())
+                    .dropMessages((recipientConsistentId, message) ->
+                            recipient.name().equals(recipientConsistentId) && msgType.isInstance(message));
+        }
+
+        void stopDroppingMessages() {
+            ((DefaultMessagingService) clusterService.messagingService()).stopDroppingMessages();
+        }
+    }
+
+    private final List<Node> nodes = new ArrayList<>();
+
+    private Node startNode(TestInfo testInfo) throws NodeStoppingException {
+        var nodeFinder = new StaticNodeFinder(List.of(new NetworkAddress("localhost", 10_000)));
+
+        ClusterService clusterService = ClusterServiceTestUtils.clusterService(testInfo, 10_000 + nodes.size(), nodeFinder);
+
+        var node = new Node(clusterService, workDir);
+
+        node.start();
+
+        nodes.add(node);
+
+        return node;
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        IgniteUtils.closeAll(nodes.stream().map(node -> node::stop));
+    }
+
+    /**
+     * Tests that an incoming node gets registered as a Learner and receives Meta Storage updates.
+     */
+    @Test
+    void testLearnerJoin(TestInfo testInfo) throws NodeStoppingException {
+        Node firstNode = startNode(testInfo);
+
+        firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test");
+
+        var key = new ByteArray("foo");
+        byte[] value = "bar".getBytes(StandardCharsets.UTF_8);
+
+        CompletableFuture<Boolean> invokeFuture = firstNode.metaStorageManager.invoke(notExists(key), put(key, value), noop());
+
+        assertThat(invokeFuture, willBe(true));
+
+        Node secondNode = startNode(testInfo);
+
+        // Check that reading remote data works correctly.
+        assertThat(secondNode.metaStorageManager.get(key).thenApply(Entry::value), willBe(value));
+
+        // Check that the new node will receive events.
+        var awaitFuture = new CompletableFuture<EntryEvent>();
+
+        secondNode.metaStorageManager.registerExactWatch(key, new WatchListener() {
+            @Override
+            public void onUpdate(WatchEvent event) {
+                // Skip the first update event, because it's not guaranteed to arrive here (insert may have happened before the watch was
+                // registered).
+                if (event.revision() != 1) {
+                    awaitFuture.complete(event.entryEvent());
+                }
+            }
+
+            @Override
+            public void onError(Throwable e) {
+                awaitFuture.completeExceptionally(e);
+            }
+        });
+
+        byte[] newValue = "baz".getBytes(StandardCharsets.UTF_8);
+
+        invokeFuture = firstNode.metaStorageManager.invoke(revision(key).eq(1), put(key, newValue), noop());
+
+        assertThat(invokeFuture, willBe(true));
+
+        var expectedEntryEvent = new EntryEvent(
+                new EntryImpl(key.bytes(), value, 1, 1),
+                new EntryImpl(key.bytes(), newValue, 2, 2)
+        );
+
+        assertThat(awaitFuture, willBe(expectedEntryEvent));
+
+        // Check that the second node has been registered as a learner.
+        assertThat(firstNode.getMetaStorageLearners(), willBe(Set.of(secondNode.name())));
+    }
+
+    /**
+     * Tests a case when a node leaves the physical topology without entering the logical topology.
+     */
+    @Test
+    void testLearnerLeavePhysicalTopology(TestInfo testInfo) throws Exception {
+        Node firstNode = startNode(testInfo);
+        Node secondNode = startNode(testInfo);
+
+        firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test");
+
+        // Try reading some data to make sure that Raft has been configured correctly.
+        assertThat(secondNode.metaStorageManager.get(new ByteArray("test")).thenApply(Entry::value), willBe(nullValue()));
+
+        // Check that the second node has been registered as a learner.
+        waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000);
+
+        // Stop the second node.
+        secondNode.stop();
+
+        nodes.remove(1);
+
+        assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().isEmpty(), 10_000));
+    }
+
+    /**
+     * Tests a case when a node leaves the physical topology after entering the logical topology.
+     */
+    @Test
+    void testLearnerLeaveLogicalTopology(TestInfo testInfo) throws Exception {
+        Node firstNode = startNode(testInfo);
+        Node secondNode = startNode(testInfo);
+
+        firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test");
+
+        assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
+
+        CompletableFuture<Set<String>> logicalTopologyNodes = firstNode.cmgManager
+                .logicalTopology()
+                .thenApply(logicalTopology -> logicalTopology.nodes().stream().map(ClusterNode::name).collect(toSet()));
+
+        assertThat(logicalTopologyNodes, willBe(Set.of(firstNode.name(), secondNode.name())));
+
+        // Try reading some data to make sure that Raft has been configured correctly.
+        assertThat(secondNode.metaStorageManager.get(new ByteArray("test")).thenApply(Entry::value), willBe(nullValue()));
+
+        // Check that the second node has been registered as a learner.
+        waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000);
+
+        // Stop the second node.
+        secondNode.stop();
+
+        nodes.remove(1);
+
+        assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().isEmpty(), 10_000));
+    }
+
+    /**
+     * Tests a scenario when a node gets kicked out of the Logical Topology due to a network partition. It should then be able to join
+     * the Meta Storage Raft group successfully.
+     */
+    @Test
+    void testLearnerLeaveAndJoinBecauseOfNetworkPartition(TestInfo testInfo) throws Exception {
+        Node firstNode = startNode(testInfo);
+        Node secondNode = startNode(testInfo);
+
+        firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test");
+
+        assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
+
+        CompletableFuture<Set<String>> logicalTopologyNodes = firstNode.cmgManager
+                .logicalTopology()
+                .thenApply(logicalTopology -> logicalTopology.nodes().stream().map(ClusterNode::name).collect(toSet()));
+
+        assertThat(logicalTopologyNodes, willBe(Set.of(firstNode.name(), secondNode.name())));
+
+        waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000);
+
+        // Make the first node lose the second node from the Physical and Logical topologies.
+        firstNode.startDroppingMessagesTo(secondNode, ScaleCubeMessage.class);
+
+        assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().isEmpty(), 10_000));
+
+        // Make the first node discover the second node again. The second node should be added as a Meta Storage Learner again.
+        firstNode.stopDroppingMessages();
+
+        assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000));
+    }
+}
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index ca9631a110..2b25b4f2de 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -60,8 +60,10 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -854,11 +856,9 @@ public class ItMetaStorageServiceTest {
 
     /**
      * Tests {@link MetaStorageService#closeCursors(String)}.
-     *
-     * @throws Exception If failed.
      */
     @Test
-    public void testCursorsCleanup() throws Exception {
+    public void testCursorsCleanup() throws InterruptedException {
         startNodes(2);
 
         Node leader = nodes.get(0);
@@ -873,14 +873,23 @@ public class ItMetaStorageServiceTest {
             return cursor;
         });
 
-        class MockSubscriber implements Subscriber<Entry> {
-            private Subscription subscription;
+        var subscriptionLatch = new CountDownLatch(3);
+        var closeCursorLatch = new CountDownLatch(1);
 
+        class MockSubscriber implements Subscriber<Entry> {
             private final CompletableFuture<Entry> result = new CompletableFuture<>();
 
             @Override
             public void onSubscribe(Subscription subscription) {
-                this.subscription = subscription;
+                subscriptionLatch.countDown();
+
+                try {
+                    assertTrue(closeCursorLatch.await(10, TimeUnit.SECONDS));
+                } catch (Throwable e) {
+                    onError(e);
+                }
+
+                subscription.request(1);
             }
 
             @Override
@@ -895,6 +904,7 @@ public class ItMetaStorageServiceTest {
 
             @Override
             public void onComplete() {
+                result.completeExceptionally(new AssertionError("No items produced"));
             }
         }
 
@@ -908,11 +918,16 @@ public class ItMetaStorageServiceTest {
 
         learner.metaStorageService.range(new ByteArray(EXPECTED_RESULT_ENTRY.key()), null).subscribe(node1Subscriber0);
 
-        leader.metaStorageService.closeCursors(leader.clusterService.topologyService().localMember().id()).get();
+        // Wait for all cursors to be registered on the server side.
+        assertTrue(subscriptionLatch.await(10, TimeUnit.SECONDS));
+
+        String leaderId = leader.clusterService.topologyService().localMember().id();
+
+        CompletableFuture<Void> closeCursorsFuture = leader.metaStorageService.closeCursors(leaderId);
+
+        assertThat(closeCursorsFuture, willCompleteSuccessfully());
 
-        node0Subscriber0.subscription.request(1);
-        node0Subscriber1.subscription.request(1);
-        node1Subscriber0.subscription.request(1);
+        closeCursorLatch.countDown();
 
         assertThat(node0Subscriber0.result, willFailFast(NoSuchElementException.class));
         assertThat(node0Subscriber1.result, willFailFast(NoSuchElementException.class));
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index b8d88f453f..635175d0ed 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -24,15 +24,12 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -40,9 +37,14 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
+import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
@@ -50,12 +52,11 @@ import org.apache.ignite.internal.metastorage.dsl.Conditions;
 import org.apache.ignite.internal.metastorage.dsl.Operations;
 import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
 import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterService;
@@ -71,55 +72,68 @@ import org.junit.jupiter.api.extension.ExtendWith;
 /**
  * Tests for Meta Storage Watches.
  */
-@ExtendWith(WorkDirectoryExtension.class)
 @ExtendWith(ConfigurationExtension.class)
-public class ItMetaStorageWatchTest {
+public class ItMetaStorageWatchTest extends IgniteAbstractTest {
     private static class Node {
-        private final ClusterService clusterService;
+        private final List<IgniteComponent> components = new ArrayList<>();
 
-        private final RaftManager raftManager;
+        private final ClusterService clusterService;
 
         private final MetaStorageManager metaStorageManager;
 
-        private final CompletableFuture<Set<String>> metaStorageNodesFuture = new CompletableFuture<>();
+        private final ClusterManagementGroupManager cmgManager;
+
+        Node(ClusterService clusterService, Path dataPath) {
+            var vaultManager = new VaultManager(new InMemoryVaultService());
+
+            components.add(vaultManager);
 
-        Node(ClusterService clusterService, RaftConfiguration raftConfiguration, Path dataPath) {
             this.clusterService = clusterService;
 
+            components.add(clusterService);
+
             Path basePath = dataPath.resolve(name());
 
-            this.raftManager = new Loza(
+            var raftManager = new Loza(
                     clusterService,
                     raftConfiguration,
                     basePath.resolve("raft"),
                     new HybridClockImpl()
             );
 
-            var vaultManager = mock(VaultManager.class);
+            components.add(raftManager);
 
-            when(vaultManager.get(any())).thenReturn(CompletableFuture.completedFuture(null));
-            when(vaultManager.put(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
-            when(vaultManager.putAll(any())).thenReturn(CompletableFuture.completedFuture(null));
+            var clusterStateStorage = new TestClusterStateStorage();
 
-            var cmgManager = mock(ClusterManagementGroupManager.class);
+            components.add(clusterStateStorage);
 
-            when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFuture);
+            var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+            this.cmgManager = new ClusterManagementGroupManager(
+                    vaultManager,
+                    clusterService,
+                    raftManager,
+                    clusterStateStorage,
+                    logicalTopology,
+                    cmgConfiguration
+            );
+
+            components.add(cmgManager);
 
             this.metaStorageManager = new MetaStorageManagerImpl(
                     vaultManager,
                     clusterService,
                     cmgManager,
+                    new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
                     raftManager,
                     new RocksDbKeyValueStorage(name(), basePath.resolve("storage"))
             );
-        }
 
-        void start(Set<String> metaStorageNodes) {
-            clusterService.start();
-            raftManager.start();
-            metaStorageManager.start();
+            components.add(metaStorageManager);
+        }
 
-            metaStorageNodesFuture.complete(metaStorageNodes);
+        void start() {
+            components.forEach(IgniteComponent::start);
         }
 
         String name() {
@@ -127,9 +141,11 @@ public class ItMetaStorageWatchTest {
         }
 
         void stop() throws Exception {
-            Stream<AutoCloseable> beforeNodeStop = Stream.of(metaStorageManager, raftManager, clusterService).map(c -> c::beforeNodeStop);
+            Collections.reverse(components);
 
-            Stream<AutoCloseable> nodeStop = Stream.of(metaStorageManager, raftManager, clusterService).map(c -> c::stop);
+            Stream<AutoCloseable> beforeNodeStop = components.stream().map(c -> c::beforeNodeStop);
+
+            Stream<AutoCloseable> nodeStop = components.stream().map(c -> c::stop);
 
             IgniteUtils.closeAll(Stream.concat(beforeNodeStop, nodeStop));
         }
@@ -137,11 +153,11 @@ public class ItMetaStorageWatchTest {
 
     private TestInfo testInfo;
 
-    @WorkDirectory
-    private Path workDir;
+    @InjectConfiguration
+    private static RaftConfiguration raftConfiguration;
 
     @InjectConfiguration
-    private RaftConfiguration raftConfiguration;
+    private static ClusterManagementConfiguration cmgConfiguration;
 
     private final List<Node> nodes = new ArrayList<>();
 
@@ -155,16 +171,20 @@ public class ItMetaStorageWatchTest {
         IgniteUtils.closeAll(nodes.stream().map(node -> node::stop));
     }
 
-    private void startNodes(int amount) {
-        List<NetworkAddress> localAddresses = findLocalAddresses(10_000, 10_000 + nodes.size() + amount);
+    private void startCluster(int size) throws NodeStoppingException {
+        List<NetworkAddress> localAddresses = findLocalAddresses(10_000, 10_000 + nodes.size() + size);
 
         var nodeFinder = new StaticNodeFinder(localAddresses);
 
         localAddresses.stream()
                 .map(addr -> ClusterServiceTestUtils.clusterService(testInfo, addr.port(), nodeFinder))
-                .forEach(clusterService -> nodes.add(new Node(clusterService, raftConfiguration, workDir)));
+                .forEach(clusterService -> nodes.add(new Node(clusterService, workDir)));
+
+        nodes.parallelStream().forEach(Node::start);
+
+        String name = nodes.get(0).name();
 
-        nodes.parallelStream().forEach(node -> node.start(Set.of(nodes.get(0).name())));
+        nodes.get(0).cmgManager.initCluster(List.of(name), List.of(name), "test");
     }
 
     @Test
@@ -229,7 +249,7 @@ public class ItMetaStorageWatchTest {
     private void testWatches(BiConsumer<Node, CountDownLatch> registerWatchAction) throws Exception {
         int numNodes = 3;
 
-        startNodes(numNodes);
+        startCluster(numNodes);
 
         var latch = new CountDownLatch(numNodes);
 
@@ -256,10 +276,10 @@ public class ItMetaStorageWatchTest {
      * Tests that metastorage missed metastorage events are replayed after deploying watches.
      */
     @Test
-    void testReplayUpdates() throws InterruptedException {
+    void testReplayUpdates() throws Exception {
         int numNodes = 3;
 
-        startNodes(numNodes);
+        startCluster(numNodes);
 
         var exactLatch = new CountDownLatch(numNodes);
         var prefixLatch = new CountDownLatch(numNodes);
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java
index 123b022779..130c38c58d 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
 
 import java.util.Arrays;
 import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -46,9 +47,11 @@ public final class EntryImpl implements Entry {
     private static final long serialVersionUID = 3636551347117181271L;
 
     /** Key. */
+    @IgniteToStringInclude
     private final byte[] key;
 
     /** Value. */
+    @IgniteToStringInclude
     private final byte @Nullable [] val;
 
     /** Revision. */
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 8f0bea21d6..7a0921e64b 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -17,14 +17,11 @@
 
 package org.apache.ignite.internal.metastorage.impl;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
 import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -32,6 +29,7 @@ import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -52,7 +50,6 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
 import org.apache.ignite.internal.raft.RaftManager;
 import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.Status;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -61,8 +58,6 @@ import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.TopologyEventHandler;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -94,6 +89,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
 
     private final ClusterManagementGroupManager cmgMgr;
 
+    private final LogicalTopologyService logicalTopologyService;
+
     /** Meta storage service. */
     private final CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut = new CompletableFuture<>();
 
@@ -123,6 +120,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
             VaultManager vaultMgr,
             ClusterService clusterService,
             ClusterManagementGroupManager cmgMgr,
+            LogicalTopologyService logicalTopologyService,
             RaftManager raftMgr,
             KeyValueStorage storage
     ) {
@@ -130,6 +128,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
         this.clusterService = clusterService;
         this.raftMgr = raftMgr;
         this.cmgMgr = cmgMgr;
+        this.logicalTopologyService = logicalTopologyService;
         this.storage = storage;
     }
 
@@ -153,26 +152,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
                         new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
                         configuration,
                         new MetaStorageListener(storage),
-                        new RaftGroupEventsListener() {
-                            @Override
-                            public void onLeaderElected(long term) {
-                                // TODO: add listener to remove learners when they leave Logical Topology
-                                //  see https://issues.apache.org/jira/browse/IGNITE-18554
-
-                                registerTopologyEventListener();
-
-                                // Update the configuration immediately in case we missed some updates.
-                                addLearners(clusterService.topologyService().allMembers());
-                            }
-
-                            @Override
-                            public void onNewPeersConfigurationApplied(PeersAndLearners configuration) {
-                            }
-
-                            @Override
-                            public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {
-                            }
-                        }
+                        new MetaStorageRaftGroupEventsListener(busyLock, clusterService, logicalTopologyService, metaStorageSvcFut)
                 );
             } else {
                 PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
@@ -195,88 +175,6 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
         return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(raftService, busyLock, thisNode));
     }
 
-    private void registerTopologyEventListener() {
-        clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
-            @Override
-            public void onAppeared(ClusterNode member) {
-                addLearners(List.of(member));
-            }
-
-            @Override
-            public void onDisappeared(ClusterNode member) {
-                metaStorageSvcFut.thenAccept(service -> isCurrentNodeLeader(service.raftGroupService())
-                        .thenAccept(isLeader -> {
-                            if (isLeader) {
-                                service.closeCursors(member.id());
-                            }
-                        }));
-            }
-        });
-    }
-
-    private void addLearners(Collection<ClusterNode> nodes) {
-        if (!busyLock.enterBusy()) {
-            LOG.info("Skipping Meta Storage configuration update because the node is stopping");
-
-            return;
-        }
-
-        try {
-            metaStorageSvcFut
-                    .thenApply(MetaStorageServiceImpl::raftGroupService)
-                    .thenCompose(raftService -> isCurrentNodeLeader(raftService)
-                            .thenCompose(isLeader -> isLeader ? addLearners(raftService, nodes) : completedFuture(null)))
-                    .whenComplete((v, e) -> {
-                        if (e != null) {
-                            LOG.error("Unable to change peers on topology update", e);
-                        }
-                    });
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    private CompletableFuture<Void> addLearners(RaftGroupService raftService, Collection<ClusterNode> nodes) {
-        if (!busyLock.enterBusy()) {
-            LOG.info("Skipping Meta Storage configuration update because the node is stopping");
-
-            return completedFuture(null);
-        }
-
-        try {
-            Set<String> peers = raftService.peers().stream()
-                    .map(Peer::consistentId)
-                    .collect(toSet());
-
-            Set<String> learners = nodes.stream()
-                    .map(ClusterNode::name)
-                    .filter(name -> !peers.contains(name))
-                    .collect(toSet());
-
-            if (learners.isEmpty()) {
-                return completedFuture(null);
-            }
-
-            if (LOG.isInfoEnabled()) {
-                LOG.info("New Meta Storage learners detected: " + learners);
-            }
-
-            PeersAndLearners newConfiguration = PeersAndLearners.fromConsistentIds(peers, learners);
-
-            return raftService.addLearners(newConfiguration.learners());
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    private CompletableFuture<Boolean> isCurrentNodeLeader(RaftGroupService raftService) {
-        String name = clusterService.topologyService().localMember().name();
-
-        return raftService.refreshLeader()
-                .thenApply(v -> raftService.leader().consistentId().equals(name));
-    }
-
-    /** {@inheritDoc} */
     @Override
     public void start() {
         this.appliedRevision = readAppliedRevision().join();
@@ -304,7 +202,6 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
                 });
     }
 
-    /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
         if (!isStopped.compareAndSet(false, true)) {
@@ -404,7 +301,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#getAll(Set, long)
      */
-    public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
+    public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
         }
@@ -421,7 +318,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#put(ByteArray, byte[])
      */
-    public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[] val) {
+    public CompletableFuture<Void> put(ByteArray key, byte[] val) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
         }
@@ -438,7 +335,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#getAndPut(ByteArray, byte[])
      */
-    public @NotNull CompletableFuture<Entry> getAndPut(@NotNull ByteArray key, byte[] val) {
+    public CompletableFuture<Entry> getAndPut(ByteArray key, byte[] val) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
         }
@@ -455,7 +352,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#putAll(Map)
      */
-    public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
+    public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
         }
@@ -472,7 +369,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#getAndPutAll(Map)
      */
-    public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) {
+    public CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(Map<ByteArray, byte[]> vals) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
         }
@@ -489,7 +386,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#remove(ByteArray)
      */
-    public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) {
+    public CompletableFuture<Void> remove(ByteArray key) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
         }
@@ -506,7 +403,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#getAndRemove(ByteArray)
      */
-    public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull ByteArray key) {
+    public CompletableFuture<Entry> getAndRemove(ByteArray key) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
         }
@@ -523,7 +420,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#removeAll(Set)
      */
-    public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys) {
+    public CompletableFuture<Void> removeAll(Set<ByteArray> keys) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
         }
@@ -540,7 +437,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
      *
      * @see MetaStorageService#getAndRemoveAll(Set)
      */
-    public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys) {
+    public CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(Set<ByteArray> keys) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new NodeStoppingException());
         }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java
new file mode 100644
index 0000000000..9fe455fff4
--- /dev/null
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toSet;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Raft Group Events listener that registers Logical Topology listener for updating the list of Meta Storage Raft group learners.
+ */
+public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListener {
+    private static final IgniteLogger LOG = Loggers.forClass(MetaStorageRaftGroupEventsListener.class);
+
+    private final IgniteSpinBusyLock busyLock;
+
+    private final String nodeName;
+
+    private final LogicalTopologyService logicalTopologyService;
+
+    private final CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut;
+
+    /**
+     * Future required to enable serialized processing of topology events.
+     *
+     * <p>While Logical Topology events are linearized, we usually start a bunch of async operations inside the event listener and need
+     * them to finish in the same order.
+     *
+     * <p>Multi-threaded access is guarded by {@code serializationFutureMux}.
+     */
+    private CompletableFuture<Void> serializationFuture = null;
+
+    private final Object serializationFutureMux = new Object();
+
+    MetaStorageRaftGroupEventsListener(
+            IgniteSpinBusyLock busyLock,
+            ClusterService clusterService,
+            LogicalTopologyService logicalTopologyService,
+            CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut
+    ) {
+        this.busyLock = busyLock;
+        this.nodeName = clusterService.localConfiguration().getName();
+        this.logicalTopologyService = logicalTopologyService;
+        this.metaStorageSvcFut = metaStorageSvcFut;
+    }
+
+    @Override
+    public void onLeaderElected(long term) {
+        synchronized (serializationFutureMux) {
+            registerTopologyEventListeners();
+
+            // Update learner configuration (in case we missed some topology updates) and initialize the serialization future.
+            serializationFuture = executeIfLeaderImpl(this::resetLearners);
+        }
+    }
+
+    private void registerTopologyEventListeners() {
+        logicalTopologyService.addEventListener(new LogicalTopologyEventListener() {
+            @Override
+            public void onNodeValidated(ClusterNode validatedNode) {
+                executeIfLeader((service, term) -> addLearner(service.raftGroupService(), validatedNode));
+            }
+
+            @Override
+            public void onNodeInvalidated(ClusterNode invalidatedNode) {
+                executeIfLeader((service, term) -> {
+                    CompletableFuture<Void> closeCursorsFuture = service.closeCursors(invalidatedNode.id())
+                            .exceptionally(e -> {
+                                LOG.error("Unable to close cursor for " + invalidatedNode, e);
+
+                                return null;
+                            });
+
+                    CompletableFuture<Void> removeLearnersFuture = removeLearner(service.raftGroupService(), invalidatedNode);
+
+                    return allOf(closeCursorsFuture, removeLearnersFuture);
+                });
+            }
+
+            @Override
+            public void onNodeLeft(ClusterNode leftNode, LogicalTopologySnapshot newTopology) {
+                onNodeInvalidated(leftNode);
+            }
+
+            @Override
+            public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
+                executeIfLeader(MetaStorageRaftGroupEventsListener.this::resetLearners);
+            }
+        });
+    }
+
+    /**
+     * Action that {@code executeIfLeaderImpl} invokes only when the refreshed Raft leader matches the local node name.
+     */
+    @FunctionalInterface
+    private interface OnLeaderAction {
+        /**
+         * Runs the action.
+         *
+         * @param service Meta Storage service obtained from {@code metaStorageSvcFut}.
+         * @param term Term returned together with the leader by {@code refreshAndGetLeaderWithTerm}.
+         * @return Future representing the completion of the action.
+         */
+        CompletableFuture<Void> apply(MetaStorageServiceImpl service, long term);
+    }
+
+    /**
+     * Queues the given action behind the previously queued ones; when its turn comes, the action is only run if the
+     * current node is the Meta Storage leader.
+     *
+     * <p>Nothing is queued if the node is stopping or if the serialization future has not been initialized yet.
+     */
+    private void executeIfLeader(OnLeaderAction action) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the node is stopping");
+
+            return;
+        }
+
+        try {
+            synchronized (serializationFutureMux) {
+                // An uninitialized serialization future means that we are definitely not a leader: nothing to do then.
+                if (serializationFuture != null) {
+                    serializationFuture = serializationFuture
+                            // The outcome of the previous action is irrelevant here, its errors should be logged independently.
+                            .handle((ignoredResult, ignoredError) -> executeIfLeaderImpl(action))
+                            .thenCompose(Function.identity());
+                }
+            }
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Refreshes the Raft leader and runs the action only if the leader's consistent ID equals the local node name.
+     *
+     * @param action Action to run on the leader.
+     * @return Future of the action, or an already completed future if the local node is not the leader.
+     */
+    private CompletableFuture<Void> executeIfLeaderImpl(OnLeaderAction action) {
+        return metaStorageSvcFut.thenCompose(service -> service.raftGroupService().refreshAndGetLeaderWithTerm()
+                .thenCompose(leaderWithTerm -> {
+                    String leaderName = leaderWithTerm.leader().consistentId();
+
+                    if (!leaderName.equals(nodeName)) {
+                        return completedFuture(null);
+                    }
+
+                    return action.apply(service, leaderWithTerm.term());
+                }));
+    }
+
+    /**
+     * Adds the given node to the learners of the Raft group, unless the node is already one of the group's peers.
+     *
+     * @param raftService Raft group service.
+     * @param learner Node to add as a learner.
+     * @return Future of the configuration update.
+     */
+    private CompletableFuture<Void> addLearner(RaftGroupService raftService, ClusterNode learner) {
+        return updateConfigUnderLock(() -> {
+            if (isPeer(raftService, learner)) {
+                return completedFuture(null);
+            }
+
+            return raftService.addLearners(List.of(new Peer(learner.name())));
+        });
+    }
+
+    /**
+     * Checks whether the consistent ID of any peer known to the Raft service equals the name of the given node.
+     */
+    private static boolean isPeer(RaftGroupService raftService, ClusterNode node) {
+        for (Peer peer : raftService.peers()) {
+            if (peer.consistentId().equals(node.name())) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Removes the given node from the learners of the Raft group, unless the node is a peer or a node with the same
+     * name is still present among the validated nodes.
+     *
+     * @param raftService Raft group service.
+     * @param learner Node to remove from the learners.
+     * @return Future of the configuration update.
+     */
+    private CompletableFuture<Void> removeLearner(RaftGroupService raftService, ClusterNode learner) {
+        return updateConfigUnderLock(() -> logicalTopologyService.validatedNodesOnLeader()
+                .thenCompose(validatedNodes -> updateConfigUnderLock(() -> {
+                    // Peers are never touched here. Also, due to possible races, we can have multiple versions of the same
+                    // node in the validated set: a learner is only removed if there are no such versions left.
+                    boolean mustStay = isPeer(raftService, learner)
+                            || validatedNodes.stream().anyMatch(node -> node.name().equals(learner.name()));
+
+                    return mustStay
+                            ? completedFuture(null)
+                            : raftService.removeLearners(List.of(new Peer(learner.name())));
+                })));
+    }
+
+    /**
+     * Recalculates the learners from scratch: every validated node whose name is not among the current peers
+     * becomes a learner, while the set of peers is passed on unchanged.
+     *
+     * @param service Meta Storage service.
+     * @param term Raft term passed to the configuration change.
+     * @return Future of the configuration update.
+     */
+    private CompletableFuture<Void> resetLearners(MetaStorageServiceImpl service, long term) {
+        return updateConfigUnderLock(() -> logicalTopologyService.validatedNodesOnLeader()
+                .thenCompose(validatedNodes -> updateConfigUnderLock(() -> {
+                    RaftGroupService raftGroup = service.raftGroupService();
+
+                    Set<String> peerNames = raftGroup.peers().stream()
+                            .map(Peer::consistentId)
+                            .collect(toSet());
+
+                    Set<String> learnerNames = validatedNodes.stream()
+                            .map(ClusterNode::name)
+                            .filter(nodeName -> !peerNames.contains(nodeName))
+                            .collect(toSet());
+
+                    // We can't use 'resetLearners' call here because it does not support empty lists of learners.
+                    return raftGroup.changePeersAsync(PeersAndLearners.fromConsistentIds(peerNames, learnerNames), term);
+                })));
+    }
+
+    /**
+     * Invokes the given action while holding the busy lock and logs the error if the resulting future fails.
+     *
+     * <p>The busy lock is released as soon as the action has returned its future, not when that future completes.
+     *
+     * @param action Action that starts a configuration update.
+     * @return Future returned by the action, or an already completed future if the node is stopping.
+     */
+    private CompletableFuture<Void> updateConfigUnderLock(Supplier<CompletableFuture<Void>> action) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the node is stopping");
+
+            return completedFuture(null);
+        }
+
+        try {
+            CompletableFuture<Void> updateFuture = action.get();
+
+            return updateFuture.whenComplete((ignoredResult, error) -> {
+                if (error != null) {
+                    LOG.error("Unable to change peers on topology update", error);
+                }
+            });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+}
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index d37dd7e70f..47d16c65cf 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -367,12 +367,12 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
     }
 
     @Override
-    public CompletableFuture<Void> removeLearners(List<Peer> learners) {
+    public CompletableFuture<Void> removeLearners(Collection<Peer> learners) {
         return raftClient.removeLearners(learners);
     }
 
     @Override
-    public CompletableFuture<Void> resetLearners(List<Peer> learners) {
+    public CompletableFuture<Void> resetLearners(Collection<Peer> learners) {
         return raftClient.resetLearners(learners);
     }
 
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
index 6647bfeb5c..fe9be4c453 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
@@ -33,7 +33,7 @@ public interface RaftGroupEventsListener {
      *
      * @param configuration New Raft group configuration.
      */
-    void onNewPeersConfigurationApplied(PeersAndLearners configuration);
+    default void onNewPeersConfigurationApplied(PeersAndLearners configuration) {}
 
     /**
      * Invoked on the leader if membership reconfiguration failed, because of {@link Status}.
@@ -42,22 +42,10 @@ public interface RaftGroupEventsListener {
      * @param configuration Configuration that failed to be applied.
      * @param term Raft term of the current leader.
      */
-    void onReconfigurationError(Status status, PeersAndLearners configuration, long term);
+    default void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {}
 
     /**
      * No-op raft group events listener.
      */
-    RaftGroupEventsListener noopLsnr = new RaftGroupEventsListener() {
-        /** {@inheritDoc} */
-        @Override
-        public void onLeaderElected(long term) { }
-
-        /** {@inheritDoc} */
-        @Override
-        public void onNewPeersConfigurationApplied(PeersAndLearners configuration) { }
-
-        /** {@inheritDoc} */
-        @Override
-        public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {}
-    };
+    RaftGroupEventsListener noopLsnr = term -> {};
 }
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
index 72eeaa453d..e86f83e994 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
@@ -181,10 +181,10 @@ public interface RaftGroupService {
      *
      * <p>This operation is executed on a group leader.
      *
-     * @param learners List of learners.
+     * @param learners Collection of learners.
      * @return A future.
      */
-    CompletableFuture<Void> removeLearners(List<Peer> learners);
+    CompletableFuture<Void> removeLearners(Collection<Peer> learners);
 
     /**
      * Set learners of the raft group to needed list of learners.
@@ -194,10 +194,10 @@ public interface RaftGroupService {
      *
      * <p>This operation is executed on a group leader.
      *
-     * @param learners List of learners.
+     * @param learners Collection of learners.
      * @return A future.
      */
-    CompletableFuture<Void> resetLearners(List<Peer> learners);
+    CompletableFuture<Void> resetLearners(Collection<Peer> learners);
 
     /**
      * Takes a state machine snapshot on a given group peer.
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index b806120105..b5156316fd 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -360,7 +360,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     }
 
     @Override
-    public CompletableFuture<Void> removeLearners(List<Peer> learners) {
+    public CompletableFuture<Void> removeLearners(Collection<Peer> learners) {
         Peer leader = this.leader;
 
         if (leader == null) {
@@ -378,7 +378,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
     }
 
     @Override
-    public CompletableFuture<Void> resetLearners(List<Peer> learners) {
+    public CompletableFuture<Void> resetLearners(Collection<Peer> learners) {
         Peer leader = this.leader;
 
         if (leader == null) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index fa44708e99..a107f23c40 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
 import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import org.apache.ignite.internal.configuration.storage.ConfigurationStorageListener;
 import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -127,14 +128,14 @@ public class ItDistributedConfigurationPropertiesTest {
             raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
 
             var clusterStateStorage = new TestClusterStateStorage();
-            var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage);
+            var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
                     clusterService,
                     raftManager,
                     clusterStateStorage,
-                    logicalTopologyService,
+                    logicalTopology,
                     clusterManagementConfiguration
             );
 
@@ -142,6 +143,7 @@ public class ItDistributedConfigurationPropertiesTest {
                     vaultManager,
                     clusterService,
                     cmgManager,
+                    new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
                     raftManager,
                     new SimpleInMemoryKeyValueStorage(name())
             );
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 791df8a8cb..7e14edb256 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
 import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -101,14 +102,14 @@ public class ItDistributedConfigurationStorageTest {
             raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
 
             var clusterStateStorage = new TestClusterStateStorage();
-            var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage);
+            var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
                     clusterService,
                     raftManager,
                     clusterStateStorage,
-                    logicalTopologyService,
+                    logicalTopology,
                     clusterManagementConfiguration
             );
 
@@ -116,6 +117,7 @@ public class ItDistributedConfigurationStorageTest {
                     vaultManager,
                     clusterService,
                     cmgManager,
+                    new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
                     raftManager,
                     new SimpleInMemoryKeyValueStorage(name())
             );
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 13dd06aa18..a809d22747 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
 import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.NodeBootstrapConfiguration;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -642,14 +643,14 @@ public class ItRebalanceDistributedTest {
             txManager = new TxManagerImpl(replicaSvc, lockManager, hybridClock);
 
             var clusterStateStorage = new TestClusterStateStorage();
-            var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage);
+            var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
             cmgManager = new ClusterManagementGroupManager(
                     vaultManager,
                     clusterService,
                     raftManager,
                     clusterStateStorage,
-                    logicalTopologyService,
+                    logicalTopology,
                     clusterManagementConfiguration
             );
 
@@ -659,6 +660,7 @@ public class ItRebalanceDistributedTest {
                     vaultManager,
                     clusterService,
                     cmgManager,
+                    new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
                     raftManager,
                     testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class)
                             ? new RocksDbKeyValueStorage(nodeName, resolveDir(dir, "metaStorage"))
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 6b5675902e..71e53c799e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
 import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
 import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.ConfigurationModule;
 import org.apache.ignite.internal.configuration.ConfigurationModules;
@@ -269,14 +270,14 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
 
         var clusterStateStorage = new RocksDbClusterStateStorage(dir.resolve("cmg"));
 
-        var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage);
+        var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
         var cmgManager = new ClusterManagementGroupManager(
                 vault,
                 clusterSvc,
                 raftMgr,
                 clusterStateStorage,
-                logicalTopologyService,
+                logicalTopology,
                 clusterManagementConfiguration
         );
 
@@ -284,6 +285,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
                 vault,
                 clusterSvc,
                 cmgManager,
+                new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
                 raftMgr,
                 new RocksDbKeyValueStorage(name, dir.resolve("metastorage"))
         );
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c7f1b0247d..afe6311409 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -349,6 +349,7 @@ public class IgniteImpl implements Ignite {
                 vaultMgr,
                 clusterSvc,
                 cmgMgr,
+                logicalTopologyService,
                 raftMgr,
                 new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH))
         );