You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/06/03 13:00:31 UTC
[ignite-3] branch main updated: IGNITE-17067 Send init response after CMG leader was elected (#840)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9e9761bc3 IGNITE-17067 Send init response after CMG leader was elected (#840)
9e9761bc3 is described below
commit 9e9761bc302cba01bd0eb46b293be83a6c99c564
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri Jun 3 16:00:27 2022 +0300
IGNITE-17067 Send init response after CMG leader was elected (#840)
---
.../cluster/management/ItClusterManagerTest.java | 50 +++++++++---
.../internal/cluster/management/MockNode.java | 4 +
.../management/ClusterManagementGroupManager.java | 91 ++++++++++------------
3 files changed, 85 insertions(+), 60 deletions(-)
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index c4848d573..5d10d2103 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -101,9 +101,9 @@ public class ItClusterManagerTest {
void testInit(TestInfo testInfo) throws Exception {
startCluster(2, testInfo);
- String[] cmgNodes = { cluster.get(0).localMember().name() };
+ String[] cmgNodes = { cluster.get(0).name() };
- String[] metaStorageNodes = { cluster.get(1).localMember().name() };
+ String[] metaStorageNodes = { cluster.get(1).name() };
initCluster(metaStorageNodes, cmgNodes);
@@ -153,7 +153,7 @@ public class ItClusterManagerTest {
// complete initialization with one node to check that it finishes correctly
- String[] aliveNodes = {cluster.get(0).localMember().name()};
+ String[] aliveNodes = {cluster.get(0).name()};
initCluster(aliveNodes, aliveNodes);
@@ -169,9 +169,9 @@ public class ItClusterManagerTest {
void testNodeRestart(TestInfo testInfo) throws Exception {
startCluster(2, testInfo);
- String[] cmgNodes = {cluster.get(0).localMember().name()};
+ String[] cmgNodes = {cluster.get(0).name()};
- String[] metaStorageNodes = {cluster.get(1).localMember().name()};
+ String[] metaStorageNodes = {cluster.get(1).name()};
initCluster(metaStorageNodes, cmgNodes);
@@ -190,6 +190,33 @@ public class ItClusterManagerTest {
assertThat(cluster.get(1).clusterManager().logicalTopology(), will(containsInAnyOrder(expectedTopology)));
}
+ /**
+ * Tests executing the init command with incorrect node names.
+ */
+ @Test
+ void testInitInvalidNodes(TestInfo testInfo) throws Exception {
+ startCluster(2, testInfo);
+
+ ClusterManagementGroupManager clusterManager = cluster.get(0).clusterManager();
+
+ // non-existent node
+ assertThrowsWithCause(
+ () -> clusterManager.initCluster(List.of("wrong"), List.of(), "cluster"),
+ InitException.class,
+ "Node \"wrong\" is not present in the physical topology"
+ );
+
+ // successful init
+ clusterManager.initCluster(List.of(cluster.get(0).name()), List.of(), "cluster");
+
+ // different node
+ assertThrowsWithCause(
+ () -> clusterManager.initCluster(List.of(cluster.get(1).name()), List.of(), "cluster"),
+ InitException.class,
+ "Init CMG request denied, reason: CMG node names do not match."
+ );
+ }
+
/**
* Tests a scenario, when every node in a cluster gets restarted.
*/
@@ -198,11 +225,11 @@ public class ItClusterManagerTest {
startCluster(3, testInfo);
String[] cmgNodes = {
- cluster.get(0).localMember().name(),
- cluster.get(1).localMember().name()
+ cluster.get(0).name(),
+ cluster.get(1).name()
};
- String[] metaStorageNodes = { cluster.get(2).localMember().name() };
+ String[] metaStorageNodes = { cluster.get(2).name() };
initCluster(cmgNodes, metaStorageNodes);
@@ -255,7 +282,7 @@ public class ItClusterManagerTest {
void testNodeLeave(TestInfo testInfo) throws Exception {
startCluster(2, testInfo);
- String[] cmgNodes = { cluster.get(0).localMember().name() };
+ String[] cmgNodes = { cluster.get(0).name() };
initCluster(cmgNodes, cmgNodes);
@@ -279,7 +306,7 @@ public class ItClusterManagerTest {
// Start a cluster and initialize it
startCluster(2, testInfo);
- String[] cmgNodes = { cluster.get(0).localMember().name() };
+ String[] cmgNodes = { cluster.get(0).name() };
initCluster(cmgNodes, cmgNodes);
@@ -361,8 +388,7 @@ public class ItClusterManagerTest {
private String[] clusterNodeNames() {
return cluster.stream()
- .map(MockNode::localMember)
- .map(ClusterNode::name)
+ .map(MockNode::name)
.toArray(String[]::new);
}
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 4be44456e..81e8b1342 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -148,6 +148,10 @@ public class MockNode {
return clusterService.topologyService().localMember();
}
+ public String name() {
+ return localMember().name();
+ }
+
public ClusterManagementGroupManager clusterManager() {
return clusterManager;
}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 8ea630f1c..d825f1d6d 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -229,7 +229,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
return failedFuture(new IllegalStateException("Cluster state is empty"));
} else {
- return onLeaderElected(service, state);
+ return onLeaderElected(service, state).thenApply(v -> service);
}
});
})
@@ -267,54 +267,50 @@ public class ClusterManagementGroupManager implements IgniteComponent {
// Every node, that receives the init command, tries to initialize the CMG state. Raft listener will correctly
// handle this case by applying only the first attempt and returning the actual cluster state for all other
// attempts.
- raftService = raftService.thenCompose(service -> doInit(service, msg, addr, correlationId)
- .handle((state, e) -> {
- if (e != null) {
- // Init failed, error response has been delivered to the user.
- return completedFuture(service);
- }
-
- var localState = new LocalState(state.cmgNodes(), state.clusterTag());
-
- return localStateStorage.saveLocalState(localState)
- .thenCompose(v -> joinCluster(service, state.clusterTag()))
- .thenCompose(v -> service.isCurrentNodeLeader()
- .thenCompose(isLeader -> {
- if (isLeader) {
- return onLeaderElected(service, state);
- } else {
- return completedFuture(service);
- }
- }));
- })
- .thenCompose(Function.identity())
- );
+ raftService = raftService
+ .thenCompose(service -> doInit(service, msg)
+ .handle((v, e) -> {
+ NetworkMessage response;
+
+ if (e == null) {
+ LOG.info("CMG initialized successfully");
+
+ response = msgFactory.initCompleteMessage().build();
+ } else {
+ if (e instanceof CompletionException) {
+ e = e.getCause();
+ }
+
+ LOG.error("Error when initializing the CMG: {}", e, e.getMessage());
+
+ response = msgFactory.initErrorMessage()
+ .cause(e.getMessage())
+ .shouldCancel(!(e instanceof IllegalInitArgumentException))
+ .build();
+ }
+
+ clusterService.messagingService().respond(addr, response, correlationId);
+
+ return service;
+ }));
}
}
- private CompletableFuture<ClusterState> doInit(CmgRaftService service, CmgInitMessage msg, NetworkAddress addr, long correlationId) {
+ private CompletableFuture<Void> doInit(CmgRaftService service, CmgInitMessage msg) {
return service.initClusterState(clusterState(msg))
- .whenComplete((state, e) -> {
- NetworkMessage response;
-
- if (e == null) {
- LOG.info("CMG initialized successfully");
-
- response = msgFactory.initCompleteMessage().build();
- } else {
- if (e instanceof CompletionException) {
- e = e.getCause();
- }
-
- LOG.error("Error when initializing the CMG: {}", e, e.getMessage());
-
- response = msgFactory.initErrorMessage()
- .cause(e.getMessage())
- .shouldCancel(!(e instanceof IllegalInitArgumentException))
- .build();
- }
+ .thenCompose(state -> {
+ var localState = new LocalState(state.cmgNodes(), state.clusterTag());
- clusterService.messagingService().respond(addr, response, correlationId);
+ return localStateStorage.saveLocalState(localState)
+ .thenCompose(v -> joinCluster(service, state.clusterTag()))
+ .thenCompose(v -> service.isCurrentNodeLeader()
+ .thenCompose(isLeader -> {
+ if (isLeader) {
+ return onLeaderElected(service, state);
+ } else {
+ return completedFuture(null);
+ }
+ }));
});
}
@@ -334,12 +330,11 @@ public class ClusterManagementGroupManager implements IgniteComponent {
* <li>Broadcasts the current CMG state to all nodes in the physical topology.</li>
* </ol>
*/
- private CompletableFuture<CmgRaftService> onLeaderElected(CmgRaftService service, ClusterState state) {
+ private CompletableFuture<Void> onLeaderElected(CmgRaftService service, ClusterState state) {
LOG.info("CMG leader has been elected, executing onLeaderElected callback");
return updateLogicalTopology(service)
- .thenApply(v -> service)
- .whenComplete((s, e) -> {
+ .whenComplete((v, e) -> {
if (e == null) {
LOG.info("onLeaderElected callback executed successfully");
@@ -347,7 +342,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
TopologyService topologyService = clusterService.topologyService();
// TODO: remove listeners if leadership is lost, see https://issues.apache.org/jira/browse/IGNITE-16842
- topologyService.addEventHandler(cmgLeaderTopologyEventHandler(s));
+ topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
// Send the ClusterStateMessage to all members of the physical topology. We do not wait for the send operation
// because being unable to send ClusterState messages should not fail the CMG service startup.