You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sa...@apache.org on 2023/02/14 12:43:23 UTC
[ignite-3] branch main updated: IGNITE-18554 Remove MetaStorage learners on topology events (#1542)
This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new dc56fe75cf IGNITE-18554 Remove MetaStorage learners on topology events (#1542)
dc56fe75cf is described below
commit dc56fe75cf4c29b608caa40b74cd249afe29fb9d
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Tue Feb 14 14:43:17 2023 +0200
IGNITE-18554 Remove MetaStorage learners on topology events (#1542)
---
.../cluster/management/ItClusterManagerTest.java | 2 +
.../management/ClusterManagementGroupManager.java | 23 +-
.../management/raft/CmgRaftGroupListener.java | 8 +-
.../cluster/management/raft/ValidationManager.java | 34 +-
modules/metastorage/build.gradle | 2 +
.../impl/ItMetaStorageManagerImplTest.java | 13 +-
.../impl/ItMetaStorageMultipleNodesTest.java | 345 +++++++++++++++++++++
.../metastorage/impl/ItMetaStorageServiceTest.java | 35 ++-
.../metastorage/impl/ItMetaStorageWatchTest.java | 96 +++---
.../internal/metastorage/impl/EntryImpl.java | 3 +
.../metastorage/impl/MetaStorageManagerImpl.java | 133 +-------
.../impl/MetaStorageRaftGroupEventsListener.java | 230 ++++++++++++++
.../raft/client/TopologyAwareRaftGroupService.java | 4 +-
.../internal/raft/RaftGroupEventsListener.java | 18 +-
.../internal/raft/service/RaftGroupService.java | 8 +-
.../ignite/internal/raft/RaftGroupServiceImpl.java | 4 +-
.../ItDistributedConfigurationPropertiesTest.java | 6 +-
.../ItDistributedConfigurationStorageTest.java | 6 +-
.../storage/ItRebalanceDistributedTest.java | 6 +-
.../runner/app/ItIgniteNodeRestartTest.java | 6 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
21 files changed, 743 insertions(+), 240 deletions(-)
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index aa252d828a..e295b7af8c 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -139,6 +140,7 @@ public class ItClusterManagerTest extends BaseItClusterManagementTest {
/**
* Tests a scenario when a node is restarted.
*/
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18795")
@Test
void testNodeRestart(TestInfo testInfo) throws Exception {
startCluster(2, testInfo);
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 32b827434d..d3f58bc4e8 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -58,10 +58,8 @@ import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
-import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.Status;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -522,7 +520,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
new RaftNodeId(CmgGroupId.INSTANCE, serverPeer),
raftConfiguration,
new CmgRaftGroupListener(clusterStateStorage, logicalTopology, this::onLogicalTopologyChanged),
- createCmgRaftGroupEventsListener()
+ this::onElectedAsLeader
)
.thenApply(service -> new CmgRaftService(service, clusterService, logicalTopology));
} catch (Exception e) {
@@ -549,25 +547,6 @@ public class ClusterManagementGroupManager implements IgniteComponent {
}
}
- private RaftGroupEventsListener createCmgRaftGroupEventsListener() {
- return new RaftGroupEventsListener() {
- @Override
- public void onLeaderElected(long term) {
- ClusterManagementGroupManager.this.onElectedAsLeader(term);
- }
-
- @Override
- public void onNewPeersConfigurationApplied(PeersAndLearners configuration) {
- // No-op.
- }
-
- @Override
- public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {
- // No-op.
- }
- };
- }
-
/**
* Starts the CMG Raft service using the given {@code state} and persists it to the local storage.
*/
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 3500713e15..945625b555 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -185,11 +185,13 @@ public class CmgRaftGroupListener implements RaftGroupListener {
ClusterNode node = command.node().asClusterNode();
if (validationManager.isNodeValidated(node)) {
- logicalTopology.putNode(node);
+ validationManager.completeValidation(node);
- LOG.info("Node added to the logical topology [node={}]", node.name());
+ logicalTopology.putNode(node);
- validationManager.completeValidation(node);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Node added to the logical topology [node={}]", node.name());
+ }
return null;
} else {
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
index 3fbc42e1a3..8b822c437e 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
@@ -17,10 +17,11 @@
package org.apache.ignite.internal.cluster.management.raft;
+import static java.util.stream.Collectors.toSet;
+
import java.util.Collection;
+import java.util.Comparator;
import java.util.Set;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.ClusterTag;
import org.apache.ignite.internal.cluster.management.raft.commands.InitCmgStateCommand;
@@ -90,7 +91,7 @@ class ValidationManager {
}
/**
- * Validates a given node and issues a validation token.
+ * Validates a given node and saves it in the set of validated nodes.
*
* @return {@code null} in case of successful validation or a {@link ValidationErrorResponse} otherwise.
*/
@@ -134,16 +135,17 @@ class ValidationManager {
void removeValidatedNodes(Collection<ClusterNode> nodes) {
Set<String> validatedNodeIds = storage.getValidatedNodes().stream()
.map(ClusterNode::id)
- // Using a sorted collection to have a stable notification order.
- .collect(Collectors.toCollection(TreeSet::new));
+ .collect(toSet());
- nodes.forEach(node -> {
- if (validatedNodeIds.contains(node.id())) {
- storage.removeValidatedNode(node);
+ // Using a sorted stream to have a stable notification order.
+ nodes.stream()
+ .filter(node -> validatedNodeIds.contains(node.id()))
+ .sorted(Comparator.comparing(ClusterNode::id))
+ .forEach(node -> {
+ storage.removeValidatedNode(node);
- logicalTopology.onNodeInvalidated(node);
- }
- });
+ logicalTopology.onNodeInvalidated(node);
+ });
}
/**
@@ -152,6 +154,16 @@ class ValidationManager {
* @param node Node that wishes to join the logical topology.
*/
void completeValidation(ClusterNode node) {
+ // Remove all other versions of this node, if they were validated at some point, but not removed from the physical topology.
+ storage.getValidatedNodes().stream()
+ .filter(n -> n.name().equals(node.name()) && !n.id().equals(node.id()))
+ .sorted(Comparator.comparing(ClusterNode::id))
+ .forEach(nodeVersion -> {
+ storage.removeValidatedNode(nodeVersion);
+
+ logicalTopology.onNodeInvalidated(nodeVersion);
+ });
+
storage.removeValidatedNode(node);
}
}
diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle
index 7202db5c39..197562dae0 100644
--- a/modules/metastorage/build.gradle
+++ b/modules/metastorage/build.gradle
@@ -52,6 +52,8 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-raft'))
integrationTestImplementation testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(':ignite-vault'))
+ integrationTestImplementation testFixtures(project(':ignite-metastorage'))
+ integrationTestImplementation testFixtures(project(':ignite-cluster-management'))
testFixturesImplementation project(':ignite-core')
testFixturesImplementation project(':ignite-rocksdb-common')
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index 9941d8b5cb..c2411ee654 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -33,12 +33,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -52,8 +52,7 @@ import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
@@ -72,9 +71,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
/**
* Integration tests for {@link MetaStorageManagerImpl}.
*/
-@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
-public class ItMetaStorageManagerImplTest {
+public class ItMetaStorageManagerImplTest extends IgniteAbstractTest {
private VaultManager vaultManager;
private ClusterService clusterService;
@@ -86,8 +84,7 @@ public class ItMetaStorageManagerImplTest {
private MetaStorageManagerImpl metaStorageManager;
@BeforeEach
- void setUp(TestInfo testInfo, @WorkDirectory Path workDir, @InjectConfiguration RaftConfiguration raftConfiguration)
- throws NodeStoppingException {
+ void setUp(TestInfo testInfo, @InjectConfiguration RaftConfiguration raftConfiguration) throws NodeStoppingException {
var addr = new NetworkAddress("localhost", 10_000);
clusterService = clusterService(testInfo, addr.port(), new StaticNodeFinder(List.of(addr)));
@@ -107,6 +104,7 @@ public class ItMetaStorageManagerImplTest {
vaultManager,
clusterService,
cmgManager,
+ mock(LogicalTopologyService.class),
raftManager,
storage
);
@@ -266,6 +264,7 @@ public class ItMetaStorageManagerImplTest {
vaultManager,
clusterService,
cmgManager,
+ mock(LogicalTopologyService.class),
raftManager,
storage
);
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
new file mode 100644
index 0000000000..f7e1787f0c
--- /dev/null
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
+import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftManager;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.DefaultMessagingService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.utils.ClusterServiceTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests for scenarios when Meta Storage nodes join and leave a cluster.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class ItMetaStorageMultipleNodesTest extends IgniteAbstractTest {
+ @InjectConfiguration
+ private static RaftConfiguration raftConfiguration;
+
+ @InjectConfiguration("mock.failoverTimeout=0")
+ private static ClusterManagementConfiguration cmgConfiguration;
+
+ private static class Node {
+ private final VaultManager vaultManager;
+
+ private final ClusterService clusterService;
+
+ private final RaftManager raftManager;
+
+ private final ClusterStateStorage clusterStateStorage = new TestClusterStateStorage();
+
+ private final ClusterManagementGroupManager cmgManager;
+
+ private final MetaStorageManagerImpl metaStorageManager;
+
+ Node(ClusterService clusterService, Path dataPath) {
+ this.clusterService = clusterService;
+
+ this.vaultManager = new VaultManager(new InMemoryVaultService());
+
+ Path basePath = dataPath.resolve(name());
+
+ this.raftManager = new Loza(
+ clusterService,
+ raftConfiguration,
+ basePath.resolve("raft"),
+ new HybridClockImpl()
+ );
+
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+ this.cmgManager = new ClusterManagementGroupManager(
+ vaultManager,
+ clusterService,
+ raftManager,
+ clusterStateStorage,
+ logicalTopology,
+ cmgConfiguration
+ );
+
+ this.metaStorageManager = new MetaStorageManagerImpl(
+ vaultManager,
+ clusterService,
+ cmgManager,
+ new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
+ raftManager,
+ new SimpleInMemoryKeyValueStorage(name())
+ );
+ }
+
+ void start() throws NodeStoppingException {
+ List<IgniteComponent> components =
+ List.of(vaultManager, clusterService, raftManager, clusterStateStorage, cmgManager, metaStorageManager);
+
+ components.forEach(IgniteComponent::start);
+
+ metaStorageManager.deployWatches();
+ }
+
+ String name() {
+ return clusterService.localConfiguration().getName();
+ }
+
+ void stop() throws Exception {
+ List<IgniteComponent> components =
+ List.of(metaStorageManager, cmgManager, raftManager, clusterStateStorage, clusterService, vaultManager);
+
+ Stream<AutoCloseable> beforeNodeStop = components.stream().map(c -> c::beforeNodeStop);
+
+ Stream<AutoCloseable> nodeStop = components.stream().map(c -> c::stop);
+
+ IgniteUtils.closeAll(Stream.concat(beforeNodeStop, nodeStop));
+ }
+
+ CompletableFuture<Set<String>> getMetaStorageLearners() {
+ return metaStorageManager
+ .metaStorageServiceFuture()
+ .thenApply(MetaStorageServiceImpl::raftGroupService)
+ .thenCompose(service -> service.refreshMembers(false).thenApply(v -> service.learners()))
+ .thenApply(learners -> learners.stream().map(Peer::consistentId).collect(toSet()));
+ }
+
+ void startDroppingMessagesTo(Node recipient, Class<? extends NetworkMessage> msgType) {
+ ((DefaultMessagingService) clusterService.messagingService())
+ .dropMessages((recipientConsistentId, message) ->
+ recipient.name().equals(recipientConsistentId) && msgType.isInstance(message));
+ }
+
+ void stopDroppingMessages() {
+ ((DefaultMessagingService) clusterService.messagingService()).stopDroppingMessages();
+ }
+ }
+
+ private final List<Node> nodes = new ArrayList<>();
+
+ private Node startNode(TestInfo testInfo) throws NodeStoppingException {
+ var nodeFinder = new StaticNodeFinder(List.of(new NetworkAddress("localhost", 10_000)));
+
+ ClusterService clusterService = ClusterServiceTestUtils.clusterService(testInfo, 10_000 + nodes.size(), nodeFinder);
+
+ var node = new Node(clusterService, workDir);
+
+ node.start();
+
+ nodes.add(node);
+
+ return node;
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(nodes.stream().map(node -> node::stop));
+ }
+
+ /**
+ * Tests that an incoming node gets registered as a Learner and receives Meta Storage updates.
+ */
+ @Test
+ void testLearnerJoin(TestInfo testInfo) throws NodeStoppingException {
+ Node firstNode = startNode(testInfo);
+
+ firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test");
+
+ var key = new ByteArray("foo");
+ byte[] value = "bar".getBytes(StandardCharsets.UTF_8);
+
+ CompletableFuture<Boolean> invokeFuture = firstNode.metaStorageManager.invoke(notExists(key), put(key, value), noop());
+
+ assertThat(invokeFuture, willBe(true));
+
+ Node secondNode = startNode(testInfo);
+
+ // Check that reading remote data works correctly.
+ assertThat(secondNode.metaStorageManager.get(key).thenApply(Entry::value), willBe(value));
+
+ // Check that the new node will receive events.
+ var awaitFuture = new CompletableFuture<EntryEvent>();
+
+ secondNode.metaStorageManager.registerExactWatch(key, new WatchListener() {
+ @Override
+ public void onUpdate(WatchEvent event) {
+ // Skip the first update event, because it's not guaranteed to arrive here (insert may have happened before the watch was
+ // registered).
+ if (event.revision() != 1) {
+ awaitFuture.complete(event.entryEvent());
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ awaitFuture.completeExceptionally(e);
+ }
+ });
+
+ byte[] newValue = "baz".getBytes(StandardCharsets.UTF_8);
+
+ invokeFuture = firstNode.metaStorageManager.invoke(revision(key).eq(1), put(key, newValue), noop());
+
+ assertThat(invokeFuture, willBe(true));
+
+ var expectedEntryEvent = new EntryEvent(
+ new EntryImpl(key.bytes(), value, 1, 1),
+ new EntryImpl(key.bytes(), newValue, 2, 2)
+ );
+
+ assertThat(awaitFuture, willBe(expectedEntryEvent));
+
+ // Check that the second node has been registered as a learner.
+ assertThat(firstNode.getMetaStorageLearners(), willBe(Set.of(secondNode.name())));
+ }
+
+ /**
+ * Tests a case when a node leaves the physical topology without entering the logical topology.
+ */
+ @Test
+ void testLearnerLeavePhysicalTopology(TestInfo testInfo) throws Exception {
+ Node firstNode = startNode(testInfo);
+ Node secondNode = startNode(testInfo);
+
+ firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test");
+
+ // Try reading some data to make sure that Raft has been configured correctly.
+ assertThat(secondNode.metaStorageManager.get(new ByteArray("test")).thenApply(Entry::value), willBe(nullValue()));
+
+ // Check that the second node has been registered as a learner.
+ waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000);
+
+ // Stop the second node.
+ secondNode.stop();
+
+ nodes.remove(1);
+
+ assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().isEmpty(), 10_000));
+ }
+
+ /**
+ * Tests a case when a node leaves the physical topology after having entered the logical topology.
+ */
+ @Test
+ void testLearnerLeaveLogicalTopology(TestInfo testInfo) throws Exception {
+ Node firstNode = startNode(testInfo);
+ Node secondNode = startNode(testInfo);
+
+ firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test");
+
+ assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
+
+ CompletableFuture<Set<String>> logicalTopologyNodes = firstNode.cmgManager
+ .logicalTopology()
+ .thenApply(logicalTopology -> logicalTopology.nodes().stream().map(ClusterNode::name).collect(toSet()));
+
+ assertThat(logicalTopologyNodes, willBe(Set.of(firstNode.name(), secondNode.name())));
+
+ // Try reading some data to make sure that Raft has been configured correctly.
+ assertThat(secondNode.metaStorageManager.get(new ByteArray("test")).thenApply(Entry::value), willBe(nullValue()));
+
+ // Check that the second node has been registered as a learner.
+ waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000);
+
+ // Stop the second node.
+ secondNode.stop();
+
+ nodes.remove(1);
+
+ assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().isEmpty(), 10_000));
+ }
+
+ /**
+ * Tests a scenario when a node gets kicked out of the Logical Topology due to a network partition. It should then be able to join
+ * the Meta Storage Raft group successfully.
+ */
+ @Test
+ void testLearnerLeaveAndJoinBecauseOfNetworkPartition(TestInfo testInfo) throws Exception {
+ Node firstNode = startNode(testInfo);
+ Node secondNode = startNode(testInfo);
+
+ firstNode.cmgManager.initCluster(List.of(firstNode.name()), List.of(firstNode.name()), "test");
+
+ assertThat(allOf(firstNode.cmgManager.onJoinReady(), secondNode.cmgManager.onJoinReady()), willCompleteSuccessfully());
+
+ CompletableFuture<Set<String>> logicalTopologyNodes = firstNode.cmgManager
+ .logicalTopology()
+ .thenApply(logicalTopology -> logicalTopology.nodes().stream().map(ClusterNode::name).collect(toSet()));
+
+ assertThat(logicalTopologyNodes, willBe(Set.of(firstNode.name(), secondNode.name())));
+
+ waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000);
+
+ // Make first node lose the second node from the Physical and Logical topologies.
+ firstNode.startDroppingMessagesTo(secondNode, ScaleCubeMessage.class);
+
+ assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().isEmpty(), 10_000));
+
+ // Make the first node discover the second node again. The second node should be added as a Meta Storage Learner again.
+ firstNode.stopDroppingMessages();
+
+ assertTrue(waitForCondition(() -> firstNode.getMetaStorageLearners().join().equals(Set.of(secondNode.name())), 10_000));
+ }
+}
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
index ca9631a110..2b25b4f2de 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java
@@ -60,8 +60,10 @@ import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -854,11 +856,9 @@ public class ItMetaStorageServiceTest {
/**
* Tests {@link MetaStorageService#closeCursors(String)}.
- *
- * @throws Exception If failed.
*/
@Test
- public void testCursorsCleanup() throws Exception {
+ public void testCursorsCleanup() throws InterruptedException {
startNodes(2);
Node leader = nodes.get(0);
@@ -873,14 +873,23 @@ public class ItMetaStorageServiceTest {
return cursor;
});
- class MockSubscriber implements Subscriber<Entry> {
- private Subscription subscription;
+ var subscriptionLatch = new CountDownLatch(3);
+ var closeCursorLatch = new CountDownLatch(1);
+ class MockSubscriber implements Subscriber<Entry> {
private final CompletableFuture<Entry> result = new CompletableFuture<>();
@Override
public void onSubscribe(Subscription subscription) {
- this.subscription = subscription;
+ subscriptionLatch.countDown();
+
+ try {
+ assertTrue(closeCursorLatch.await(10, TimeUnit.SECONDS));
+ } catch (Throwable e) {
+ onError(e);
+ }
+
+ subscription.request(1);
}
@Override
@@ -895,6 +904,7 @@ public class ItMetaStorageServiceTest {
@Override
public void onComplete() {
+ result.completeExceptionally(new AssertionError("No items produced"));
}
}
@@ -908,11 +918,16 @@ public class ItMetaStorageServiceTest {
learner.metaStorageService.range(new ByteArray(EXPECTED_RESULT_ENTRY.key()), null).subscribe(node1Subscriber0);
- leader.metaStorageService.closeCursors(leader.clusterService.topologyService().localMember().id()).get();
+ // Wait for all cursors to be registered on the server side.
+ assertTrue(subscriptionLatch.await(10, TimeUnit.SECONDS));
+
+ String leaderId = leader.clusterService.topologyService().localMember().id();
+
+ CompletableFuture<Void> closeCursorsFuture = leader.metaStorageService.closeCursors(leaderId);
+
+ assertThat(closeCursorsFuture, willCompleteSuccessfully());
- node0Subscriber0.subscription.request(1);
- node0Subscriber1.subscription.request(1);
- node1Subscriber0.subscription.request(1);
+ closeCursorLatch.countDown();
assertThat(node0Subscriber0.result, willFailFast(NoSuchElementException.class));
assertThat(node0Subscriber1.result, willFailFast(NoSuchElementException.class));
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index b8d88f453f..635175d0ed 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -24,15 +24,12 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -40,9 +37,14 @@ import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
+import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
@@ -50,12 +52,11 @@ import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterService;
@@ -71,55 +72,68 @@ import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests for Meta Storage Watches.
*/
-@ExtendWith(WorkDirectoryExtension.class)
@ExtendWith(ConfigurationExtension.class)
-public class ItMetaStorageWatchTest {
+public class ItMetaStorageWatchTest extends IgniteAbstractTest {
private static class Node {
- private final ClusterService clusterService;
+ private final List<IgniteComponent> components = new ArrayList<>();
- private final RaftManager raftManager;
+ private final ClusterService clusterService;
private final MetaStorageManager metaStorageManager;
- private final CompletableFuture<Set<String>> metaStorageNodesFuture = new CompletableFuture<>();
+ private final ClusterManagementGroupManager cmgManager;
+
+ Node(ClusterService clusterService, Path dataPath) {
+ var vaultManager = new VaultManager(new InMemoryVaultService());
+
+ components.add(vaultManager);
- Node(ClusterService clusterService, RaftConfiguration raftConfiguration, Path dataPath) {
this.clusterService = clusterService;
+ components.add(clusterService);
+
Path basePath = dataPath.resolve(name());
- this.raftManager = new Loza(
+ var raftManager = new Loza(
clusterService,
raftConfiguration,
basePath.resolve("raft"),
new HybridClockImpl()
);
- var vaultManager = mock(VaultManager.class);
+ components.add(raftManager);
- when(vaultManager.get(any())).thenReturn(CompletableFuture.completedFuture(null));
- when(vaultManager.put(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
- when(vaultManager.putAll(any())).thenReturn(CompletableFuture.completedFuture(null));
+ var clusterStateStorage = new TestClusterStateStorage();
- var cmgManager = mock(ClusterManagementGroupManager.class);
+ components.add(clusterStateStorage);
- when(cmgManager.metaStorageNodes()).thenReturn(metaStorageNodesFuture);
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
+
+ this.cmgManager = new ClusterManagementGroupManager(
+ vaultManager,
+ clusterService,
+ raftManager,
+ clusterStateStorage,
+ logicalTopology,
+ cmgConfiguration
+ );
+
+ components.add(cmgManager);
this.metaStorageManager = new MetaStorageManagerImpl(
vaultManager,
clusterService,
cmgManager,
+ new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
raftManager,
new RocksDbKeyValueStorage(name(), basePath.resolve("storage"))
);
- }
- void start(Set<String> metaStorageNodes) {
- clusterService.start();
- raftManager.start();
- metaStorageManager.start();
+ components.add(metaStorageManager);
+ }
- metaStorageNodesFuture.complete(metaStorageNodes);
+ void start() {
+ components.forEach(IgniteComponent::start);
}
String name() {
@@ -127,9 +141,11 @@ public class ItMetaStorageWatchTest {
}
void stop() throws Exception {
- Stream<AutoCloseable> beforeNodeStop = Stream.of(metaStorageManager, raftManager, clusterService).map(c -> c::beforeNodeStop);
+ Collections.reverse(components);
- Stream<AutoCloseable> nodeStop = Stream.of(metaStorageManager, raftManager, clusterService).map(c -> c::stop);
+ Stream<AutoCloseable> beforeNodeStop = components.stream().map(c -> c::beforeNodeStop);
+
+ Stream<AutoCloseable> nodeStop = components.stream().map(c -> c::stop);
IgniteUtils.closeAll(Stream.concat(beforeNodeStop, nodeStop));
}
@@ -137,11 +153,11 @@ public class ItMetaStorageWatchTest {
private TestInfo testInfo;
- @WorkDirectory
- private Path workDir;
+ @InjectConfiguration
+ private static RaftConfiguration raftConfiguration;
@InjectConfiguration
- private RaftConfiguration raftConfiguration;
+ private static ClusterManagementConfiguration cmgConfiguration;
private final List<Node> nodes = new ArrayList<>();
@@ -155,16 +171,20 @@ public class ItMetaStorageWatchTest {
IgniteUtils.closeAll(nodes.stream().map(node -> node::stop));
}
- private void startNodes(int amount) {
- List<NetworkAddress> localAddresses = findLocalAddresses(10_000, 10_000 + nodes.size() + amount);
+ private void startCluster(int size) throws NodeStoppingException {
+ List<NetworkAddress> localAddresses = findLocalAddresses(10_000, 10_000 + nodes.size() + size);
var nodeFinder = new StaticNodeFinder(localAddresses);
localAddresses.stream()
.map(addr -> ClusterServiceTestUtils.clusterService(testInfo, addr.port(), nodeFinder))
- .forEach(clusterService -> nodes.add(new Node(clusterService, raftConfiguration, workDir)));
+ .forEach(clusterService -> nodes.add(new Node(clusterService, workDir)));
+
+ nodes.parallelStream().forEach(Node::start);
+
+ String name = nodes.get(0).name();
- nodes.parallelStream().forEach(node -> node.start(Set.of(nodes.get(0).name())));
+ nodes.get(0).cmgManager.initCluster(List.of(name), List.of(name), "test");
}
@Test
@@ -229,7 +249,7 @@ public class ItMetaStorageWatchTest {
private void testWatches(BiConsumer<Node, CountDownLatch> registerWatchAction) throws Exception {
int numNodes = 3;
- startNodes(numNodes);
+ startCluster(numNodes);
var latch = new CountDownLatch(numNodes);
@@ -256,10 +276,10 @@ public class ItMetaStorageWatchTest {
* Tests that missed metastorage events are replayed after deploying watches.
*/
@Test
- void testReplayUpdates() throws InterruptedException {
+ void testReplayUpdates() throws Exception {
int numNodes = 3;
- startNodes(numNodes);
+ startCluster(numNodes);
var exactLatch = new CountDownLatch(numNodes);
var prefixLatch = new CountDownLatch(numNodes);
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java
index 123b022779..130c38c58d 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/EntryImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.impl;
import java.util.Arrays;
import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.jetbrains.annotations.Nullable;
@@ -46,9 +47,11 @@ public final class EntryImpl implements Entry {
private static final long serialVersionUID = 3636551347117181271L;
/** Key. */
+ @IgniteToStringInclude
private final byte[] key;
/** Value. */
+ @IgniteToStringInclude
private final byte @Nullable [] val;
/** Revision. */
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 8f0bea21d6..7a0921e64b 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -17,14 +17,11 @@
package org.apache.ignite.internal.metastorage.impl;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -32,6 +29,7 @@ import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.Entry;
@@ -52,7 +50,6 @@ import org.apache.ignite.internal.raft.PeersAndLearners;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.RaftNodeId;
-import org.apache.ignite.internal.raft.Status;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -61,8 +58,6 @@ import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.TopologyEventHandler;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -94,6 +89,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
private final ClusterManagementGroupManager cmgMgr;
+ private final LogicalTopologyService logicalTopologyService;
+
/** Meta storage service. */
private final CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut = new CompletableFuture<>();
@@ -123,6 +120,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
VaultManager vaultMgr,
ClusterService clusterService,
ClusterManagementGroupManager cmgMgr,
+ LogicalTopologyService logicalTopologyService,
RaftManager raftMgr,
KeyValueStorage storage
) {
@@ -130,6 +128,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
this.clusterService = clusterService;
this.raftMgr = raftMgr;
this.cmgMgr = cmgMgr;
+ this.logicalTopologyService = logicalTopologyService;
this.storage = storage;
}
@@ -153,26 +152,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
new RaftNodeId(MetastorageGroupId.INSTANCE, localPeer),
configuration,
new MetaStorageListener(storage),
- new RaftGroupEventsListener() {
- @Override
- public void onLeaderElected(long term) {
- // TODO: add listener to remove learners when they leave Logical Topology
- // see https://issues.apache.org/jira/browse/IGNITE-18554
-
- registerTopologyEventListener();
-
- // Update the configuration immediately in case we missed some updates.
- addLearners(clusterService.topologyService().allMembers());
- }
-
- @Override
- public void onNewPeersConfigurationApplied(PeersAndLearners configuration) {
- }
-
- @Override
- public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {
- }
- }
+ new MetaStorageRaftGroupEventsListener(busyLock, clusterService, logicalTopologyService, metaStorageSvcFut)
);
} else {
PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageNodes, Set.of(thisNodeName));
@@ -195,88 +175,6 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
return raftServiceFuture.thenApply(raftService -> new MetaStorageServiceImpl(raftService, busyLock, thisNode));
}
- private void registerTopologyEventListener() {
- clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
- @Override
- public void onAppeared(ClusterNode member) {
- addLearners(List.of(member));
- }
-
- @Override
- public void onDisappeared(ClusterNode member) {
- metaStorageSvcFut.thenAccept(service -> isCurrentNodeLeader(service.raftGroupService())
- .thenAccept(isLeader -> {
- if (isLeader) {
- service.closeCursors(member.id());
- }
- }));
- }
- });
- }
-
- private void addLearners(Collection<ClusterNode> nodes) {
- if (!busyLock.enterBusy()) {
- LOG.info("Skipping Meta Storage configuration update because the node is stopping");
-
- return;
- }
-
- try {
- metaStorageSvcFut
- .thenApply(MetaStorageServiceImpl::raftGroupService)
- .thenCompose(raftService -> isCurrentNodeLeader(raftService)
- .thenCompose(isLeader -> isLeader ? addLearners(raftService, nodes) : completedFuture(null)))
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.error("Unable to change peers on topology update", e);
- }
- });
- } finally {
- busyLock.leaveBusy();
- }
- }
-
- private CompletableFuture<Void> addLearners(RaftGroupService raftService, Collection<ClusterNode> nodes) {
- if (!busyLock.enterBusy()) {
- LOG.info("Skipping Meta Storage configuration update because the node is stopping");
-
- return completedFuture(null);
- }
-
- try {
- Set<String> peers = raftService.peers().stream()
- .map(Peer::consistentId)
- .collect(toSet());
-
- Set<String> learners = nodes.stream()
- .map(ClusterNode::name)
- .filter(name -> !peers.contains(name))
- .collect(toSet());
-
- if (learners.isEmpty()) {
- return completedFuture(null);
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("New Meta Storage learners detected: " + learners);
- }
-
- PeersAndLearners newConfiguration = PeersAndLearners.fromConsistentIds(peers, learners);
-
- return raftService.addLearners(newConfiguration.learners());
- } finally {
- busyLock.leaveBusy();
- }
- }
-
- private CompletableFuture<Boolean> isCurrentNodeLeader(RaftGroupService raftService) {
- String name = clusterService.topologyService().localMember().name();
-
- return raftService.refreshLeader()
- .thenApply(v -> raftService.leader().consistentId().equals(name));
- }
-
- /** {@inheritDoc} */
@Override
public void start() {
this.appliedRevision = readAppliedRevision().join();
@@ -304,7 +202,6 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
});
}
- /** {@inheritDoc} */
@Override
public void stop() throws Exception {
if (!isStopped.compareAndSet(false, true)) {
@@ -404,7 +301,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
*
* @see MetaStorageService#getAll(Set, long)
*/
- public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
+ public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
@@ -421,7 +318,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
*
* @see MetaStorageService#put(ByteArray, byte[])
*/
- public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[] val) {
+ public CompletableFuture<Void> put(ByteArray key, byte[] val) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
@@ -438,7 +335,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
*
* @see MetaStorageService#getAndPut(ByteArray, byte[])
*/
- public @NotNull CompletableFuture<Entry> getAndPut(@NotNull ByteArray key, byte[] val) {
+ public CompletableFuture<Entry> getAndPut(ByteArray key, byte[] val) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
@@ -455,7 +352,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
*
* @see MetaStorageService#putAll(Map)
*/
- public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
+ public CompletableFuture<Void> putAll(Map<ByteArray, byte[]> vals) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
@@ -472,7 +369,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
*
* @see MetaStorageService#getAndPutAll(Map)
*/
- public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) {
+ public CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(Map<ByteArray, byte[]> vals) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
@@ -489,7 +386,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
*
* @see MetaStorageService#remove(ByteArray)
*/
- public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) {
+ public CompletableFuture<Void> remove(ByteArray key) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
@@ -506,7 +403,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
*
* @see MetaStorageService#getAndRemove(ByteArray)
*/
- public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull ByteArray key) {
+ public CompletableFuture<Entry> getAndRemove(ByteArray key) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
@@ -523,7 +420,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
*
* @see MetaStorageService#removeAll(Set)
*/
- public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys) {
+ public CompletableFuture<Void> removeAll(Set<ByteArray> keys) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
@@ -540,7 +437,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager {
*
* @see MetaStorageService#getAndRemoveAll(Set)
*/
- public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys) {
+ public CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(Set<ByteArray> keys) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new NodeStoppingException());
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java
new file mode 100644
index 0000000000..9fe455fff4
--- /dev/null
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageRaftGroupEventsListener.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.stream.Collectors.toSet;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.PeersAndLearners;
+import org.apache.ignite.internal.raft.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Raft Group Events listener that registers a Logical Topology listener for updating the list of Meta Storage Raft group learners.
+ *
+ * <p>Every reaction to a topology event is first checked against the current Raft leader: the action is executed only if the leader's
+ * consistent ID equals the local node name, otherwise it is skipped. Actions are chained one after another through
+ * {@code serializationFuture}, so that they are started in the order in which the corresponding events were received.
+ */
+public class MetaStorageRaftGroupEventsListener implements RaftGroupEventsListener {
+    private static final IgniteLogger LOG = Loggers.forClass(MetaStorageRaftGroupEventsListener.class);
+
+    private final IgniteSpinBusyLock busyLock;
+
+    /** Name of the local node, compared against the consistent ID of the Raft group leader. */
+    private final String nodeName;
+
+    private final LogicalTopologyService logicalTopologyService;
+
+    private final CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut;
+
+    /**
+     * Future required to enable serialized processing of topology events.
+     *
+     * <p>While Logical Topology events are linearized, we usually start a bunch of async operations inside the event listener and need
+     * them to finish in the same order.
+     *
+     * <p>Multi-threaded access is guarded by {@code serializationFutureMux}.
+     */
+    private CompletableFuture<Void> serializationFuture = null;
+
+    /**
+     * Whether the Logical Topology listener has already been registered by this instance.
+     *
+     * <p>Multi-threaded access is guarded by {@code serializationFutureMux}.
+     */
+    private boolean topologyListenerRegistered = false;
+
+    private final Object serializationFutureMux = new Object();
+
+    /**
+     * Creates a new listener.
+     *
+     * @param busyLock Busy lock that is checked before any configuration update is started.
+     * @param clusterService Cluster service, used to obtain the local node name.
+     * @param logicalTopologyService Logical Topology service, used to subscribe to topology events and to read validated nodes.
+     * @param metaStorageSvcFut Future that provides the Meta Storage service, and through it the Raft group service.
+     */
+    MetaStorageRaftGroupEventsListener(
+            IgniteSpinBusyLock busyLock,
+            ClusterService clusterService,
+            LogicalTopologyService logicalTopologyService,
+            CompletableFuture<MetaStorageServiceImpl> metaStorageSvcFut
+    ) {
+        this.busyLock = busyLock;
+        this.nodeName = clusterService.localConfiguration().getName();
+        this.logicalTopologyService = logicalTopologyService;
+        this.metaStorageSvcFut = metaStorageSvcFut;
+    }
+
+    /**
+     * Registers the Logical Topology listener (once per instance) and starts a full learner reset, which also initializes the
+     * serialization future.
+     *
+     * @param term Term reported by the Raft group; not used directly, the term is re-read together with the leader.
+     */
+    @Override
+    public void onLeaderElected(long term) {
+        synchronized (serializationFutureMux) {
+            // This callback may be invoked again if the node wins another election; registering a second listener would make
+            // every topology event be processed more than once.
+            if (!topologyListenerRegistered) {
+                registerTopologyEventListeners();
+
+                topologyListenerRegistered = true;
+            }
+
+            // Update learner configuration (in case we missed some topology updates) and initialize the serialization future.
+            serializationFuture = executeIfLeaderImpl(this::resetLearners);
+        }
+    }
+
+    /**
+     * Subscribes to Logical Topology events and maps them to learner updates: validated nodes are added as learners, invalidated
+     * and left nodes have their cursors closed and are removed from learners, and a topology leap triggers a full learner reset.
+     */
+    private void registerTopologyEventListeners() {
+        logicalTopologyService.addEventListener(new LogicalTopologyEventListener() {
+            @Override
+            public void onNodeValidated(ClusterNode validatedNode) {
+                executeIfLeader((service, term) -> addLearner(service.raftGroupService(), validatedNode));
+            }
+
+            @Override
+            public void onNodeInvalidated(ClusterNode invalidatedNode) {
+                executeIfLeader((service, term) -> {
+                    // A failure to close cursors is logged and swallowed so that it does not fail the combined future.
+                    CompletableFuture<Void> closeCursorsFuture = service.closeCursors(invalidatedNode.id())
+                            .exceptionally(e -> {
+                                LOG.error("Unable to close cursor for " + invalidatedNode, e);
+
+                                return null;
+                            });
+
+                    CompletableFuture<Void> removeLearnersFuture = removeLearner(service.raftGroupService(), invalidatedNode);
+
+                    return allOf(closeCursorsFuture, removeLearnersFuture);
+                });
+            }
+
+            @Override
+            public void onNodeLeft(ClusterNode leftNode, LogicalTopologySnapshot newTopology) {
+                onNodeInvalidated(leftNode);
+            }
+
+            @Override
+            public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
+                executeIfLeader(MetaStorageRaftGroupEventsListener.this::resetLearners);
+            }
+        });
+    }
+
+    /** Action that is executed with the Meta Storage service and the current term when the local node is the leader. */
+    @FunctionalInterface
+    private interface OnLeaderAction {
+        CompletableFuture<Void> apply(MetaStorageServiceImpl service, long term);
+    }
+
+    /**
+     * Executes the given action if the current node is the Meta Storage leader.
+     *
+     * <p>The action is appended to the serialization future, so it starts only after the previously scheduled action has completed
+     * (successfully or not). Nothing is scheduled if the node is stopping or if the serialization future has not been initialized.
+     */
+    private void executeIfLeader(OnLeaderAction action) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the node is stopping");
+
+            return;
+        }
+
+        try {
+            synchronized (serializationFutureMux) {
+                // We are definitely not a leader if the serialization future has not been initialized.
+                if (serializationFuture == null) {
+                    return;
+                }
+
+                serializationFuture = serializationFuture
+                        // we don't care about exceptions here, they should be logged independently
+                        .handle((v, e) -> executeIfLeaderImpl(action))
+                        .thenCompose(Function.identity());
+            }
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Refreshes the leader and term of the Raft group and runs the action only if the leader's consistent ID equals the local node
+     * name; otherwise returns an already completed future.
+     */
+    private CompletableFuture<Void> executeIfLeaderImpl(OnLeaderAction action) {
+        return metaStorageSvcFut.thenCompose(service -> service.raftGroupService().refreshAndGetLeaderWithTerm()
+                .thenCompose(leaderWithTerm -> {
+                    String leaderName = leaderWithTerm.leader().consistentId();
+
+                    return leaderName.equals(nodeName) ? action.apply(service, leaderWithTerm.term()) : completedFuture(null);
+                }));
+    }
+
+    /** Adds the node as a learner unless it is already listed among the Raft group peers. */
+    private CompletableFuture<Void> addLearner(RaftGroupService raftService, ClusterNode learner) {
+        return updateConfigUnderLock(() -> isPeer(raftService, learner)
+                ? completedFuture(null)
+                : raftService.addLearners(List.of(new Peer(learner.name()))));
+    }
+
+    /** Returns {@code true} if the Raft group peers contain a peer whose consistent ID equals the node name. */
+    private static boolean isPeer(RaftGroupService raftService, ClusterNode node) {
+        return raftService.peers().stream().anyMatch(peer -> peer.consistentId().equals(node.name()));
+    }
+
+    /**
+     * Removes the node from learners, unless it is a peer or a node with the same name is still present in the validated set.
+     */
+    private CompletableFuture<Void> removeLearner(RaftGroupService raftService, ClusterNode learner) {
+        // The busy lock is released as soon as the async call is started, hence it is re-acquired inside the continuation.
+        return updateConfigUnderLock(() -> logicalTopologyService.validatedNodesOnLeader()
+                .thenCompose(validatedNodes -> updateConfigUnderLock(() -> {
+                    if (isPeer(raftService, learner)) {
+                        return completedFuture(null);
+                    }
+
+                    // Due to possible races, we can have multiple versions of the same node in the validated set. We only remove
+                    // a learner if there are no such versions left.
+                    if (validatedNodes.stream().anyMatch(n -> n.name().equals(learner.name()))) {
+                        return completedFuture(null);
+                    }
+
+                    return raftService.removeLearners(List.of(new Peer(learner.name())));
+                })));
+    }
+
+    /**
+     * Replaces the learner list with all validated nodes that are not peers, keeping the current set of peers unchanged.
+     */
+    private CompletableFuture<Void> resetLearners(MetaStorageServiceImpl service, long term) {
+        // The busy lock is released as soon as the async call is started, hence it is re-acquired inside the continuation.
+        return updateConfigUnderLock(() -> logicalTopologyService.validatedNodesOnLeader()
+                .thenCompose(validatedNodes -> updateConfigUnderLock(() -> {
+                    RaftGroupService raftService = service.raftGroupService();
+
+                    Set<String> peers = raftService.peers().stream().map(Peer::consistentId).collect(toSet());
+
+                    Set<String> learners = validatedNodes.stream()
+                            .map(ClusterNode::name)
+                            .filter(name -> !peers.contains(name))
+                            .collect(toSet());
+
+                    PeersAndLearners newPeerConfiguration = PeersAndLearners.fromConsistentIds(peers, learners);
+
+                    // We can't use 'resetLearners' call here because it does not support empty lists of learners.
+                    return raftService.changePeersAsync(newPeerConfiguration, term);
+                })));
+    }
+
+    /**
+     * Starts the given action while holding the busy lock and logs its failure, if any. If the node is stopping, the action is
+     * skipped and an already completed future is returned.
+     */
+    private CompletableFuture<Void> updateConfigUnderLock(Supplier<CompletableFuture<Void>> action) {
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping Meta Storage configuration update because the node is stopping");
+
+            return completedFuture(null);
+        }
+
+        try {
+            return action.get()
+                    .whenComplete((v, e) -> {
+                        if (e != null) {
+                            LOG.error("Unable to change peers on topology update", e);
+                        }
+                    });
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+}
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
index d37dd7e70f..47d16c65cf 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/raft/client/TopologyAwareRaftGroupService.java
@@ -367,12 +367,12 @@ public class TopologyAwareRaftGroupService implements RaftGroupService {
}
@Override
- public CompletableFuture<Void> removeLearners(List<Peer> learners) {
+ public CompletableFuture<Void> removeLearners(Collection<Peer> learners) {
return raftClient.removeLearners(learners);
}
@Override
- public CompletableFuture<Void> resetLearners(List<Peer> learners) {
+ public CompletableFuture<Void> resetLearners(Collection<Peer> learners) {
return raftClient.resetLearners(learners);
}
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
index 6647bfeb5c..fe9be4c453 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/RaftGroupEventsListener.java
@@ -33,7 +33,7 @@ public interface RaftGroupEventsListener {
*
* @param configuration New Raft group configuration.
*/
- void onNewPeersConfigurationApplied(PeersAndLearners configuration);
+ default void onNewPeersConfigurationApplied(PeersAndLearners configuration) {}
/**
* Invoked on the leader if membership reconfiguration failed, because of {@link Status}.
@@ -42,22 +42,10 @@ public interface RaftGroupEventsListener {
* @param configuration Configuration that failed to be applied.
* @param term Raft term of the current leader.
*/
- void onReconfigurationError(Status status, PeersAndLearners configuration, long term);
+ default void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {}
/**
* No-op raft group events listener.
*/
- RaftGroupEventsListener noopLsnr = new RaftGroupEventsListener() {
- /** {@inheritDoc} */
- @Override
- public void onLeaderElected(long term) { }
-
- /** {@inheritDoc} */
- @Override
- public void onNewPeersConfigurationApplied(PeersAndLearners configuration) { }
-
- /** {@inheritDoc} */
- @Override
- public void onReconfigurationError(Status status, PeersAndLearners configuration, long term) {}
- };
+ RaftGroupEventsListener noopLsnr = term -> {};
}
diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
index 72eeaa453d..e86f83e994 100644
--- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
+++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupService.java
@@ -181,10 +181,10 @@ public interface RaftGroupService {
*
* <p>This operation is executed on a group leader.
*
- * @param learners List of learners.
+ * @param learners Collection of learners.
* @return A future.
*/
- CompletableFuture<Void> removeLearners(List<Peer> learners);
+ CompletableFuture<Void> removeLearners(Collection<Peer> learners);
/**
* Set learners of the raft group to needed list of learners.
@@ -194,10 +194,10 @@ public interface RaftGroupService {
*
* <p>This operation is executed on a group leader.
*
- * @param learners List of learners.
+ * @param learners Collection of learners.
* @return A future.
*/
- CompletableFuture<Void> resetLearners(List<Peer> learners);
+ CompletableFuture<Void> resetLearners(Collection<Peer> learners);
/**
* Takes a state machine snapshot on a given group peer.
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index b806120105..b5156316fd 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -360,7 +360,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
}
@Override
- public CompletableFuture<Void> removeLearners(List<Peer> learners) {
+ public CompletableFuture<Void> removeLearners(Collection<Peer> learners) {
Peer leader = this.leader;
if (leader == null) {
@@ -378,7 +378,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
}
@Override
- public CompletableFuture<Void> resetLearners(List<Peer> learners) {
+ public CompletableFuture<Void> resetLearners(Collection<Peer> learners) {
Peer leader = this.leader;
if (leader == null) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index fa44708e99..a107f23c40 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorageListener;
import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -127,14 +128,14 @@ public class ItDistributedConfigurationPropertiesTest {
raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
var clusterStateStorage = new TestClusterStateStorage();
- var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage);
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
raftManager,
clusterStateStorage,
- logicalTopologyService,
+ logicalTopology,
clusterManagementConfiguration
);
@@ -142,6 +143,7 @@ public class ItDistributedConfigurationPropertiesTest {
vaultManager,
clusterService,
cmgManager,
+ new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
raftManager,
new SimpleInMemoryKeyValueStorage(name())
);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 791df8a8cb..7e14edb256 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -101,14 +102,14 @@ public class ItDistributedConfigurationStorageTest {
raftManager = new Loza(clusterService, raftConfiguration, workDir, new HybridClockImpl());
var clusterStateStorage = new TestClusterStateStorage();
- var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage);
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
raftManager,
clusterStateStorage,
- logicalTopologyService,
+ logicalTopology,
clusterManagementConfiguration
);
@@ -116,6 +117,7 @@ public class ItDistributedConfigurationStorageTest {
vaultManager,
clusterService,
cmgManager,
+ new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
raftManager,
new SimpleInMemoryKeyValueStorage(name())
);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 13dd06aa18..a809d22747 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.NodeBootstrapConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -642,14 +643,14 @@ public class ItRebalanceDistributedTest {
txManager = new TxManagerImpl(replicaSvc, lockManager, hybridClock);
var clusterStateStorage = new TestClusterStateStorage();
- var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage);
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
cmgManager = new ClusterManagementGroupManager(
vaultManager,
clusterService,
raftManager,
clusterStateStorage,
- logicalTopologyService,
+ logicalTopology,
clusterManagementConfiguration
);
@@ -659,6 +660,7 @@ public class ItRebalanceDistributedTest {
vaultManager,
clusterService,
cmgManager,
+ new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
raftManager,
testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class)
? new RocksDbKeyValueStorage(nodeName, resolveDir(dir, "metaStorage"))
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 6b5675902e..71e53c799e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManag
import org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage;
import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl;
+import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.ConfigurationModule;
import org.apache.ignite.internal.configuration.ConfigurationModules;
@@ -269,14 +270,14 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
var clusterStateStorage = new RocksDbClusterStateStorage(dir.resolve("cmg"));
- var logicalTopologyService = new LogicalTopologyImpl(clusterStateStorage);
+ var logicalTopology = new LogicalTopologyImpl(clusterStateStorage);
var cmgManager = new ClusterManagementGroupManager(
vault,
clusterSvc,
raftMgr,
clusterStateStorage,
- logicalTopologyService,
+ logicalTopology,
clusterManagementConfiguration
);
@@ -284,6 +285,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
vault,
clusterSvc,
cmgManager,
+ new LogicalTopologyServiceImpl(logicalTopology, cmgManager),
raftMgr,
new RocksDbKeyValueStorage(name, dir.resolve("metastorage"))
);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c7f1b0247d..afe6311409 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -349,6 +349,7 @@ public class IgniteImpl implements Ignite {
vaultMgr,
clusterSvc,
cmgMgr,
+ logicalTopologyService,
raftMgr,
new RocksDbKeyValueStorage(name, workDir.resolve(METASTORAGE_DB_PATH))
);