You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/12/07 09:28:08 UTC
[ignite-3] branch main updated: IGNITE-18085 Implement logical topology events subscription and notification (#1403)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8eb3ae2e7c IGNITE-18085 Implement logical topology events subscription and notification (#1403)
8eb3ae2e7c is described below
commit 8eb3ae2e7cc7dc72ad0f1a5390c479ad843d1a38
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Wed Dec 7 13:28:02 2022 +0400
IGNITE-18085 Implement logical topology events subscription and notification (#1403)
---
.../network/LogicalTopologyEventListener.java | 71 +++++
.../ignite/network/LogicalTopologyService.java | 61 ++++
.../ignite/network}/LogicalTopologySnapshot.java | 47 +--
.../ignite/network/TopologyEventHandler.java | 8 +-
.../org/apache/ignite/network/TopologyService.java | 8 +-
.../internal/cluster/management/MockNode.java | 2 +-
.../management/raft/ItCmgRaftServiceTest.java | 2 +-
.../management/ClusterManagementGroupManager.java | 2 +-
.../management/raft/CmgRaftGroupListener.java | 8 +-
.../cluster/management/raft/CmgRaftService.java | 2 +-
.../raft/responses/LogicalTopologyResponse.java | 2 +-
.../management/rest/TopologyController.java | 2 +-
.../management/topology/LogicalTopology.java | 28 +-
.../management/topology/LogicalTopologyImpl.java | 140 ++++++++-
.../topology/LogicalTopologyServiceImpl.java | 53 ++++
.../management/raft/CmgRaftGroupListenerTest.java | 25 +-
.../topology/LogicalTopologyImplTest.java | 338 ++++++++++++++++++---
.../topology/LogicalTopologyServiceImplTest.java | 71 +++++
.../internal/AbstractClusterIntegrationTest.java | 106 +++++--
.../ignite/internal/compute/ItComputeTest.java | 2 +-
.../internal/compute/ItLogicalTopologyTest.java | 133 ++++++++
.../inmemory/ItRaftStorageVolatilityTest.java | 2 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 15 +-
23 files changed, 991 insertions(+), 137 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyEventListener.java b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyEventListener.java
new file mode 100644
index 0000000000..d20bd34420
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyEventListener.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+/**
+ * Listens to events related to logical topology changes.
+ *
+ * <p>Event listeners are not guaranteed to see events they receive being consistent with the state acquired from
+ * {@link LogicalTopologyService#logicalTopologyOnLeader()} (or local logical topology state obtained by other means). This means that,
+ * if you get an event with topology version N, event listener might see version M greater than N if it
+ * tries to get current logical topology from the CMG leader, or it might see version M less than or greater than N if it
+ * tries to get current local topology from the local storage.
+ *
+ * <p>Event listener methods must return as quickly as possible. If some heavy processing, blocking I/O or waiting
+ * for a future has to be done, this should be offloaded to another thread.
+ *
+ * <p>While an event listener is registered, it is guaranteed to get all logical topology events, in the correct order.
+ * While an event listener is NOT registered (for instance, when its node is restarting), events are skipped. This
+ * means that, if an Ignite node is restarting, it might miss some logical topology events.
+ * {@link LogicalTopologyService#logicalTopologyOnLeader()} should be used to catch up.
+ */
+public interface LogicalTopologyEventListener {
+ /**
+ * Called when a new member has joined a cluster's logical topology.
+ *
+ * @param appearedNode Appeared cluster member.
+ * @param newTopology Topology in which the member has joined.
+ */
+ default void onAppeared(ClusterNode appearedNode, LogicalTopologySnapshot newTopology) {
+ // No-op.
+ }
+
+ /**
+ * Called when a member has left a cluster's logical topology.
+ *
+ * @param disappearedNode Disappeared cluster member.
+ * @param newTopology Topology in which the member has disappeared.
+ */
+ default void onDisappeared(ClusterNode disappearedNode, LogicalTopologySnapshot newTopology) {
+ // No-op.
+ }
+
+ /**
+ * Called when a topology changed in a way that cannot be interpreted as a sequence of 'appeared'/'disappeared' events,
+ * so the change happens as a leap forward. This is mutually exclusive with {@link #onAppeared(ClusterNode, LogicalTopologySnapshot)}
+ * and {@link #onDisappeared(ClusterNode, LogicalTopologySnapshot)}.
+ *
+ * <p>This happens rarely (in a well-configured system this should never happen), for instance, when a Cluster Management
+ * RAFT Group sends its update by installing a RAFT snapshot instead of normal AppendEntries.
+ *
+ * @param newTopology The new logical topology state.
+ */
+ default void onTopologyLeap(LogicalTopologySnapshot newTopology) {
+ // No-op.
+ }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyService.java b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyService.java
new file mode 100644
index 0000000000..308d0d146e
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologyService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Used for getting information about the cluster's Logical Topology.
+ *
+ * <p>There are 2 kinds of 'topologies': physical (see {@link TopologyService}) and logical.
+ * <ul>
+ * <li>Physical topology consists of nodes mutually discovered by a membership protocol (like SWIM)</li>
+ * <li>
+ * Logical topology is a subset of a physical topology and only contains nodes that have successfully
+ * <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-77%3A+Node+Join+Protocol+and+Initialization+for+Ignite+3">
+ * joined the cluster</a>
+ * </li>
+ * </ul>
+ *
+ * @see <a href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-77%3A+Node+Join+Protocol+and+Initialization+for+Ignite+3">
+ * IEP-77: Node Join Protocol and Initialization for Ignite 3</a>
+ * @see TopologyService
+ */
+public interface LogicalTopologyService {
+ /**
+ * Adds a listener for logical topology events.
+ *
+ * @param listener Listener to add.
+ * @see LogicalTopologyEventListener
+ */
+ void addEventListener(LogicalTopologyEventListener listener);
+
+ /**
+ * Removes a listener for logical topology events.
+ *
+ * @param listener Listener to remove.
+ */
+ void removeEventListener(LogicalTopologyEventListener listener);
+
+ /**
+ * Returns a future that, when complete, resolves into a logical topology snapshot as the CMG leader sees it.
+ *
+ * @return Future that, when complete, resolves into a logical topology snapshot from the point of view of the CMG leader.
+ */
+ CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader();
+}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologySnapshot.java b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologySnapshot.java
similarity index 56%
rename from modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologySnapshot.java
rename to modules/api/src/main/java/org/apache/ignite/network/LogicalTopologySnapshot.java
index 635faba0ea..66ed67214f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologySnapshot.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/LogicalTopologySnapshot.java
@@ -15,36 +15,34 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.cluster.management.topology;
+package org.apache.ignite.network;
import static java.util.Collections.emptySet;
-import static java.util.function.Function.identity;
-import static java.util.stream.Collectors.toMap;
import java.io.Serializable;
import java.util.Collection;
-import java.util.Map;
import java.util.Set;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.network.ClusterNode;
/**
* A snapshot of a logical topology as seen locally. Includes nodes participating in the logical topology and the version
* of the topology (that gets incremented on each update to the topology).
+ *
+ * <p>Instances of this class are immutable.
*/
public class LogicalTopologySnapshot implements Serializable {
private static final long serialVersionUID = 0L;
- /** Initial 'topology' for an empty cluster (before any node joined). */
- static final LogicalTopologySnapshot INITIAL = new LogicalTopologySnapshot(0, emptySet());
+ /** Initial 'topology' for an empty cluster (before any node has joined). */
+ public static final LogicalTopologySnapshot INITIAL = new LogicalTopologySnapshot(0, emptySet());
private final long version;
@IgniteToStringInclude
private final Set<ClusterNode> nodes;
- private LogicalTopologySnapshot(long version, Collection<ClusterNode> nodes) {
+ public LogicalTopologySnapshot(long version, Collection<ClusterNode> nodes) {
this.version = version;
this.nodes = Set.copyOf(nodes);
}
@@ -63,39 +61,6 @@ public class LogicalTopologySnapshot implements Serializable {
return nodes;
}
- LogicalTopologySnapshot addNode(ClusterNode nodeToAdd) {
- Map<String, ClusterNode> map = nodes.stream().collect(toMap(ClusterNode::name, identity()));
-
- ClusterNode oldNode = map.put(nodeToAdd.name(), nodeToAdd);
- if (oldNode != null && oldNode.id().equals(nodeToAdd.id())) {
- // We already have this node, nothing needs to be changed.
- return this;
- }
-
- return new LogicalTopologySnapshot(version + 1, map.values());
- }
-
- LogicalTopologySnapshot removeNodesByIds(Set<ClusterNode> nodesToRemove) {
- Map<String, ClusterNode> mapById = nodes.stream().collect(toMap(ClusterNode::id, identity()));
-
- int originalSize = mapById.size();
-
- for (ClusterNode nodeToRemove : nodesToRemove) {
- mapById.remove(nodeToRemove.id());
- }
-
- if (mapById.size() == originalSize) {
- // Nothing was actually removed.
- return this;
- }
-
- return new LogicalTopologySnapshot(version + 1, mapById.values());
- }
-
- boolean containsNodeById(ClusterNode needle) {
- return nodes.stream().anyMatch(node -> node.id().equals(needle.id()));
- }
-
@Override
public String toString() {
return S.toString(LogicalTopologySnapshot.class, this);
diff --git a/modules/api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java b/modules/api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
index 9a4f89906d..2949331090 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
@@ -18,11 +18,11 @@
package org.apache.ignite.network;
/**
- * Interface for handling events related to topology changes.
+ * Interface for handling events related to physical topology changes.
*/
public interface TopologyEventHandler {
/**
- * Called when a new member has been detected joining a cluster.
+ * Called when a new member has been detected joining a cluster's physical topology.
*
* @param member Appeared cluster member.
*/
@@ -31,8 +31,8 @@ public interface TopologyEventHandler {
}
/**
- * Indicates that a member has left a cluster. This method is only called when a member leaves permanently (i.e. it is not possible to
- * re-establish a connection to it).
+ * Indicates that a member has left a cluster's physical topology. This method is only called when a member leaves permanently
+ * (i.e. it is not possible to re-establish a connection to it).
*
* @param member Disappeared cluster member.
*/
diff --git a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
index bd6b442276..99c5d7dc9c 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -21,7 +21,7 @@ import java.util.Collection;
import org.jetbrains.annotations.Nullable;
/**
- * Entry point for obtaining information about a cluster's topology.
+ * Entry point for obtaining information about a cluster's physical topology.
*/
// TODO: allow removing event handlers, see https://issues.apache.org/jira/browse/IGNITE-14519
public interface TopologyService {
@@ -40,14 +40,14 @@ public interface TopologyService {
Collection<ClusterNode> allMembers();
/**
- * Registers a handler for topology change events.
+ * Registers a handler for physical topology change events.
*
- * @param handler Topology events handler.
+ * @param handler Physical topology events handler.
*/
void addEventHandler(TopologyEventHandler handler);
/**
- * Returns a cluster node by its network address in host:port format.
+ * Returns a cluster node by its network address.
*
* @param addr The address.
* @return The node or {@code null} if the node has not yet been discovered or is offline.
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 358811a51c..44c63a17f0 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.Loza;
@@ -36,6 +35,7 @@ import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.LogicalTopologySnapshot;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.utils.ClusterServiceTestUtils;
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 03cf4c53d2..cf871112a9 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -47,7 +47,6 @@ import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyComm
import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -63,6 +62,7 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.LogicalTopologySnapshot;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 3fc97d55fd..c52d4ad20c 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -50,7 +50,6 @@ import org.apache.ignite.internal.cluster.management.raft.JoinDeniedException;
import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -67,6 +66,7 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.LogicalTopologySnapshot;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.network.TopologyService;
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 054128dcd1..f58f332f85 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -158,7 +158,7 @@ public class CmgRaftGroupListener implements RaftGroupListener {
ClusterNode node = command.node().asClusterNode();
if (validationManager.isNodeValidated(node)) {
- logicalTopology.putLogicalTopologyNode(node);
+ logicalTopology.putNode(node);
LOG.info("Node added to the logical topology [node={}]", node.name());
@@ -173,7 +173,7 @@ public class CmgRaftGroupListener implements RaftGroupListener {
private void removeNodesFromLogicalTopology(NodesLeaveCommand command) {
Set<ClusterNode> nodes = command.nodes().stream().map(ClusterNodeMessage::asClusterNode).collect(Collectors.toSet());
- logicalTopology.removeLogicalTopologyNodes(nodes);
+ logicalTopology.removeNodes(nodes);
if (LOG.isInfoEnabled()) {
LOG.info("Nodes removed from the logical topology [nodes={}]", nodes.stream().map(ClusterNode::name).collect(toList()));
@@ -191,9 +191,11 @@ public class CmgRaftGroupListener implements RaftGroupListener {
try {
storage.restoreSnapshot(path);
+ logicalTopology.fireTopologyLeap();
+
return true;
} catch (IgniteInternalException e) {
- LOG.debug("Failed to restore snapshot [path={}]", path, e);
+ LOG.error("Failed to restore snapshot [path={}]", path, e);
return false;
}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index 4863182b26..26845a3cd1 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.cluster.management.raft.commands.NodesLeaveCom
import org.apache.ignite.internal.cluster.management.raft.responses.LogicalTopologyResponse;
import org.apache.ignite.internal.cluster.management.raft.responses.ValidationErrorResponse;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.properties.IgniteProductVersion;
@@ -42,6 +41,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.LogicalTopologySnapshot;
/**
* A wrapper around a {@link RaftGroupService} providing helpful methods for working with the CMG.
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/responses/LogicalTopologyResponse.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/responses/LogicalTopologyResponse.java
index a5251bc558..79724c666f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/responses/LogicalTopologyResponse.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/responses/LogicalTopologyResponse.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.cluster.management.raft.responses;
import java.io.Serializable;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
+import org.apache.ignite.network.LogicalTopologySnapshot;
/**
* Response containing the current logical topology.
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
index 773496fdaf..a2b665cc8b 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/rest/TopologyController.java
@@ -24,13 +24,13 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
-import org.apache.ignite.internal.cluster.management.topology.LogicalTopologySnapshot;
import org.apache.ignite.internal.rest.api.cluster.ClusterNodeDto;
import org.apache.ignite.internal.rest.api.cluster.NetworkAddressDto;
import org.apache.ignite.internal.rest.api.cluster.NodeMetadataDto;
import org.apache.ignite.internal.rest.api.cluster.TopologyApi;
import org.apache.ignite.internal.rest.exception.ClusterNotInitializedException;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.LogicalTopologySnapshot;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeMetadata;
import org.apache.ignite.network.TopologyService;
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopology.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopology.java
index f77eb84f00..2b9df4b26f 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopology.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopology.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.cluster.management.topology;
import java.util.Set;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologySnapshot;
/**
* Used to manage logical topology information available locally on the current node.
@@ -26,7 +28,7 @@ import org.apache.ignite.network.ClusterNode;
*/
public interface LogicalTopology {
/**
- * Retrieves the current logical topology snapshot.
+ * Retrieves the current logical topology snapshot stored in the local storage.
*/
LogicalTopologySnapshot getLogicalTopology();
@@ -35,17 +37,37 @@ public interface LogicalTopology {
*
* @param node Node to put.
*/
- void putLogicalTopologyNode(ClusterNode node);
+ void putNode(ClusterNode node);
/**
* Removes given nodes from the logical topology.
*
* @param nodes Nodes to remove.
*/
- void removeLogicalTopologyNodes(Set<ClusterNode> nodes);
+ void removeNodes(Set<ClusterNode> nodes);
/**
* Returns {@code true} if a given node is present in the logical topology or {@code false} otherwise.
*/
boolean isNodeInLogicalTopology(ClusterNode node);
+
+ /**
+ * Adds a listener for logical topology events.
+ *
+ * @param listener Listener to add.
+ */
+ void addEventListener(LogicalTopologyEventListener listener);
+
+ /**
+ * Removes a listener for logical topology events.
+ *
+ * @param listener Listener to remove.
+ */
+ void removeEventListener(LogicalTopologyEventListener listener);
+
+ /**
+ * Causes {@link LogicalTopologyEventListener#onTopologyLeap(LogicalTopologySnapshot)} to be fired with the topology snapshot
+ * currently stored in an underlying storage. Invoked after the storage has been restored from a snapshot.
+ */
+ void fireTopologyLeap();
}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
index 54dd8003e2..3c969eaaa8 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImpl.java
@@ -18,22 +18,38 @@
package org.apache.ignite.internal.cluster.management.topology;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Comparator.comparing;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologySnapshot;
/**
* Implementation of {@link LogicalTopology}.
*/
public class LogicalTopologyImpl implements LogicalTopology {
+ private static final IgniteLogger LOG = Loggers.forClass(LogicalTopologyImpl.class);
+
/** Storage key for the logical topology. */
private static final byte[] LOGICAL_TOPOLOGY_KEY = "logical".getBytes(UTF_8);
private final ClusterStateStorage storage;
+ private final List<LogicalTopologyEventListener> listeners = new CopyOnWriteArrayList<>();
+
public LogicalTopologyImpl(ClusterStateStorage storage) {
this.storage = storage;
}
@@ -50,21 +66,131 @@ public class LogicalTopologyImpl implements LogicalTopology {
}
@Override
- public void putLogicalTopologyNode(ClusterNode node) {
- replaceLogicalTopologyWith(readLogicalTopology().addNode(node));
+ public void putNode(ClusterNode nodeToPut) {
+ LogicalTopologySnapshot snapshot = readLogicalTopology();
+
+ Map<String, ClusterNode> mapByName = snapshot.nodes().stream()
+ .collect(toMap(ClusterNode::name, identity()));
+
+ Runnable fireRemovalTask = null;
+
+ ClusterNode oldNode = mapByName.remove(nodeToPut.name());
+
+ if (oldNode != null) {
+ if (oldNode.id().equals(nodeToPut.id())) {
+ // We already have this node, nothing needs to be changed.
+ return;
+ }
+
+ // This is an update. First simulate disappearance, then appearance will be fired.
+ snapshot = new LogicalTopologySnapshot(snapshot.version() + 1, mapByName.values());
+
+ LogicalTopologySnapshot snapshotAfterRemoval = snapshot;
+ fireRemovalTask = () -> fireDisappeared(oldNode, snapshotAfterRemoval);
+ }
+
+ mapByName.put(nodeToPut.name(), nodeToPut);
+
+ snapshot = new LogicalTopologySnapshot(snapshot.version() + 1, mapByName.values());
+
+ // Only save to storage once per call so that our writes to storage are atomic and we don't end up in a situation
+ // when different CMG listener instances produce different sequences of topology snapshots.
+ saveSnapshotToStorage(snapshot);
+
+ if (fireRemovalTask != null) {
+ fireRemovalTask.run();
+ }
+ fireAppeared(nodeToPut, snapshot);
}
- private void replaceLogicalTopologyWith(LogicalTopologySnapshot newTopology) {
+ private void saveSnapshotToStorage(LogicalTopologySnapshot newTopology) {
storage.put(LOGICAL_TOPOLOGY_KEY, toBytes(newTopology));
}
@Override
- public void removeLogicalTopologyNodes(Set<ClusterNode> nodes) {
- replaceLogicalTopologyWith(readLogicalTopology().removeNodesByIds(nodes));
+ public void removeNodes(Set<ClusterNode> nodesToRemove) {
+     LogicalTopologySnapshot snapshot = readLogicalTopology();
+
+     // Removal is keyed by node ID only; the name and address of the passed nodes are not compared.
+     Map<String, ClusterNode> mapById = snapshot.nodes().stream()
+             .collect(toMap(ClusterNode::id, identity()));
+
+     // Removing in a well-defined order to make sure that a command produces an identical sequence of events in each CMG listener.
+     List<ClusterNode> sortedNodesToRemove = nodesToRemove.stream()
+             .sorted(comparing(ClusterNode::id))
+             .collect(toList());
+
+     // Notifications are collected here and only run after the resulting topology has been saved.
+     List<Runnable> fireTasks = new ArrayList<>();
+
+     for (ClusterNode nodeToRemove : sortedNodesToRemove) {
+         ClusterNode removedNode = mapById.remove(nodeToRemove.id());
+
+         if (removedNode != null) {
+             // Every actually removed node produces its own topology version.
+             snapshot = new LogicalTopologySnapshot(snapshot.version() + 1, mapById.values());
+
+             LogicalTopologySnapshot snapshotAfterRemoval = snapshot;
+
+             // Report the node instance that was stored in the topology, not the caller-supplied one: they are
+             // matched by ID only, so the argument may carry a different name or address.
+             fireTasks.add(() -> fireDisappeared(removedNode, snapshotAfterRemoval));
+         }
+     }
+
+     // Only save to storage once per call so that our writes to storage are atomic and we don't end up in a situation
+     // when different CMG listener instances produce different sequences of topology snapshots.
+     saveSnapshotToStorage(snapshot);
+
+     fireTasks.forEach(Runnable::run);
+ }
+
+ @Override
+ public boolean isNodeInLogicalTopology(ClusterNode needle) {
+     // Membership is decided by node ID alone.
+     for (ClusterNode candidate : readLogicalTopology().nodes()) {
+         if (candidate.id().equals(needle.id())) {
+             return true;
+         }
+     }
+
+     return false;
+ }
+
+ /**
+  * Notifies every registered listener that a node has appeared in the logical topology.
+  *
+  * <p>A listener failure is logged and does not prevent the remaining listeners from being notified,
+  * unless the failure is an {@link Error}, which is rethrown.
+  *
+  * @param appearedNode Node that was added.
+  * @param snapshot Topology snapshot passed to the listeners together with the node.
+  */
+ private void fireAppeared(ClusterNode appearedNode, LogicalTopologySnapshot snapshot) {
+     for (LogicalTopologyEventListener listener : listeners) {
+         try {
+             listener.onAppeared(appearedNode, snapshot);
+         } catch (Throwable e) {
+             // The message names the callback exactly as it is declared on the listener.
+             logAndRethrowIfError(e, "Failure while notifying onAppeared() listener {}", listener);
+         }
+     }
+ }
+
+ /**
+  * Notifies every registered listener that a node has disappeared from the logical topology.
+  *
+  * <p>A listener failure is logged and does not prevent the remaining listeners from being notified,
+  * unless the failure is an {@link Error}, which is rethrown.
+  *
+  * @param oldNode Node that was removed.
+  * @param snapshot Topology snapshot passed to the listeners together with the node.
+  */
+ private void fireDisappeared(ClusterNode oldNode, LogicalTopologySnapshot snapshot) {
+     for (LogicalTopologyEventListener listener : listeners) {
+         try {
+             listener.onDisappeared(oldNode, snapshot);
+         } catch (Throwable e) {
+             // The message names the callback exactly as it is declared on the listener.
+             logAndRethrowIfError(e, "Failure while notifying onDisappeared() listener {}", listener);
+         }
+     }
+ }
+
+ @Override
+ public void fireTopologyLeap() {
+ for (LogicalTopologyEventListener listener : listeners) {
+ try {
+ listener.onTopologyLeap(readLogicalTopology());
+ } catch (Throwable e) {
+ logAndRethrowIfError(e, "Failure while notifying onTopologyLeap() listener {}", listener);
+ }
+ }
+ }
+
+ /**
+  * Logs a listener notification failure and rethrows it if it is an {@link Error}.
+  *
+  * @param e Failure thrown by the listener.
+  * @param logMessagePattern Log message pattern; the listener is passed as its parameter.
+  * @param listener Listener that failed.
+  */
+ private static void logAndRethrowIfError(Throwable e, String logMessagePattern, LogicalTopologyEventListener listener) {
+     LOG.error(logMessagePattern, e, listener);
+
+     boolean isError = e instanceof Error;
+
+     if (!isError) {
+         // Anything that is not an Error is only logged, so the caller can go on with the next listener.
+         return;
+     }
+
+     throw (Error) e;
+ }
+
+ @Override
+ public void addEventListener(LogicalTopologyEventListener listener) {
+ listeners.add(listener);
}
@Override
- public boolean isNodeInLogicalTopology(ClusterNode node) {
- return readLogicalTopology().containsNodeById(node);
+ public void removeEventListener(LogicalTopologyEventListener listener) {
+ listeners.remove(listener);
}
}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImpl.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImpl.java
new file mode 100644
index 0000000000..58378072d5
--- /dev/null
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologyService;
+import org.apache.ignite.network.LogicalTopologySnapshot;
+
+/**
+ * {@link LogicalTopologyService} implementation that forwards listener registration to a {@link LogicalTopology}
+ * and obtains the topology snapshot from a {@link ClusterManagementGroupManager}.
+ */
+public class LogicalTopologyServiceImpl implements LogicalTopologyService {
+    /** Topology that listeners are registered with and unregistered from. */
+    private final LogicalTopology topology;
+
+    /** Manager whose {@code logicalTopology()} future is returned by {@link #logicalTopologyOnLeader()}. */
+    private final ClusterManagementGroupManager cmgManager;
+
+    /**
+     * Creates the service.
+     *
+     * @param logicalTopology Topology to delegate listener management to.
+     * @param clusterManagementGroupManager Manager used to obtain the logical topology snapshot.
+     */
+    public LogicalTopologyServiceImpl(LogicalTopology logicalTopology, ClusterManagementGroupManager clusterManagementGroupManager) {
+        topology = logicalTopology;
+        cmgManager = clusterManagementGroupManager;
+    }
+
+    /** Registers the listener with the underlying {@link LogicalTopology}. */
+    @Override
+    public void addEventListener(LogicalTopologyEventListener listener) {
+        topology.addEventListener(listener);
+    }
+
+    /** Unregisters the listener from the underlying {@link LogicalTopology}. */
+    @Override
+    public void removeEventListener(LogicalTopologyEventListener listener) {
+        topology.removeEventListener(listener);
+    }
+
+    /** Returns the future produced by {@code ClusterManagementGroupManager#logicalTopology()}. */
+    @Override
+    public CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader() {
+        CompletableFuture<LogicalTopologySnapshot> snapshotFuture = cmgManager.logicalTopology();
+
+        return snapshotFuture;
+    }
+}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
index dd7e6fa5ca..743b96c05d 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
@@ -23,12 +23,17 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import java.io.Serializable;
+import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -38,6 +43,7 @@ import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory;
import org.apache.ignite.internal.cluster.management.raft.commands.ClusterNodeMessage;
import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopology;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Command;
@@ -51,15 +57,13 @@ import org.junit.jupiter.api.Test;
* Tests for the {@link CmgRaftGroupListener}.
*/
public class CmgRaftGroupListenerTest {
- private final ClusterStateStorage storage = new TestClusterStateStorage();
+ private final ClusterStateStorage storage = spy(new TestClusterStateStorage());
private final LongConsumer onLogicalTopologyChanged = mock(LongConsumer.class);
- private final CmgRaftGroupListener listener = new CmgRaftGroupListener(
- storage,
- new LogicalTopologyImpl(storage),
- onLogicalTopologyChanged
- );
+ private final LogicalTopology logicalTopology = spy(new LogicalTopologyImpl(storage));
+
+ private final CmgRaftGroupListener listener = new CmgRaftGroupListener(storage, logicalTopology, onLogicalTopologyChanged);
private final CmgMessagesFactory msgFactory = new CmgMessagesFactory();
@@ -131,6 +135,15 @@ public class CmgRaftGroupListenerTest {
verify(onLogicalTopologyChanged).accept(anyLong());
}
+ @Test
+ void restoreFromSnapshotTriggersTopologyLeapEvent() {
+ doNothing().when(storage).restoreSnapshot(any());
+
+ assertTrue(listener.onSnapshotLoad(Paths.get("/unused")));
+
+ verify(logicalTopology).fireTopologyLeap();
+ }
+
private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj) {
CommandClosure<T> closure = new CommandClosure<>() {
@Override
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java
index 8cd59dcbf3..340b0a8e80 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyImplTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.cluster.management.topology;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -24,12 +25,24 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -38,26 +51,51 @@ import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorag
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologySnapshot;
import org.apache.ignite.network.NetworkAddress;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(WorkDirectoryExtension.class)
+@ExtendWith(MockitoExtension.class)
class LogicalTopologyImplTest {
- private final ClusterStateStorage storage = new TestClusterStateStorage();
+ private final ClusterStateStorage storage = spy(new TestClusterStateStorage());
- private LogicalTopology topologyService;
+ private LogicalTopology topology;
@WorkDirectory
protected Path workDir;
+ @Mock
+ private LogicalTopologyEventListener listener;
+
+ @Captor
+ private ArgumentCaptor<ClusterNode> nodeCaptor;
+
+ @Captor
+ private ArgumentCaptor<ClusterNode> nodeCaptor2;
+
+ @Captor
+ private ArgumentCaptor<LogicalTopologySnapshot> topologyCaptor;
+
+ @Captor
+ private ArgumentCaptor<LogicalTopologySnapshot> topologyCaptor2;
+
@BeforeEach
void setUp() {
storage.start();
- topologyService = new LogicalTopologyImpl(storage);
+ topology = new LogicalTopologyImpl(storage);
+
+ topology.addEventListener(listener);
}
@AfterEach
@@ -70,33 +108,33 @@ class LogicalTopologyImplTest {
*/
@Test
void testLogicalTopology() {
- assertThat(topologyService.getLogicalTopology().nodes(), is(empty()));
+ assertThat(topology.getLogicalTopology().nodes(), is(empty()));
var node1 = new ClusterNode("foo", "bar", new NetworkAddress("localhost", 123));
- topologyService.putLogicalTopologyNode(node1);
+ topology.putNode(node1);
- assertThat(topologyService.getLogicalTopology().nodes(), contains(node1));
+ assertThat(topology.getLogicalTopology().nodes(), contains(node1));
var node2 = new ClusterNode("baz", "quux", new NetworkAddress("localhost", 123));
- topologyService.putLogicalTopologyNode(node2);
+ topology.putNode(node2);
- assertThat(topologyService.getLogicalTopology().nodes(), containsInAnyOrder(node1, node2));
+ assertThat(topology.getLogicalTopology().nodes(), containsInAnyOrder(node1, node2));
var node3 = new ClusterNode("lol", "boop", new NetworkAddress("localhost", 123));
- topologyService.putLogicalTopologyNode(node3);
+ topology.putNode(node3);
- assertThat(topologyService.getLogicalTopology().nodes(), containsInAnyOrder(node1, node2, node3));
+ assertThat(topology.getLogicalTopology().nodes(), containsInAnyOrder(node1, node2, node3));
- topologyService.removeLogicalTopologyNodes(Set.of(node1, node2));
+ topology.removeNodes(Set.of(node1, node2));
- assertThat(topologyService.getLogicalTopology().nodes(), contains(node3));
+ assertThat(topology.getLogicalTopology().nodes(), contains(node3));
- topologyService.removeLogicalTopologyNodes(Set.of(node3));
+ topology.removeNodes(Set.of(node3));
- assertThat(topologyService.getLogicalTopology().nodes(), is(empty()));
+ assertThat(topology.getLogicalTopology().nodes(), is(empty()));
}
/**
@@ -106,24 +144,24 @@ class LogicalTopologyImplTest {
void testLogicalTopologyIdempotence() {
var node = new ClusterNode("foo", "bar", new NetworkAddress("localhost", 123));
- topologyService.putLogicalTopologyNode(node);
- topologyService.putLogicalTopologyNode(node);
+ topology.putNode(node);
+ topology.putNode(node);
- assertThat(topologyService.getLogicalTopology().nodes(), contains(node));
+ assertThat(topology.getLogicalTopology().nodes(), contains(node));
- topologyService.removeLogicalTopologyNodes(Set.of(node));
- topologyService.removeLogicalTopologyNodes(Set.of(node));
+ topology.removeNodes(Set.of(node));
+ topology.removeNodes(Set.of(node));
- assertThat(topologyService.getLogicalTopology().nodes(), is(empty()));
+ assertThat(topology.getLogicalTopology().nodes(), is(empty()));
}
@Test
- void logicalTopologyAdditionUsesNameAsNodeKey() {
- topologyService.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+ void additionUsesNameAsNodeKey() {
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
- topologyService.putLogicalTopologyNode(new ClusterNode("id2", "node", new NetworkAddress("host", 1000)));
+ topology.putNode(new ClusterNode("id2", "node", new NetworkAddress("host", 1000)));
- Collection<ClusterNode> topology = topologyService.getLogicalTopology().nodes();
+ Collection<ClusterNode> topology = this.topology.getLogicalTopology().nodes();
assertThat(topology, hasSize(1));
@@ -131,25 +169,25 @@ class LogicalTopologyImplTest {
}
@Test
- void logicalTopologyRemovalUsesIdAsNodeKey() {
- topologyService.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+ void removalUsesIdAsNodeKey() {
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
- topologyService.removeLogicalTopologyNodes(Set.of(new ClusterNode("id2", "node", new NetworkAddress("host", 1000))));
+ topology.removeNodes(Set.of(new ClusterNode("id2", "node", new NetworkAddress("host", 1000))));
- assertThat(topologyService.getLogicalTopology().nodes(), hasSize(1));
- assertThat((topologyService.getLogicalTopology().nodes()).iterator().next().id(), is("id1"));
+ assertThat(topology.getLogicalTopology().nodes(), hasSize(1));
+ assertThat((topology.getLogicalTopology().nodes()).iterator().next().id(), is("id1"));
- topologyService.removeLogicalTopologyNodes(Set.of(new ClusterNode("id1", "another-name", new NetworkAddress("host", 1000))));
+ topology.removeNodes(Set.of(new ClusterNode("id1", "another-name", new NetworkAddress("host", 1000))));
- assertThat(topologyService.getLogicalTopology().nodes(), is(empty()));
+ assertThat(topology.getLogicalTopology().nodes(), is(empty()));
}
@Test
void inLogicalTopologyTestUsesIdAsNodeKey() {
- topologyService.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
- assertTrue(topologyService.isNodeInLogicalTopology(new ClusterNode("id1", "node", new NetworkAddress("host", 1000))));
- assertFalse(topologyService.isNodeInLogicalTopology(new ClusterNode("another-id", "node", new NetworkAddress("host", 1000))));
+ assertTrue(topology.isNodeInLogicalTopology(new ClusterNode("id1", "node", new NetworkAddress("host", 1000))));
+ assertFalse(topology.isNodeInLogicalTopology(new ClusterNode("another-id", "node", new NetworkAddress("host", 1000))));
}
@Test
@@ -157,17 +195,243 @@ class LogicalTopologyImplTest {
Path snapshotDir = workDir.resolve("snapshot");
Files.createDirectory(snapshotDir);
- topologyService.putLogicalTopologyNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
storage.snapshot(snapshotDir).get(10, TimeUnit.SECONDS);
- topologyService.putLogicalTopologyNode(new ClusterNode("id2", "another-node", new NetworkAddress("host", 1001)));
+ topology.putNode(new ClusterNode("id2", "another-node", new NetworkAddress("host", 1001)));
storage.restoreSnapshot(snapshotDir);
- List<String> namesInTopology = topologyService.getLogicalTopology().nodes().stream()
+ List<String> namesInTopology = topology.getLogicalTopology().nodes().stream()
.map(ClusterNode::name)
.collect(toList());
assertThat(namesInTopology, contains("node"));
}
+
+ @Test
+ void addingNewNodeProducesAppearedEvent() {
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ verify(listener).onAppeared(nodeCaptor.capture(), topologyCaptor.capture());
+
+ assertThat(topologyCaptor.getValue().version(), is(1L));
+
+ ClusterNode appearedNode = nodeCaptor.getValue();
+
+ assertThat(appearedNode.id(), is("id1"));
+ assertThat(appearedNode.name(), is("node"));
+ }
+
+ @Test
+ void addingSameExistingNodeProducesNoEvents() {
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ verify(listener, times(1)).onAppeared(any(), any());
+ }
+
+ @Test
+ void updatingExistingNodeProducesDisappearedAndAppearedEvents() {
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ topology.putNode(new ClusterNode("id2", "node", new NetworkAddress("host1", 1001)));
+
+ InOrder inOrder = inOrder(listener);
+
+ inOrder.verify(listener).onDisappeared(nodeCaptor.capture(), topologyCaptor.capture());
+ inOrder.verify(listener).onAppeared(nodeCaptor2.capture(), topologyCaptor2.capture());
+
+ assertThat(topologyCaptor.getValue().version(), is(2L));
+ assertThat(topologyCaptor2.getValue().version(), is(3L));
+
+ ClusterNode disappearedNode = nodeCaptor.getValue();
+
+ assertThat(disappearedNode.id(), is("id1"));
+ assertThat(disappearedNode.name(), is("node"));
+ assertThat(disappearedNode.address(), is(new NetworkAddress("host", 1000)));
+
+ ClusterNode appearedNode = nodeCaptor2.getValue();
+
+ assertThat(appearedNode.id(), is("id2"));
+ assertThat(appearedNode.name(), is("node"));
+ assertThat(appearedNode.address(), is(new NetworkAddress("host1", 1001)));
+ }
+
+ @Test
+ void updatingExistingNodeProducesExactlyOneWriteToDb() {
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ topology.putNode(new ClusterNode("id2", "node", new NetworkAddress("host1", 1001)));
+
+ // Expecting 2 writes because there are two puts. The test verifies that second put also produces just 1 write.
+ verify(storage, times(2)).put(eq("logical".getBytes(UTF_8)), any());
+ }
+
+ @Test
+ void removingExistingNodeProducesDisappearedEvent() {
+ ClusterNode node = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+ topology.putNode(node);
+
+ topology.removeNodes(Set.of(node));
+
+ verify(listener).onDisappeared(nodeCaptor.capture(), topologyCaptor.capture());
+
+ assertThat(topologyCaptor.getValue().version(), is(2L));
+
+ ClusterNode disappearedNode = nodeCaptor.getValue();
+
+ assertThat(disappearedNode.id(), is("id1"));
+ assertThat(disappearedNode.name(), is("node"));
+ }
+
+ @Test
+ void removingNonExistingNodeProducesNoEvents() {
+ ClusterNode node = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+
+ topology.removeNodes(Set.of(node));
+
+ verify(listener, never()).onDisappeared(any(), any());
+ }
+
+ @Test
+ void multiRemovalProducesDisappearedEventsInOrderOfNodeIds() {
+ ClusterNode node1 = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+ ClusterNode node2 = new ClusterNode("id2", "node2", new NetworkAddress("host2", 1000));
+
+ topology.putNode(node1);
+ topology.putNode(node2);
+
+ topology.removeNodes(new LinkedHashSet<>(List.of(node2, node1)));
+
+ verify(listener, times(2)).onDisappeared(nodeCaptor.capture(), topologyCaptor.capture());
+
+ List<LogicalTopologySnapshot> capturedSnapshots = topologyCaptor.getAllValues();
+
+ assertThat(capturedSnapshots.get(0).version(), is(3L));
+ assertThat(capturedSnapshots.get(1).version(), is(4L));
+
+ ClusterNode disappearedNode1 = nodeCaptor.getAllValues().get(0);
+ ClusterNode disappearedNode2 = nodeCaptor.getAllValues().get(1);
+
+ assertAll(
+ () -> assertThat(disappearedNode1.id(), is("id1")),
+ () -> assertThat(disappearedNode2.id(), is("id2"))
+ );
+ }
+
+ @Test
+ void multiRemovalProducesExactlyOneWriteToDb() {
+ ClusterNode node1 = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+ ClusterNode node2 = new ClusterNode("id2", "node2", new NetworkAddress("host2", 1000));
+
+ topology.putNode(node1);
+ topology.putNode(node2);
+
+ topology.removeNodes(Set.of(node1, node2));
+
+ // Expecting 3 writes because there are two puts and one removal. The test verifies that the removal produces just 1 write.
+ verify(storage, times(3)).put(eq("logical".getBytes(UTF_8)), any());
+ }
+
+ @Test
+ void onTopologyLeapIsTriggeredOnSnapshotRestore() {
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ topology.fireTopologyLeap();
+
+ verify(listener).onTopologyLeap(topologyCaptor.capture());
+
+ LogicalTopologySnapshot capturedSnapshot = topologyCaptor.getValue();
+ assertThat(capturedSnapshot.version(), is(1L));
+ assertThat(capturedSnapshot.nodes(), hasSize(1));
+ assertThat(capturedSnapshot.nodes().iterator().next().id(), is("id1"));
+ }
+
+ @Test
+ void onAppearedListenersExceptionsDoNotBreakNotification() {
+ LogicalTopologyEventListener secondListener = mock(LogicalTopologyEventListener.class);
+
+ doThrow(new RuntimeException("Oops")).when(listener).onAppeared(any(), any());
+
+ topology.addEventListener(secondListener);
+
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ verify(listener).onAppeared(any(), any());
+ verify(secondListener).onAppeared(any(), any());
+ }
+
+ @Test
+ void onAppearedListenerErrorIsRethrown() {
+ doThrow(new TestError()).when(listener).onAppeared(any(), any());
+
+ assertThrows(
+ TestError.class,
+ () -> topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)))
+ );
+ }
+
+ @Test
+ void onDisappearedListenersExceptionsDoNotBreakNotification() {
+ LogicalTopologyEventListener secondListener = mock(LogicalTopologyEventListener.class);
+
+ doThrow(new RuntimeException("Oops")).when(listener).onDisappeared(any(), any());
+
+ topology.addEventListener(secondListener);
+
+ ClusterNode node = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+
+ topology.putNode(node);
+ topology.removeNodes(Set.of(node));
+
+ verify(listener).onDisappeared(any(), any());
+ verify(secondListener).onDisappeared(any(), any());
+ }
+
+ @Test
+ void onDisappearedListenerErrorIsRethrown() {
+ doThrow(new TestError()).when(listener).onDisappeared(any(), any());
+
+ ClusterNode node = new ClusterNode("id1", "node", new NetworkAddress("host", 1000));
+
+ topology.putNode(node);
+
+ assertThrows(TestError.class, () -> topology.removeNodes(Set.of(node)));
+ }
+
+ @Test
+ void onTopologyLeapListenersExceptionsDoNotBreakNotification() {
+ LogicalTopologyEventListener secondListener = mock(LogicalTopologyEventListener.class);
+
+ doThrow(new RuntimeException("Oops")).when(listener).onTopologyLeap(any());
+
+ topology.addEventListener(secondListener);
+
+ topology.fireTopologyLeap();
+
+ verify(listener).onTopologyLeap(any());
+ verify(secondListener).onTopologyLeap(any());
+ }
+
+ @Test
+ void onTopologyLeapListenerErrorIsRethrown() {
+ doThrow(new TestError()).when(listener).onTopologyLeap(any());
+
+ assertThrows(TestError.class, () -> topology.fireTopologyLeap());
+ }
+
+ @Test
+ void eventListenerStopsGettingEventsAfterListenerRemoval() {
+ topology.removeEventListener(listener);
+
+ topology.putNode(new ClusterNode("id1", "node", new NetworkAddress("host", 1000)));
+
+ verify(listener, never()).onAppeared(any(), any());
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestError extends Error {
+ }
}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImplTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImplTest.java
new file mode 100644
index 0000000000..b30a3dc240
--- /dev/null
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/topology/LogicalTopologyServiceImplTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.topology;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologySnapshot;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class LogicalTopologyServiceImplTest {
+ @Mock
+ private LogicalTopology logicalTopology;
+
+ @Mock
+ private ClusterManagementGroupManager cmgManager;
+
+ @InjectMocks
+ private LogicalTopologyServiceImpl logicalTopologyService;
+
+ @Mock
+ private LogicalTopologyEventListener listener;
+
+ @Test
+ void delegatesAddEventListener() {
+ logicalTopologyService.addEventListener(listener);
+
+ verify(logicalTopology).addEventListener(listener);
+ }
+
+ @Test
+ void delegatesRemoveEventListener() {
+ logicalTopologyService.removeEventListener(listener);
+
+ verify(logicalTopology).removeEventListener(listener);
+ }
+
+ @Test
+ void delegatesLogicalTopologyOnLeader() {
+ CompletableFuture<LogicalTopologySnapshot> snapshotFuture = new CompletableFuture<>();
+
+ doReturn(snapshotFuture).when(cmgManager).logicalTopology();
+
+ assertThat(logicalTopologyService.logicalTopologyOnLeader(), is(snapshotFuture));
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
index d701d13b0d..74dacefbe0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -20,13 +20,18 @@ package org.apache.ignite.internal;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
-import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
@@ -47,6 +52,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
/**
* Abstract integration test that starts and stops a cluster.
*/
+@SuppressWarnings("ALL")
@ExtendWith(WorkDirectoryExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractTest {
@@ -79,57 +85,113 @@ public abstract class AbstractClusterIntegrationTest extends BaseIgniteAbstractT
*/
@BeforeEach
void startNodes(TestInfo testInfo) {
- String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
-
- List<CompletableFuture<Ignite>> futures = IntStream.range(0, nodes())
- .mapToObj(i -> {
- String nodeName = testNodeName(testInfo, i);
-
- String config = IgniteStringFormatter.format(NODE_BOOTSTRAP_CFG, BASE_PORT + i, connectNodeAddr);
-
- return IgnitionManager.start(nodeName, config, WORK_DIR.resolve(nodeName));
- })
+ List<CompletableFuture<Ignite>> futures = IntStream.range(0, initialNodes())
+ .mapToObj(i -> startNode0(i, testInfo))
.collect(toList());
- String metaStorageNodeName = testNodeName(testInfo, nodes() - 1);
+ String metaStorageNodeName = testNodeName(testInfo, initialNodes() - 1);
IgnitionManager.init(metaStorageNodeName, List.of(metaStorageNodeName), "cluster");
for (CompletableFuture<Ignite> future : futures) {
- assertThat(future, willCompleteSuccessfully());
+ assertThat(future, willSucceedIn(10, TimeUnit.SECONDS));
clusterNodes.add(future.join());
}
}
+    /**
+     * Initiates the start of a node with the given index without waiting for the start to complete.
+     *
+     * @param nodeIndex Zero-based index; it selects the node name and is added to {@code BASE_PORT} for the config.
+     * @param testInfo Test info (used to build node name).
+     * @return Future returned by {@code IgnitionManager.start()} for the node.
+     */
+    private static CompletableFuture<Ignite> startNode0(int nodeIndex, TestInfo testInfo) {
+        String name = testNodeName(testInfo, nodeIndex);
+
+        // The last template argument is the same quoted localhost address (built from BASE_PORT) for every node.
+        String bootstrapConfig = IgniteStringFormatter.format(
+                NODE_BOOTSTRAP_CFG,
+                BASE_PORT + nodeIndex,
+                "\"localhost:" + BASE_PORT + '\"'
+        );
+
+        return IgnitionManager.start(name, bootstrapConfig, WORK_DIR.resolve(name));
+    }
+
/**
- * Get a count of nodes in the Ignite cluster.
+ * Starts an Ignite node with the given index.
*
- * @return Count of nodes.
+ * @param nodeIndex Zero-based index (used to build node name).
+ * @param testInfo Test info (used to build node name).
+ * @return Started Ignite node.
*/
- protected int nodes() {
+    protected Ignite startNode(int nodeIndex, TestInfo testInfo) {
+        // Validate the index BEFORE starting anything: a node started with an index beyond the end of clusterNodes
+        // cannot be recorded there, so stopNodes() would never stop it and it would leak into subsequent tests.
+        if (nodeIndex > clusterNodes.size()) {
+            throw new IllegalArgumentException("Cannot start node with index " + nodeIndex + " because we only have "
+                    + clusterNodes.size() + " nodes");
+        }
+
+        CompletableFuture<Ignite> future = startNode0(nodeIndex, testInfo);
+
+        assertThat(future, willSucceedIn(10, TimeUnit.SECONDS));
+
+        Ignite ignite = future.join();
+
+        if (nodeIndex < clusterNodes.size()) {
+            // Re-use the slot of a previously started (and possibly stopped) node.
+            clusterNodes.set(nodeIndex, ignite);
+        } else {
+            // nodeIndex == clusterNodes.size(): the node extends the cluster by one.
+            clusterNodes.add(ignite);
+        }
+
+        return ignite;
+    }
+
+ /**
+ * Returns count of nodes in the Ignite cluster started before each test.
+ *
+ * @return Count of nodes in initial cluster.
+ */
+ protected int initialNodes() {
return 3;
}
/**
- * After all.
+ * Stops all nodes that are not stopped yet.
*/
@AfterEach
void stopNodes(TestInfo testInfo) throws Exception {
LOG.info("Start tearDown()");
- clusterNodes.clear();
-
- List<AutoCloseable> closeables = IntStream.range(0, nodes())
- .mapToObj(i -> testNodeName(testInfo, i))
- .map(nodeName -> (AutoCloseable) () -> IgnitionManager.stop(nodeName))
+ List<AutoCloseable> closeables = clusterNodes.stream()
+ .filter(Objects::nonNull)
+ .map(node -> (AutoCloseable) () -> IgnitionManager.stop(node.name()))
.collect(toList());
IgniteUtils.closeAll(closeables);
+ clusterNodes.clear();
+
LOG.info("End tearDown()");
}
+    /**
+     * Stops the node that occupies the given index.
+     *
+     * <p>The index must refer to a node that is currently tracked and not yet stopped; otherwise an assertion fails.
+     *
+     * @param nodeIndex Node index.
+     * @param testInfo Test info (used to construct node name).
+     */
+    protected final void stopNode(int nodeIndex, TestInfo testInfo) {
+        int trackedNodes = clusterNodes.size();
+        assertThat(trackedNodes, is(greaterThan(nodeIndex)));
+
+        assertThat(clusterNodes.get(nodeIndex), is(notNullValue()));
+
+        String nodeName = testNodeName(testInfo, nodeIndex);
+        IgnitionManager.stop(nodeName);
+
+        // Keep the slot but empty it: stopNodes() skips null entries and startNode() can fill the slot again.
+        clusterNodes.set(nodeIndex, null);
+    }
+
+    /**
+     * Stops the node with the given index and then starts a node with the same index again.
+     *
+     * @param nodeIndex Node index.
+     * @param testInfo Test info (used to construct node name).
+     * @return Node instance produced by the second start.
+     */
+    protected final Ignite restartNode(int nodeIndex, TestInfo testInfo) {
+        stopNode(nodeIndex, testInfo);
+
+        Ignite restarted = startNode(nodeIndex, testInfo);
+
+        return restarted;
+    }
+
/**
* Invokes before the test will start.
*
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
index fc1a7d6da6..09b6c53289 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -248,7 +248,7 @@ class ItComputeTest extends AbstractClusterIntegrationTest {
}
private List<String> allNodeNames() {
- return IntStream.range(0, nodes())
+ return IntStream.range(0, initialNodes())
.mapToObj(this::node)
.map(Ignite::name)
.collect(toList());
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
new file mode 100644
index 0000000000..1993f8f908
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.AbstractClusterIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.LogicalTopologyEventListener;
+import org.apache.ignite.network.LogicalTopologySnapshot;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for functionality of logical topology events subscription.
+ *
+ * <p>Every test starts from a single-node cluster (see {@link #initialNodes()}), subscribes a recording listener on
+ * node 0 and checks the events caused by starting, stopping or restarting node 1.
+ */
+@SuppressWarnings("resource")
+class ItLogicalTopologyTest extends AbstractClusterIntegrationTest {
+    /** Events received by the listener, in the order in which they were added. */
+    private final List<Event> events = new CopyOnWriteArrayList<>();
+
+    /** Listener that records each appearance/disappearance together with the version of the new topology. */
+    private final LogicalTopologyEventListener listener = new LogicalTopologyEventListener() {
+        @Override
+        public void onAppeared(ClusterNode appearedNode, LogicalTopologySnapshot newTopology) {
+            events.add(new Event(true, appearedNode, newTopology.version()));
+        }
+
+        @Override
+        public void onDisappeared(ClusterNode disappearedNode, LogicalTopologySnapshot newTopology) {
+            events.add(new Event(false, disappearedNode, newTopology.version()));
+        }
+    };
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Test
+    void receivesLogicalTopologyEvents(TestInfo testInfo) throws Exception {
+        // The base class uses the PER_CLASS lifecycle, so this instance (and the events list) is shared between
+        // test methods; start from a clean list to keep the absolute sizes/indices below independent of test order.
+        events.clear();
+
+        IgniteImpl entryNode = node(0);
+
+        entryNode.logicalTopologyService().addEventListener(listener);
+
+        // Checking that onAppeared() is received.
+        Ignite secondIgnite = startNode(1, testInfo);
+
+        assertTrue(waitForCondition(() -> !events.isEmpty(), 10_000));
+
+        assertThat(events, hasSize(1));
+
+        Event firstEvent = events.get(0);
+
+        assertTrue(firstEvent.appeared);
+        assertThat(firstEvent.node.name(), is(secondIgnite.name()));
+        assertThat(firstEvent.topologyVersion, is(2L));
+
+        // Checking that onDisappeared() is received.
+        stopNode(1, testInfo);
+
+        assertTrue(waitForCondition(() -> events.size() > 1, 10_000));
+
+        assertThat(events, hasSize(2));
+
+        Event secondEvent = events.get(1);
+
+        assertFalse(secondEvent.appeared);
+        assertThat(secondEvent.node.name(), is(secondIgnite.name()));
+        assertThat(secondEvent.topologyVersion, is(3L));
+    }
+
+    @Test
+    void receivesLogicalTopologyEventsFromRestart(TestInfo testInfo) throws Exception {
+        // The base class uses the PER_CLASS lifecycle, so this instance (and the events list) is shared between
+        // test methods; start from a clean list to keep the absolute sizes/indices below independent of test order.
+        events.clear();
+
+        IgniteImpl entryNode = node(0);
+
+        Ignite secondIgnite = startNode(1, testInfo);
+
+        // Subscribe only after the second node has been started: its initial join is not supposed to be recorded,
+        // so the restart alone accounts for every event in the list.
+        entryNode.logicalTopologyService().addEventListener(listener);
+
+        restartNode(1, testInfo);
+
+        // A restart is a stop followed by a start, hence exactly two events: the leave, then the join.
+        assertTrue(waitForCondition(() -> events.size() >= 2, 10_000));
+
+        assertThat(events, hasSize(2));
+
+        Event leaveEvent = events.get(0);
+
+        assertFalse(leaveEvent.appeared);
+        assertThat(leaveEvent.node.name(), is(secondIgnite.name()));
+        assertThat(leaveEvent.topologyVersion, is(3L));
+
+        Event joinEvent = events.get(1);
+
+        assertTrue(joinEvent.appeared);
+        assertThat(joinEvent.node.name(), is(secondIgnite.name()));
+        assertThat(joinEvent.topologyVersion, is(4L));
+    }
+
+    /** Immutable record of a single listener notification. */
+    private static class Event {
+        /** {@code true} for an onAppeared() notification, {@code false} for onDisappeared(). */
+        private final boolean appeared;
+
+        /** Node the notification was about. */
+        private final ClusterNode node;
+
+        /** Version of the topology snapshot passed along with the notification. */
+        private final long topologyVersion;
+
+        private Event(boolean appeared, ClusterNode node, long topologyVersion) {
+            this.appeared = appeared;
+            this.node = node;
+            this.topologyVersion = topologyVersion;
+        }
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
index 9a2d59cd3c..bde357dd6f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/inmemory/ItRaftStorageVolatilityTest.java
@@ -62,7 +62,7 @@ class ItRaftStorageVolatilityTest extends AbstractClusterIntegrationTest {
private static final String TABLE_NAME = "test";
@Override
- protected int nodes() {
+ protected int initialNodes() {
return 1;
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 58b4d7f4a3..68c83ee438 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
import org.apache.ignite.internal.cluster.management.rest.ClusterManagementRestFactory;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.component.RestAddressReporter;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.compute.ComputeComponent;
@@ -112,6 +113,7 @@ import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.LogicalTopologyService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkAddress;
@@ -209,6 +211,8 @@ public class IgniteImpl implements Ignite {
private final ClusterManagementGroupManager cmgMgr;
+ private final LogicalTopologyService logicalTopologyService;
+
/** Client handler module. */
private final ClientHandlerModule clientHandlerModule;
@@ -322,16 +326,18 @@ public class IgniteImpl implements Ignite {
// TODO: IGNITE-16841 - use common RocksDB instance to store cluster state as well.
clusterStateStorage = new RocksDbClusterStateStorage(workDir.resolve(CMG_DB_PATH));
- var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage);
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
cmgMgr = new ClusterManagementGroupManager(
vaultMgr,
clusterSvc,
raftMgr,
clusterStateStorage,
- logicalTopologyService
+ logicalTopology
);
+ logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgMgr);
+
metaStorageMgr = new MetaStorageManager(
vaultMgr,
clusterSvc,
@@ -809,4 +815,9 @@ public class IgniteImpl implements Ignite {
public DistributionZoneManager distributionZoneManager() {
return distributionZoneManager;
}
+
+ /**
+ * Returns the logical topology service held by this instance. Intended for use in tests only.
+ *
+ * @return Logical topology service.
+ */
+ @TestOnly
+ public LogicalTopologyService logicalTopologyService() {
+ return logicalTopologyService;
+ }
}