You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/12/07 09:28:08 UTC

[ignite-3] branch main updated: IGNITE-18085 Implement logical topology events subscription and notification (#1403)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8eb3ae2e7c IGNITE-18085 Implement logical topology events subscription and notification (#1403)
8eb3ae2e7c is described below

commit 8eb3ae2e7cc7dc72ad0f1a5390c479ad843d1a38
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Dec 7 13:28:02 2022 +0400

    IGNITE-18085 Implement logical topology events subscription and notification (#1403)
---
 .../network/LogicalTopologyEventListener.java      |  71 +++++
 .../ignite/network/LogicalTopologyService.java     |  61 ++++
 .../ignite/network}/LogicalTopologySnapshot.java   |  47 +--
 .../ignite/network/TopologyEventHandler.java       |   8 +-
 .../org/apache/ignite/network/TopologyService.java |   8 +-
 .../internal/cluster/management/MockNode.java      |   2 +-
 .../management/raft/ItCmgRaftServiceTest.java      |   2 +-
 .../management/ClusterManagementGroupManager.java  |   2 +-
 .../management/raft/CmgRaftGroupListener.java      |   8 +-
 .../cluster/management/raft/CmgRaftService.java    |   2 +-
 .../raft/responses/LogicalTopologyResponse.java    |   2 +-
 .../management/rest/TopologyController.java        |   2 +-
 .../management/topology/LogicalTopology.java       |  28 +-
 .../management/topology/LogicalTopologyImpl.java   | 140 ++++++++-
 .../topology/LogicalTopologyServiceImpl.java       |  53 ++++
 .../management/raft/CmgRaftGroupListenerTest.java  |  25 +-
 .../topology/LogicalTopologyImplTest.java          | 338 ++++++++++++++++++---
 .../topology/LogicalTopologyServiceImplTest.java   |  71 +++++
 .../internal/AbstractClusterIntegrationTest.java   | 106 +++++--
 .../ignite/internal/compute/ItComputeTest.java     |   2 +-
 .../internal/compute/ItLogicalTopologyTest.java    | 133 ++++++++
 .../inmemory/ItRaftStorageVolatilityTest.java      |   2 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  15 +-
 23 files changed, 991 insertions(+), 137 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyEventListener.java b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyEventListener.java
new file mode 100644
index 0000000000..d20bd34420
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyEventListener.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+/**
+ * Listens to events related to logical topology changes.
+ *
+ * <p>Event listeners are not guaranteed to see events they receive being consistent with the state acquired from
+ * {@link LogicalTopologyService#logicalTopologyOnLeader()} (or local logical topology state obtained in other means). This means that,
+ * if you get an event with topology version N, event listener might see version M greater than N if it
+ * tries to get current logical topology from the CMG leader, or it might see version M less or greater than N if it
+ * tries to get current local topology from the local storage.
+ *
+ * <p>Event listener methods must return as quickly as possible. If some heavy processing, blocking I/O or waiting
+ * for a future has to be done, this should be offloaded to another thread.
+ *
+ * <p>While an event listener is registered, it is guaranteed to get all logical topology events, in the correct order.
+ * While an event listener is NOT registereed (for instance, when its node is restarting), events are skipped. This
+ * means that, if an Ignite node is restarting, it might miss some logical topology events.
+ * {@link LogicalTopologyService#logicalTopologyOnLeader()} should be used to catch up.
+ */
+public interface LogicalTopologyEventListener {
+    /**
+     * Called when a new member has joined a cluster's logical topology.
+     *
+     * @param appearedNode Appeared cluster member.
+     * @param newTopology Topology in which the member has joined.
+     */
+    default void onAppeared(ClusterNode appearedNode, LogicalTopologySnapshot newTopology) {
+        // No-op.
+    }
+
+    /**
+     * Called when a member has left a cluster's logical topology.
+     *
+     * @param disappearedNode Disappeared cluster member.
+     * @param newTopology Topology in which the member has disappeared.
+     */
+    default void onDisappeared(ClusterNode disappearedNode, LogicalTopologySnapshot newTopology) {
+        // No-op.
+    }
+
+    /**
+     * Called when a topology changed in a way that cannot be interpreted as a sequence of 'appeared'/'disappeared' events,
+     * so the change happens as a leap forward. This is mutually exclusive with {@link #onAppeared(ClusterNode, LogicalTopologySnapshot)}
+     * and {@link #onDisappeared(ClusterNode, LogicalTopologySnapshot)}.
+     *
+     * <p>This happens rarely (in a well-configured system this should never happen), for instance, when a Cluster Management
+     * RAFT Group sends its update by installing a RAFT snapshot instead of normal AppendEntries.
+     *
+     * @param newTopology The new logical topology state.
+     */
+    default void onTopologyLeap(LogicalTopologySnapshot newTopology) {
+        // No-op.
+    }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyService.java b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyService.java
new file mode 100644
index 0000000000..308d0d146e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Used for getting information about the cluster's Logical Topology.
+ *
+ * <p>There are 2 kinds of 'topologies': physical (see {@link TopologyService} and logical.
+ * <ul>
+ *     <li>Physical topology consists of nodes mutually discovered by a membership protocol (like SWIM)</li>
+ *     <li>
+ *         Logical topology is a subset of a physical topology and only contains nodes that have successfully
+ *         <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-77%3A+Node+Join+Protocol+and+Initialization+for+Ignite+3">
+ *         joined the cluster</a>
+ *     </li>
+ * </ul>
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-77%3A+Node+Join+Protocol+and+Initialization+for+Ignite+3">
+ *     IEP-77: Node Join Protocol and Initialization for Ignite 3</a>
+ * @see TopologyService
+ */
+public interface LogicalTopologyService {
+    /**
+     * Adds a listener for logical topology events.
+     *
+     * @param listener Listener to add.
+     * @see LogicalTopologyEventListener
+     */
+    void addEventListener(LogicalTopologyEventListener listener);
+
+    /**
+     * Removes a listener for logical topology events.
+     *
+     * @param listener Listener to remove.
+     */
+    void removeEventListener(LogicalTopologyEventListener listener);
+
+    /**
+     * Returns a future that, when complete, resolves into a logical topology snapshot as the CMG leader sees it.
+     *
+     * @return Future that, when complete, resolves into a logical topology snapshot from the point of view of the CMG leader.
+     */
+    CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader();
+}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologySnapshot.java b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologySnapshot.java
similarity index 56%
rename from modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologySnapshot.java
rename to modules/api/src/main/java/org/apache/ignite/network/LogicalTopologySnapshot.java
index 635faba0ea..66ed67214f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologySnapshot.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologySnapshot.java
@@ -15,36 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.cluster.management.topology;
+package org.apache.ignite.network;
 
 import static java.util.Collections.emptySet;
-import static java.util.function.Function.identity;
-import static java.util.stream.Collectors.toMap;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.Map;
 import java.util.Set;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.network.ClusterNode;
 
 /**
  * A snapshot of a logical topology as seen locally. Includes nodes participating in the logical topology and the version
  * of the topology (that gets incremented on each update to the topology).
+ *
+ * <p>Instances of this class are immutable.
  */
 public class LogicalTopologySnapshot implements Serializable {
     private static final long serialVersionUID = 0L;
 
-    /** Initial 'topology' for an empty cluster (before any node joined). */
-    static final LogicalTopologySnapshot INITIAL = new LogicalTopologySnapshot(0, emptySet());
+    /** Initial 'topology' for an empty cluster (before any node has joined). */
+    public static final LogicalTopologySnapshot INITIAL = new LogicalTopologySnapshot(0, emptySet());
 
     private final long version;
 
     @IgniteToStringInclude
     private final Set<ClusterNode> nodes;
 
-    private LogicalTopologySnapshot(long version, Collection<ClusterNode> nodes) {
+    public LogicalTopologySnapshot(long version, Collection<ClusterNode> nodes) {
         this.version = version;
         this.nodes = Set.copyOf(nodes);
     }
@@ -63,39 +61,6 @@ public class LogicalTopologySnapshot implements Serializable {
         return nodes;
     }
 
-    LogicalTopologySnapshot addNode(ClusterNode nodeToAdd) {
-        Map<String, ClusterNode> map = nodes.stream().collect(toMap(ClusterNode::name, identity()));
-
-        ClusterNode oldNode = map.put(nodeToAdd.name(), nodeToAdd);
-        if (oldNode != null && oldNode.id().equals(nodeToAdd.id())) {
-            // We already have this node, nothing needs to be changed.
-            return this;
-        }
-
-        return new LogicalTopologySnapshot(version + 1, map.values());
-    }
-
-    LogicalTopologySnapshot removeNodesByIds(Set<ClusterNode> nodesToRemove) {
-        Map<String, ClusterNode> mapById = nodes.stream().collect(toMap(ClusterNode::id, identity()));
-
-        int originalSize = mapById.size();
-
-        for (ClusterNode nodeToRemove : nodesToRemove) {
-            mapById.remove(nodeToRemove.id());
-        }
-
-        if (mapById.size() == originalSize) {
-            // Nothing was actually removed.
-            return this;
-        }
-
-        return new LogicalTopologySnapshot(version + 1, mapById.values());
-    }
-
-    boolean containsNodeById(ClusterNode needle) {
-        return nodes.stream().anyMatch(node -> node.id().equals(needle.id()));
-    }
-
     @Override
     public String toString() {
         return S.toString(LogicalTopologySnapshot.class, this);
diff --git a/modules/api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java b/modules/api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
index 9a4f89906d..2949331090 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
@@ -18,11 +18,11 @@
 package org.apache.ignite.network;
 
 /**
- * Interface for handling events related to topology changes.
+ * Interface for handling events related to physical topology changes.
  */
 public interface TopologyEventHandler {
     /**
-     * Called when a new member has been detected joining a cluster.
+     * Called when a new member has been detected joining a cluster's physical topology.
      *
      * @param member Appeared cluster member.
      */
@@ -31,8 +31,8 @@ public interface TopologyEventHandler {
     }
 
     /**
-     * Indicates that a member has left a cluster. This method is only called when a member leaves permanently (i.e. it is not possible to
-     * re-establish a connection to it).
+     * Indicates that a member has left a cluster's physical topology. This method is only called when a member leaves permanently
+     * (i.e. it is not possible to re-establish a connection to it).
      *
      * @param member Disappeared cluster member.
      */
diff --git a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
index bd6b442276..99c5d7dc9c 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -21,7 +21,7 @@ import java.util.Collection;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Entry point for obtaining information about a cluster's topology.
+ * Entry point for obtaining information about a cluster's physical topology.
  */
 // TODO: allow removing event handlers, see https://issues.apache.org/jira/browse/IGNITE-14519
 public interface TopologyService {
@@ -40,14 +40,14 @@ public interface TopologyService {
     Collection<ClusterNode> allMembers();
 
     /**
-     * Registers a handler for topology change events.
+     * Registers a handler for physical topology change events.
      *
-     * @param handler Topology events handler.
+     * @param handler Physical topology events handler.
      */
     void addEventHandler(TopologyEventHandler handler);
 
     /**
-     * Returns a cluster node by its network address in host:port format.
+     * Returns a cluster node by its network address.
      *
      * @param addr The address.
      * @return The node or {@code null} if the node has not yet been discovered or is offline.
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 358811a51c..44c63a17f0 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.raft.Loza;
@@ -36,6 +35,7 @@ import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.LogicalTopologySnapshot;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeFinder;
 import org.apache.ignite.utils.ClusterServiceTestUtils;
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 03cf4c53d2..cf871112a9 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -47,7 +47,6 @@ import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyComm
 import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -63,6 +62,7 @@ import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.LogicalTopologySnapshot;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeFinder;
 import org.apache.ignite.network.StaticNodeFinder;
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 3fc97d55fd..c52d4ad20c 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.cluster.management.raft.JoinDeniedException;
 import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -67,6 +66,7 @@ import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.LogicalTopologySnapshot;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.TopologyEventHandler;
 import org.apache.ignite.network.TopologyService;
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 054128dcd1..f58f332f85 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -158,7 +158,7 @@ public class CmgRaftGroupListener implements RaftGroupListener {
         ClusterNode node = command.node().asClusterNode();
 
         if (validationManager.isNodeValidated(node)) {
-            logicalTopology.putLogicalTopologyNode(node);
+            logicalTopology.putNode(node);
 
             LOG.info("Node added to the logical topology [node={}]", node.name());
 
@@ -173,7 +173,7 @@ public class CmgRaftGroupListener implements RaftGroupListener {
     private void removeNodesFromLogicalTopology(NodesLeaveCommand command) {
         Set<ClusterNode> nodes = command.nodes().stream().map(ClusterNodeMessage::asClusterNode).collect(Collectors.toSet());
 
-        logicalTopology.removeLogicalTopologyNodes(nodes);
+        logicalTopology.removeNodes(nodes);
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Nodes removed from the logical topology [nodes={}]", nodes.stream().map(ClusterNode::name).collect(toList()));
@@ -191,9 +191,11 @@ public class CmgRaftGroupListener implements RaftGroupListener {
         try {
             storage.restoreSnapshot(path);
 
+            logicalTopology.fireTopologyLeap();
+
             return true;
         } catch (IgniteInternalException e) {
-            LOG.debug("Failed to restore snapshot [path={}]", path, e);
+            LOG.error("Failed to restore snapshot [path={}]", path, e);
 
             return false;
         }
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index 4863182b26..26845a3cd1 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.cluster.management.raft.commands.NodesLeaveCom
 import org.apache.ignite.internal.cluster.management.raft.responses.LogicalTopologyResponse;
 import org.apache.ignite.internal.cluster.management.raft.responses.ValidationErrorResponse;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
@@ -42,6 +41,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.LogicalTopologySnapshot;
 
 /**
  * A wrapper around a {@link RaftGroupService} providing helpful methods for working with the CMG.
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/responses/LogicalTopologyResponse.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/responses/LogicalTopologyResponse.java
index a5251bc558..79724c666f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/responses/LogicalTopologyResponse.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/responses/LogicalTopologyResponse.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.cluster.management.raft.responses;
 
 import java.io.Serializable;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
+import org.apache.ignite.network.LogicalTopologySnapshot;
 
 /**
  * Response containing the current logical topology.
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
index 773496fdaf..a2b665cc8b 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
@@ -24,13 +24,13 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
 import org.apache.ignite.internal.rest.api.cluster.ClusterNodeDto;
 import org.apache.ignite.internal.rest.api.cluster.NetworkAddressDto;
 import org.apache.ignite.internal.rest.api.cluster.NodeMetadataDto;
 import org.apache.ignite.internal.rest.api.cluster.TopologyApi;
 import org.apache.ignite.internal.rest.exception.ClusterNotInitializedException;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.LogicalTopologySnapshot;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeMetadata;
 import org.apache.ignite.network.TopologyService;
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopology.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopology.java
index f77eb84f00..2b9df4b26f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopology.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopology.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.cluster.management.topology;
 
 import java.util.Set;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologySnapshot;
 
 /**
  * Used to manage logical topology information available locally on the current node.
@@ -26,7 +28,7 @@ import org.apache.ignite.network.ClusterNode;
  */
 public interface LogicalTopology {
     /**
-     * Retrieves the current logical topology snapshot.
+     * Retrieves the current logical topology snapshot stored in the local storage.
      */
     LogicalTopologySnapshot getLogicalTopology();
 
@@ -35,17 +37,37 @@ public interface LogicalTopology {
      *
      * @param node Node to put.
      */
-    void putLogicalTopologyNode(ClusterNode node);
+    void putNode(ClusterNode node);
 
     /**
      * Removes given nodes from the logical topology.
      *
      * @param nodes Nodes to remove.
      */
-    void removeLogicalTopologyNodes(Set<ClusterNode> nodes);
+    void removeNodes(Set<ClusterNode> nodes);
 
     /**
      * Returns {@code true} if a given node is present in the logical topology or {@code false} otherwise.
      */
     boolean isNodeInLogicalTopology(ClusterNode node);
+
+    /**
+     * Adds a listener for logical topology events.
+     *
+     * @param listener Listener to add.
+     */
+    void addEventListener(LogicalTopologyEventListener listener);
+
+    /**
+     * Removes a listener for logical topology events.
+     *
+     * @param listener Listener to remove.
+     */
+    void removeEventListener(LogicalTopologyEventListener listener);
+
+    /**
+     * Causes {@link LogicalTopologyEventListener#onTopologyLeap(LogicalTopologySnapshot)} to be fired with the topology snapshot
+     * currently stored in an underlying storage. Invoked after the storage has been restored from a snapshot.
+     */
+    void fireTopologyLeap();
 }
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
index 54dd8003e2..3c969eaaa8 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
@@ -18,22 +18,38 @@
 package org.apache.ignite.internal.cluster.management.topology;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Comparator.comparing;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
 import static org.apache.ignite.internal.util.ByteUtils.toBytes;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologySnapshot;
 
 /**
  * Implementation of {@link LogicalTopology}.
  */
 public class LogicalTopologyImpl implements LogicalTopology {
+    private static final IgniteLogger LOG = Loggers.forClass(LogicalTopologyImpl.class);
+
     /** Storage key for the logical topology. */
     private static final byte[] LOGICAL_TOPOLOGY_KEY = "logical".getBytes(UTF_8);
 
     private final ClusterStateStorage storage;
 
+    private final List<LogicalTopologyEventListener> listeners = new CopyOnWriteArrayList<>();
+
     public LogicalTopologyImpl(ClusterStateStorage storage) {
         this.storage = storage;
     }
@@ -50,21 +66,131 @@ public class LogicalTopologyImpl implements LogicalTopology {
     }
 
     @Override
-    public void putLogicalTopologyNode(ClusterNode node) {
-        replaceLogicalTopologyWith(readLogicalTopology().addNode(node));
+    public void putNode(ClusterNode nodeToPut) {
+        LogicalTopologySnapshot snapshot = readLogicalTopology();
+
+        Map<String, ClusterNode> mapByName = snapshot.nodes().stream()
+                .collect(toMap(ClusterNode::name, identity()));
+
+        Runnable fireRemovalTask = null;
+
+        ClusterNode oldNode = mapByName.remove(nodeToPut.name());
+
+        if (oldNode != null) {
+            if (oldNode.id().equals(nodeToPut.id())) {
+                // We already have this node, nothing needs to be changed.
+                return;
+            }
+
+            // This is an update. First simulate disappearance, then appearance will be fired.
+            snapshot = new LogicalTopologySnapshot(snapshot.version() + 1, mapByName.values());
+
+            LogicalTopologySnapshot snapshotAfterRemoval = snapshot;
+            fireRemovalTask = () -> fireDisappeared(oldNode, snapshotAfterRemoval);
+        }
+
+        mapByName.put(nodeToPut.name(), nodeToPut);
+
+        snapshot = new LogicalTopologySnapshot(snapshot.version() + 1, mapByName.values());
+
+        // Only save to storage once per call so that our writes to storage are atomic and we don't end up in a situation
+        // when different CMG listener instances produce different sequences of topology snapshots.
+        saveSnapshotToStorage(snapshot);
+
+        if (fireRemovalTask != null) {
+            fireRemovalTask.run();
+        }
+        fireAppeared(nodeToPut, snapshot);
     }
 
-    private void replaceLogicalTopologyWith(LogicalTopologySnapshot newTopology) {
+    private void saveSnapshotToStorage(LogicalTopologySnapshot newTopology) {
         storage.put(LOGICAL_TOPOLOGY_KEY, toBytes(newTopology));
     }
 
     @Override
-    public void removeLogicalTopologyNodes(Set<ClusterNode> nodes) {
-        replaceLogicalTopologyWith(readLogicalTopology().removeNodesByIds(nodes));
+    public void removeNodes(Set<ClusterNode> nodesToRemove) {
+        LogicalTopologySnapshot snapshot = readLogicalTopology();
+
+        Map<String, ClusterNode> mapById = snapshot.nodes().stream()
+                .collect(toMap(ClusterNode::id, identity()));
+
+        // Removing in a well-defined order to make sure that a command produces an identical sequence of events in each CMG listener.
+        List<ClusterNode> sortedNodesToRemove = nodesToRemove.stream()
+                .sorted(comparing(ClusterNode::id))
+                .collect(toList());
+
+        List<Runnable> fireTasks = new ArrayList<>();
+
+        for (ClusterNode nodeToRemove : sortedNodesToRemove) {
+            ClusterNode removedNode = mapById.remove(nodeToRemove.id());
+
+            if (removedNode != null) {
+                snapshot = new LogicalTopologySnapshot(snapshot.version() + 1, mapById.values());
+
+                LogicalTopologySnapshot finalSnapshot = snapshot;
+                fireTasks.add(() -> fireDisappeared(nodeToRemove, finalSnapshot));
+            }
+        }
+
+        // Only save to storage once per call so that our writes to storage are atomic and we don't end up in a situation
+        // when different CMG listener instances produce different sequences of topology snapshots.
+        saveSnapshotToStorage(snapshot);
+
+        fireTasks.forEach(Runnable::run);
+    }
+
+    @Override
+    public boolean isNodeInLogicalTopology(ClusterNode needle) {
+        return readLogicalTopology().nodes().stream()
+                .anyMatch(node -> node.id().equals(needle.id()));
+    }
+
+    private void fireAppeared(ClusterNode appearedNode, LogicalTopologySnapshot snapshot) {
+        for (LogicalTopologyEventListener listener : listeners) {
+            try {
+                listener.onAppeared(appearedNode, snapshot);
+            } catch (Throwable e) {
+                logAndRethrowIfError(e, "Failure while notifying onAppear() listener {}", listener);
+            }
+        }
+    }
+
+    private void fireDisappeared(ClusterNode oldNode, LogicalTopologySnapshot snapshot) {
+        for (LogicalTopologyEventListener listener : listeners) {
+            try {
+                listener.onDisappeared(oldNode, snapshot);
+            } catch (Throwable e) {
+                logAndRethrowIfError(e, "Failure while notifying onDisappear() listener {}", listener);
+            }
+        }
+    }
+
+    @Override
+    public void fireTopologyLeap() {
+        for (LogicalTopologyEventListener listener : listeners) {
+            try {
+                listener.onTopologyLeap(readLogicalTopology());
+            } catch (Throwable e) {
+                logAndRethrowIfError(e, "Failure while notifying onTopologyLeap() listener {}", listener);
+            }
+        }
+    }
+
+    private static void logAndRethrowIfError(Throwable e, String logMessagePattern, LogicalTopologyEventListener listener) {
+        LOG.error(logMessagePattern, e, listener);
+
+        if (e instanceof Error) {
+            throw (Error) e;
+        }
+    }
+
+    @Override
+    public void addEventListener(LogicalTopologyEventListener listener) {
+        listeners.add(listener);
     }
 
     @Override
-    public boolean isNodeInLogicalTopology(ClusterNode node) {
-        return readLogicalTopology().containsNodeById(node);
+    public void removeEventListener(LogicalTopologyEventListener listener) {
+        listeners.remove(listener);
     }
 }
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImpl.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImpl.java
new file mode 100644
index 0000000000..58378072d5
--- /dev/null
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologyService;
+import org.apache.ignite.network.LogicalTopologySnapshot;
+
+/**
+ * {@link LogicalTopologyService} implementation.
+ */
+public class LogicalTopologyServiceImpl implements LogicalTopologyService {
+    private final LogicalTopology logicalTopology;
+
+    private final ClusterManagementGroupManager clusterManagementGroupManager;
+
+    public LogicalTopologyServiceImpl(LogicalTopology logicalTopology, ClusterManagementGroupManager clusterManagementGroupManager) {
+        this.logicalTopology = logicalTopology;
+        this.clusterManagementGroupManager = clusterManagementGroupManager;
+    }
+
+    @Override
+    public void addEventListener(LogicalTopologyEventListener listener) {
+        logicalTopology.addEventListener(listener);
+    }
+
+    @Override
+    public void removeEventListener(LogicalTopologyEventListener listener) {
+        logicalTopology.removeEventListener(listener);
+    }
+
+    @Override
+    public CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader() {
+        return clusterManagementGroupManager.logicalTopology();
+    }
+}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
index dd7e6fa5ca..743b96c05d 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
@@ -23,12 +23,17 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
 import java.io.Serializable;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -38,6 +43,7 @@ import org.apache.ignite.internal.cluster.management.ClusterTag;
 import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
 import org.apache.ignite.internal.cluster.management.raft.commands.ClusterNodeMessage;
 import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
 import org.apache.ignite.internal.raft.Command;
@@ -51,15 +57,13 @@ import org.junit.jupiter.api.Test;
  * Tests for the {@link CmgRaftGroupListener}.
  */
 public class CmgRaftGroupListenerTest {
-    private final ClusterStateStorage storage = new TestClusterStateStorage();
+    private final ClusterStateStorage storage = spy(new TestClusterStateStorage());
 
     private final LongConsumer onLogicalTopologyChanged = mock(LongConsumer.class);
 
-    private final CmgRaftGroupListener listener = new CmgRaftGroupListener(
-            storage,
-            new LogicalTopologyImpl(storage),
-            onLogicalTopologyChanged
-    );
+    private final LogicalTopology logicalTopology = spy(new LogicalTopologyImpl(storage));
+
+    private final CmgRaftGroupListener listener = new CmgRaftGroupListener(storage, logicalTopology, onLogicalTopologyChanged);
 
     private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
 
@@ -131,6 +135,15 @@ public class CmgRaftGroupListenerTest {
         verify(onLogicalTopologyChanged).accept(anyLong());
     }
 
+    @Test
+    void restoreFromSnapshotTriggersTopologyLeapEvent() {
+        doNothing().when(storage).restoreSnapshot(any());
+
+        assertTrue(listener.onSnapshotLoad(Paths.get("/unused")));
+
+        verify(logicalTopology).fireTopologyLeap();
+    }
+
     private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj) {
         CommandClosure<T> closure = new CommandClosure<>() {
             @Override
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java
index 8cd59dcbf3..340b0a8e80 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.cluster.management.topology;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.stream.Collectors.toList;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -24,12 +25,24 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertAll;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collection;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -38,26 +51,51 @@ import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorag
 import org.apache.ignite.internal.testframework.WorkDirectory;
 import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologySnapshot;
 import org.apache.ignite.network.NetworkAddress;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(MockitoExtension.class)
 class LogicalTopologyImplTest {
-    private final ClusterStateStorage storage = new TestClusterStateStorage();
+    private final ClusterStateStorage storage = spy(new TestClusterStateStorage());
 
-    private LogicalTopology topologyService;
+    private LogicalTopology topology;
 
     @WorkDirectory
     protected Path workDir;
 
+    @Mock
+    private LogicalTopologyEventListener listener;
+
+    @Captor
+    private ArgumentCaptor<ClusterNode> nodeCaptor;
+
+    @Captor
+    private ArgumentCaptor<ClusterNode> nodeCaptor2;
+
+    @Captor
+    private ArgumentCaptor<LogicalTopologySnapshot> topologyCaptor;
+
+    @Captor
+    private ArgumentCaptor<LogicalTopologySnapshot> topologyCaptor2;
+
     @BeforeEach
     void setUp() {
         storage.start();
 
-        topologyService = new LogicalTopologyImpl(storage);
+        topology = new LogicalTopologyImpl(storage);
+
+        topology.addEventListener(listener);
     }
 
     @AfterEach
@@ -70,33 +108,33 @@ class LogicalTopologyImplTest {
      */
     @Test
     void testLogicalTopology() {
-        assertThat(topologyService.getLogicalTopology().nodes(), is(empty()));
+        assertThat(topology.getLogicalTopology().nodes(), is(empty()));
 
         var node1 = new ClusterNode("foo", "bar", new NetworkAddress("localhost", 123));
 
-        topologyService.putLogicalTopologyNode(node1);
+        topology.putNode(node1);
 
-        assertThat(topologyService.getLogicalTopology().nodes(), contains(node1));
+        assertThat(topology.getLogicalTopology().nodes(), contains(node1));
 
         var node2 = new ClusterNode("baz", "quux", new NetworkAddress("localhost", 123));
 
-        topologyService.putLogicalTopologyNode(node2);
+        topology.putNode(node2);
 
-        assertThat(topologyService.getLogicalTopology().nodes(), containsInAnyOrder(node1, node2));
+        assertThat(topology.getLogicalTopology().nodes(), containsInAnyOrder(node1, node2));
 
         var node3 = new ClusterNode("lol", "boop", new NetworkAddress("localhost", 123));
 
-        topologyService.putLogicalTopologyNode(node3);
+        topology.putNode(node3);
 
-        assertThat(topologyService.getLogicalTopology().nodes(), containsInAnyOrder(node1, node2, node3));
+        assertThat(topology.getLogicalTopology().nodes(), containsInAnyOrder(node1, node2, node3));
 
-        topologyService.removeLogicalTopologyNodes(Set.of(node1, node2));
+        topology.removeNodes(Set.of(node1, node2));
 
-        assertThat(topologyService.getLogicalTopology().nodes(), contains(node3));
+        assertThat(topology.getLogicalTopology().nodes(), contains(node3));
 
-        topologyService.removeLogicalTopologyNodes(Set.of(node3));
+        topology.removeNodes(Set.of(node3));
 
-        assertThat(topologyService.getLogicalTopology().nodes(), is(empty()));
+        assertThat(topology.getLogicalTopology().nodes(), is(empty()));
     }
 
     /**
@@ -106,24 +144,24 @@ class LogicalTopologyImplTest {
     void testLogicalTopologyIdempotence() {
         var node = new ClusterNode("foo", "bar", new NetworkAddress("localhost", 123));
 
-        topologyService.putLogicalTopologyNode(node);
-        topologyService.putLogicalTopologyNode(node);
+        topology.putNode(node);
+        topology.putNode(node);
 
-        assertThat(topologyService.getLogicalTopology().nodes(), contains(node));
+        assertThat(topology.getLogicalTopology().nodes(), contains(node));
 
-        topologyService.removeLogicalTopologyNodes(Set.of(node));
-        topologyService.removeLogicalTopologyNodes(Set.of(node));
+        topology.removeNodes(Set.of(node));
+        topology.removeNodes(Set.of(node));
 
-        assertThat(topologyService.getLogicalTopology().nodes(), is(empty()));
+        assertThat(topology.getLogicalTopology().nodes(), is(empty()));
     }
 
     @Test
-    void logicalTopologyAdditionUsesNameAsNodeKey() {
-        topologyService.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+    void additionUsesNameAsNodeKey() {
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
 
-        topologyService.putLogicalTopologyNode(new ClusterNode("id2", "node", new NetworkAddress("host", 1000)));
+        topology.putNode(new ClusterNode("id2", "node", new NetworkAddress("host", 1000)));
 
-        Collection<ClusterNode> topology = topologyService.getLogicalTopology().nodes();
+        Collection<ClusterNode> topology = this.topology.getLogicalTopology().nodes();
 
         assertThat(topology, hasSize(1));
 
@@ -131,25 +169,25 @@ class LogicalTopologyImplTest {
     }
 
     @Test
-    void logicalTopologyRemovalUsesIdAsNodeKey() {
-        topologyService.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+    void removalUsesIdAsNodeKey() {
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
 
-        topologyService.removeLogicalTopologyNodes(Set.of(new ClusterNode("id2", "node", new NetworkAddress("host", 1000))));
+        topology.removeNodes(Set.of(new ClusterNode("id2", "node", new NetworkAddress("host", 1000))));
 
-        assertThat(topologyService.getLogicalTopology().nodes(), hasSize(1));
-        assertThat((topologyService.getLogicalTopology().nodes()).iterator().next().id(), is("id1"));
+        assertThat(topology.getLogicalTopology().nodes(), hasSize(1));
+        assertThat((topology.getLogicalTopology().nodes()).iterator().next().id(), is("id1"));
 
-        topologyService.removeLogicalTopologyNodes(Set.of(new ClusterNode("id1", "another-name", new NetworkAddress("host", 1000))));
+        topology.removeNodes(Set.of(new ClusterNode("id1", "another-name", new NetworkAddress("host", 1000))));
 
-        assertThat(topologyService.getLogicalTopology().nodes(), is(empty()));
+        assertThat(topology.getLogicalTopology().nodes(), is(empty()));
     }
 
     @Test
     void inLogicalTopologyTestUsesIdAsNodeKey() {
-        topologyService.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
 
-        assertTrue(topologyService.isNodeInLogicalTopology(new ClusterNode("id1", "node", new NetworkAddress("host", 1000))));
-        assertFalse(topologyService.isNodeInLogicalTopology(new ClusterNode("another-id", "node", new NetworkAddress("host", 1000))));
+        assertTrue(topology.isNodeInLogicalTopology(new ClusterNode("id1", "node", new NetworkAddress("host", 1000))));
+        assertFalse(topology.isNodeInLogicalTopology(new ClusterNode("another-id", "node", new NetworkAddress("host", 1000))));
     }
 
     @Test
@@ -157,17 +195,243 @@ class LogicalTopologyImplTest {
         Path snapshotDir = workDir.resolve("snapshot");
         Files.createDirectory(snapshotDir);
 
-        topologyService.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
 
         storage.snapshot(snapshotDir).get(10, TimeUnit.SECONDS);
 
-        topologyService.putLogicalTopologyNode(new ClusterNode("id2", "another-node", new NetworkAddress("host", 1001)));
+        topology.putNode(new ClusterNode("id2", "another-node", new NetworkAddress("host", 1001)));
 
         storage.restoreSnapshot(snapshotDir);
 
-        List<String> namesInTopology = topologyService.getLogicalTopology().nodes().stream()
+        List<String> namesInTopology = topology.getLogicalTopology().nodes().stream()
                 .map(ClusterNode::name)
                 .collect(toList());
         assertThat(namesInTopology, contains("node"));
     }
+
+    @Test
+    void addingNewNodeProducesAppearedEvent() {
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+        verify(listener).onAppeared(nodeCaptor.capture(), topologyCaptor.capture());
+
+        assertThat(topologyCaptor.getValue().version(), is(1L));
+
+        ClusterNode appearedNode = nodeCaptor.getValue();
+
+        assertThat(appearedNode.id(), is("id1"));
+        assertThat(appearedNode.name(), is("node"));
+    }
+
+    @Test
+    void addingSameExistingNodeProducesNoEvents() {
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+        verify(listener, times(1)).onAppeared(any(), any());
+    }
+
+    @Test
+    void updatingExistingNodeProducesDisappearedAndAppearedEvents() {
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+        topology.putNode(new ClusterNode("id2", "node", new NetworkAddress("host1", 1001)));
+
+        InOrder inOrder = inOrder(listener);
+
+        inOrder.verify(listener).onDisappeared(nodeCaptor.capture(), topologyCaptor.capture());
+        inOrder.verify(listener).onAppeared(nodeCaptor2.capture(), topologyCaptor2.capture());
+
+        assertThat(topologyCaptor.getValue().version(), is(2L));
+        assertThat(topologyCaptor2.getValue().version(), is(3L));
+
+        ClusterNode disappearedNode = nodeCaptor.getValue();
+
+        assertThat(disappearedNode.id(), is("id1"));
+        assertThat(disappearedNode.name(), is("node"));
+        assertThat(disappearedNode.address(), is(new NetworkAddress("host", 1000)));
+
+        ClusterNode appearedNode = nodeCaptor2.getValue();
+
+        assertThat(appearedNode.id(), is("id2"));
+        assertThat(appearedNode.name(), is("node"));
+        assertThat(appearedNode.address(), is(new NetworkAddress("host1", 1001)));
+    }
+
+    @Test
+    void updatingExistingNodeProducesExactlyOneWriteToDb() {
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+        topology.putNode(new ClusterNode("id2", "node", new NetworkAddress("host1", 1001)));
+
+        // Expecting 2 writes because there are two puts. The test verifies that second put also produces just 1 write.
+        verify(storage, times(2)).put(eq("logical".getBytes(UTF_8)), any());
+    }
+
+    @Test
+    void removingExistingNodeProducesDisappearedEvent() {
+        ClusterNode node = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+        topology.putNode(node);
+
+        topology.removeNodes(Set.of(node));
+
+        verify(listener).onDisappeared(nodeCaptor.capture(), topologyCaptor.capture());
+
+        assertThat(topologyCaptor.getValue().version(), is(2L));
+
+        ClusterNode disappearedNode = nodeCaptor.getValue();
+
+        assertThat(disappearedNode.id(), is("id1"));
+        assertThat(disappearedNode.name(), is("node"));
+    }
+
+    @Test
+    void removingNonExistingNodeProducesNoEvents() {
+        ClusterNode node = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+
+        topology.removeNodes(Set.of(node));
+
+        verify(listener, never()).onDisappeared(any(), any());
+    }
+
+    @Test
+    void multiRemovalProducesDisappearedEventsInOrderOfNodeIds() {
+        ClusterNode node1 = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+        ClusterNode node2 = new ClusterNode("id2", "node2", new NetworkAddress("host2", 1000));
+
+        topology.putNode(node1);
+        topology.putNode(node2);
+
+        topology.removeNodes(new LinkedHashSet<>(List.of(node2, node1)));
+
+        verify(listener, times(2)).onDisappeared(nodeCaptor.capture(), topologyCaptor.capture());
+
+        List<LogicalTopologySnapshot> capturedSnapshots = topologyCaptor.getAllValues();
+
+        assertThat(capturedSnapshots.get(0).version(), is(3L));
+        assertThat(capturedSnapshots.get(1).version(), is(4L));
+
+        ClusterNode disappearedNode1 = nodeCaptor.getAllValues().get(0);
+        ClusterNode disappearedNode2 = nodeCaptor.getAllValues().get(1);
+
+        assertAll(
+                () -> assertThat(disappearedNode1.id(), is("id1")),
+                () -> assertThat(disappearedNode2.id(), is("id2"))
+        );
+    }
+
+    @Test
+    void multiRemovalProducesExactlyOneWriteToDb() {
+        ClusterNode node1 = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+        ClusterNode node2 = new ClusterNode("id2", "node2", new NetworkAddress("host2", 1000));
+
+        topology.putNode(node1);
+        topology.putNode(node2);
+
+        topology.removeNodes(Set.of(node1, node2));
+
+        // Expecting 3 writes because there are two puts and one removal. The test verifies that the removal produces just 1 write.
+        verify(storage, times(3)).put(eq("logical".getBytes(UTF_8)), any());
+    }
+
+    @Test
+    void onTopologyLeapIsTriggeredOnSnapshotRestore() {
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+        topology.fireTopologyLeap();
+
+        verify(listener).onTopologyLeap(topologyCaptor.capture());
+
+        LogicalTopologySnapshot capturedSnapshot = topologyCaptor.getValue();
+        assertThat(capturedSnapshot.version(), is(1L));
+        assertThat(capturedSnapshot.nodes(), hasSize(1));
+        assertThat(capturedSnapshot.nodes().iterator().next().id(), is("id1"));
+    }
+
+    @Test
+    void onAppearedListenersExceptionsDoNotBreakNotification() {
+        LogicalTopologyEventListener secondListener = mock(LogicalTopologyEventListener.class);
+
+        doThrow(new RuntimeException("Oops")).when(listener).onAppeared(any(), any());
+
+        topology.addEventListener(secondListener);
+
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+        verify(listener).onAppeared(any(), any());
+        verify(secondListener).onAppeared(any(), any());
+    }
+
+    @Test
+    void onAppearedListenerErrorIsRethrown() {
+        doThrow(new TestError()).when(listener).onAppeared(any(), any());
+
+        assertThrows(
+                TestError.class,
+                () -> topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)))
+        );
+    }
+
+    @Test
+    void onDisappearedListenersExceptionsDoNotBreakNotification() {
+        LogicalTopologyEventListener secondListener = mock(LogicalTopologyEventListener.class);
+
+        doThrow(new RuntimeException("Oops")).when(listener).onDisappeared(any(), any());
+
+        topology.addEventListener(secondListener);
+
+        ClusterNode node = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+
+        topology.putNode(node);
+        topology.removeNodes(Set.of(node));
+
+        verify(listener).onDisappeared(any(), any());
+        verify(secondListener).onDisappeared(any(), any());
+    }
+
+    @Test
+    void onDisappearedListenerErrorIsRethrown() {
+        doThrow(new TestError()).when(listener).onDisappeared(any(), any());
+
+        ClusterNode node = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+
+        topology.putNode(node);
+
+        assertThrows(TestError.class, () -> topology.removeNodes(Set.of(node)));
+    }
+
+    @Test
+    void onTopologyLeapListenersExceptionsDoNotBreakNotification() {
+        LogicalTopologyEventListener secondListener = mock(LogicalTopologyEventListener.class);
+
+        doThrow(new RuntimeException("Oops")).when(listener).onTopologyLeap(any());
+
+        topology.addEventListener(secondListener);
+
+        topology.fireTopologyLeap();
+
+        verify(listener).onTopologyLeap(any());
+        verify(secondListener).onTopologyLeap(any());
+    }
+
+    @Test
+    void onTopologyLeapListenerErrorIsRethrown() {
+        doThrow(new TestError()).when(listener).onTopologyLeap(any());
+
+        assertThrows(TestError.class, () -> topology.fireTopologyLeap());
+    }
+
+    @Test
+    void eventListenerStopsGettingEventsAfterListenerRemoval() {
+        topology.removeEventListener(listener);
+
+        topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+        verify(listener, never()).onAppeared(any(), any());
+    }
+
+    @SuppressWarnings("serial")
+    private static class TestError extends Error {
+    }
 }
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImplTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImplTest.java
new file mode 100644
index 0000000000..b30a3dc240
--- /dev/null
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImplTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologySnapshot;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class LogicalTopologyServiceImplTest {
+    @Mock
+    private LogicalTopology logicalTopology;
+
+    @Mock
+    private ClusterManagementGroupManager cmgManager;
+
+    @InjectMocks
+    private LogicalTopologyServiceImpl logicalTopologyService;
+
+    @Mock
+    private LogicalTopologyEventListener listener;
+
+    @Test
+    void delegatesAddEventListener() {
+        logicalTopologyService.addEventListener(listener);
+
+        verify(logicalTopology).addEventListener(listener);
+    }
+
+    @Test
+    void delegatesRemoveEventListener() {
+        logicalTopologyService.removeEventListener(listener);
+
+        verify(logicalTopology).removeEventListener(listener);
+    }
+
+    @Test
+    void delegatesLogicalTopologyOnLeader() {
+        CompletableFuture<LogicalTopologySnapshot> snapshotFuture = new CompletableFuture<>();
+
+        doReturn(snapshotFuture).when(cmgManager).logicalTopology();
+
+        assertThat(logicalTopologyService.logicalTopologyOnLeader(), is(snapshotFuture));
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
index d701d13b0d..74dacefbe0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -20,13 +20,18 @@ package org.apache.ignite.internal;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
@@ -47,6 +52,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 /**
  * Abstract integration test that starts and stops a cluster.
  */
+@SuppressWarnings("ALL")
 @ExtendWith(WorkDirectoryExtension.class)
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractTest {
@@ -79,57 +85,113 @@ public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractT
      */
     @BeforeEach
     void startNodes(TestInfo testInfo) {
-        String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
-
-        List<CompletableFuture<Ignite>> futures = IntStream.range(0, nodes())
-                .mapToObj(i -> {
-                    String nodeName = testNodeName(testInfo, i);
-
-                    String config = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, BASE_PORT + i, connectNodeAddr);
-
-                    return IgnitionManager.start(nodeName, config, WORK_DIR.resolve(nodeName));
-                })
+        List<CompletableFuture<Ignite>> futures = IntStream.range(0, initialNodes())
+                .mapToObj(i -> startNode0(i, testInfo))
                 .collect(toList());
 
-        String metaStorageNodeName = testNodeName(testInfo, nodes() - 1);
+        String metaStorageNodeName = testNodeName(testInfo, initialNodes() - 1);
 
         IgnitionManager.init(metaStorageNodeName, List.of(metaStorageNodeName), "cluster");
 
         for (CompletableFuture<Ignite> future : futures) {
-            assertThat(future, willCompleteSuccessfully());
+            assertThat(future, willSucceedIn(10, TimeUnit.SECONDS));
 
             clusterNodes.add(future.join());
         }
     }
 
+    private static CompletableFuture<Ignite> startNode0(int nodeIndex, TestInfo testInfo) {
+        String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+        String nodeName = testNodeName(testInfo, nodeIndex);
+
+        String config = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, BASE_PORT + nodeIndex, connectNodeAddr);
+
+        return IgnitionManager.start(nodeName, config, WORK_DIR.resolve(nodeName));
+    }
+
     /**
-     * Get a count of nodes in the Ignite cluster.
+     * Starts an Ignite node with the given index.
      *
-     * @return Count of nodes.
+     * @param nodeIndex Zero-based index (used to build node name).
+     * @param testInfo Test info (used to build node name).
+     * @return Started Ignite node.
      */
-    protected int nodes() {
+    protected Ignite startNode(int nodeIndex, TestInfo testInfo) {
+        CompletableFuture<Ignite> future = startNode0(nodeIndex, testInfo);
+
+        assertThat(future, willSucceedIn(10, TimeUnit.SECONDS));
+
+        Ignite ignite = future.join();
+
+        if (nodeIndex < clusterNodes.size()) {
+            clusterNodes.set(nodeIndex, ignite);
+        } else if (nodeIndex == clusterNodes.size()) {
+            clusterNodes.add(ignite);
+        } else {
+            throw new IllegalArgumentException("Cannot start node with index " + nodeIndex + " because we only have "
+                    + clusterNodes.size() + " nodes");
+        }
+
+        return ignite;
+    }
+
+    /**
+     * Returns count of nodes in the Ignite cluster started before each test.
+     *
+     * @return Count of nodes in initial cluster.
+     */
+    protected int initialNodes() {
         return 3;
     }
 
     /**
-     * After all.
+     * Stops all nodes that are not stopped yet.
      */
     @AfterEach
     void stopNodes(TestInfo testInfo) throws Exception {
         LOG.info("Start tearDown()");
 
-        clusterNodes.clear();
-
-        List<AutoCloseable> closeables = IntStream.range(0, nodes())
-                .mapToObj(i -> testNodeName(testInfo, i))
-                .map(nodeName -> (AutoCloseable) () -> IgnitionManager.stop(nodeName))
+        List<AutoCloseable> closeables = clusterNodes.stream()
+                .filter(Objects::nonNull)
+                .map(node -> (AutoCloseable) () -> IgnitionManager.stop(node.name()))
                 .collect(toList());
 
         IgniteUtils.closeAll(closeables);
 
+        clusterNodes.clear();
+
         LOG.info("End tearDown()");
     }
 
+    /**
+     * Stops a node by index.
+     *
+     * @param nodeIndex Node index.
+     * @param testInfo Test info (used to construct node name).
+     */
+    protected final void stopNode(int nodeIndex, TestInfo testInfo) {
+        assertThat(clusterNodes.size(), is(greaterThan(nodeIndex)));
+        assertThat(clusterNodes.get(nodeIndex), is(notNullValue()));
+
+        IgnitionManager.stop(testNodeName(testInfo, nodeIndex));
+
+        clusterNodes.set(nodeIndex, null);
+    }
+
+    /**
+     * Restarts a node by index.
+     *
+     * @param nodeIndex Node index.
+     * @param testInfo Test info (used to construct node name).
+     * @return New node instance.
+     */
+    protected final Ignite restartNode(int nodeIndex, TestInfo testInfo) {
+        stopNode(nodeIndex, testInfo);
+
+        return startNode(nodeIndex, testInfo);
+    }
+
     /**
      * Invokes before the test will start.
      *
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
index fc1a7d6da6..09b6c53289 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -248,7 +248,7 @@ class ItComputeTest extends AbstractClusterIntegrationTest {
     }
 
     private List<String> allNodeNames() {
-        return IntStream.range(0, nodes())
+        return IntStream.range(0, initialNodes())
                 .mapToObj(this::node)
                 .map(Ignite::name)
                 .collect(toList());
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
new file mode 100644
index 0000000000..1993f8f908
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.AbstractClusterIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologySnapshot;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for functionality of logical topology events subscription.
+ */
+@SuppressWarnings("resource")
+class ItLogicalTopologyTest extends AbstractClusterIntegrationTest {
+    private final List<Event> events = new CopyOnWriteArrayList<>();
+
+    private final LogicalTopologyEventListener listener = new LogicalTopologyEventListener() {
+        @Override
+        public void onAppeared(ClusterNode appearedNode, LogicalTopologySnapshot newTopology) {
+            events.add(new Event(true, appearedNode, newTopology.version()));
+        }
+
+        @Override
+        public void onDisappeared(ClusterNode disappearedNode, LogicalTopologySnapshot newTopology) {
+            events.add(new Event(false, disappearedNode, newTopology.version()));
+        }
+    };
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Test
+    void receivesLogicalTopologyEvents(TestInfo testInfo) throws Exception {
+        IgniteImpl entryNode = node(0);
+
+        entryNode.logicalTopologyService().addEventListener(listener);
+
+        // Checking that onAppeared() is received.
+        Ignite secondIgnite = startNode(1, testInfo);
+
+        assertTrue(waitForCondition(() -> !events.isEmpty(), 10_000));
+
+        assertThat(events, hasSize(1));
+
+        Event firstEvent = events.get(0);
+
+        assertTrue(firstEvent.appeared);
+        assertThat(firstEvent.node.name(), is(secondIgnite.name()));
+        assertThat(firstEvent.topologyVersion, is(2L));
+
+        // Checking that onDisappeared() is received.
+        stopNode(1, testInfo);
+
+        assertTrue(waitForCondition(() -> events.size() > 1, 10_000));
+
+        assertThat(events, hasSize(2));
+
+        Event secondEvent = events.get(1);
+
+        assertFalse(secondEvent.appeared);
+        assertThat(secondEvent.node.name(), is(secondIgnite.name()));
+        assertThat(secondEvent.topologyVersion, is(3L));
+    }
+
+    @Test
+    void receivesLogicalTopologyEventsFromRestart(TestInfo testInfo) throws Exception {
+        IgniteImpl entryNode = node(0);
+
+        Ignite secondIgnite = startNode(1, testInfo);
+
+        entryNode.logicalTopologyService().addEventListener(listener);
+
+        restartNode(1, testInfo);
+
+        assertTrue(waitForCondition(() -> events.size() >= 4, 10_000));
+
+        assertThat(events, hasSize(4));
+
+        Event leaveEvent = events.get(2);
+
+        assertFalse(leaveEvent.appeared);
+        assertThat(leaveEvent.node.name(), is(secondIgnite.name()));
+        assertThat(leaveEvent.topologyVersion, is(3L));
+
+        Event joinEvent = events.get(3);
+
+        assertTrue(joinEvent.appeared);
+        assertThat(joinEvent.node.name(), is(secondIgnite.name()));
+        assertThat(joinEvent.topologyVersion, is(4L));
+    }
+
+    private static class Event {
+        private final boolean appeared;
+        private final ClusterNode node;
+        private final long topologyVersion;
+
+        private Event(boolean appeared, ClusterNode node, long topologyVersion) {
+            this.appeared = appeared;
+            this.node = node;
+            this.topologyVersion = topologyVersion;
+        }
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
index 9a2d59cd3c..bde357dd6f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
@@ -62,7 +62,7 @@ class ItRaftStorageVolatilityTest extends AbstractClusterIntegrationTest {
     private static final String TABLE_NAME = "test";
 
     @Override
-    protected int nodes() {
+    protected int initialNodes() {
         return 1;
     }
 
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 58b4d7f4a3..68c83ee438 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
 import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
 import org.apache.ignite.internal.cluster.management.rest.ClusterManagementRestFactory;
 import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
 import org.apache.ignite.internal.component.RestAddressReporter;
 import org.apache.ignite.internal.components.LongJvmPauseDetector;
 import org.apache.ignite.internal.compute.ComputeComponent;
@@ -112,6 +113,7 @@ import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.LogicalTopologyService;
 import org.apache.ignite.network.MessageSerializationRegistryImpl;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkAddress;
@@ -209,6 +211,8 @@ public class IgniteImpl implements Ignite {
 
     private final ClusterManagementGroupManager cmgMgr;
 
+    private final LogicalTopologyService logicalTopologyService;
+
     /** Client handler module. */
     private final ClientHandlerModule clientHandlerModule;
 
@@ -322,16 +326,18 @@ public class IgniteImpl implements Ignite {
         // TODO: IGNITE-16841 - use common RocksDB instance to store cluster state as well.
         clusterStateStorage = new RocksDbClusterStateStorage(workDir.resolve(CMG_DB_PATH));
 
-        var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage);
+        var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
 
         cmgMgr = new ClusterManagementGroupManager(
                 vaultMgr,
                 clusterSvc,
                 raftMgr,
                 clusterStateStorage,
-                logicalTopologyService
+                logicalTopology
         );
 
+        logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgMgr);
+
         metaStorageMgr = new MetaStorageManager(
                 vaultMgr,
                 clusterSvc,
@@ -809,4 +815,9 @@ public class IgniteImpl implements Ignite {
     public DistributionZoneManager distributionZoneManager() {
         return distributionZoneManager;
     }
+
+    @TestOnly
+    public LogicalTopologyService logicalTopologyService() {
+        return logicalTopologyService;
+    }
 }