You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/06/03 13:00:31 UTC

[ignite-3] branch main updated: IGNITE-17067 Send init response after CMG leader was elected (#840)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e9761bc3 IGNITE-17067 Send init response after CMG leader was elected (#840)
9e9761bc3 is described below

commit 9e9761bc302cba01bd0eb46b293be83a6c99c564
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri Jun 3 16:00:27 2022 +0300

    IGNITE-17067 Send init response after CMG leader was elected (#840)
---
 .../cluster/management/ItClusterManagerTest.java   | 50 +++++++++---
 .../internal/cluster/management/MockNode.java      |  4 +
 .../management/ClusterManagementGroupManager.java  | 91 ++++++++++------------
 3 files changed, 85 insertions(+), 60 deletions(-)

diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
index c4848d573..5d10d2103 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java
@@ -101,9 +101,9 @@ public class ItClusterManagerTest {
     void testInit(TestInfo testInfo) throws Exception {
         startCluster(2, testInfo);
 
-        String[] cmgNodes = { cluster.get(0).localMember().name() };
+        String[] cmgNodes = { cluster.get(0).name() };
 
-        String[] metaStorageNodes = { cluster.get(1).localMember().name() };
+        String[] metaStorageNodes = { cluster.get(1).name() };
 
         initCluster(metaStorageNodes, cmgNodes);
 
@@ -153,7 +153,7 @@ public class ItClusterManagerTest {
 
         // complete initialization with one node to check that it finishes correctly
 
-        String[] aliveNodes = {cluster.get(0).localMember().name()};
+        String[] aliveNodes = {cluster.get(0).name()};
 
         initCluster(aliveNodes, aliveNodes);
 
@@ -169,9 +169,9 @@ public class ItClusterManagerTest {
     void testNodeRestart(TestInfo testInfo) throws Exception {
         startCluster(2, testInfo);
 
-        String[] cmgNodes = {cluster.get(0).localMember().name()};
+        String[] cmgNodes = {cluster.get(0).name()};
 
-        String[] metaStorageNodes = {cluster.get(1).localMember().name()};
+        String[] metaStorageNodes = {cluster.get(1).name()};
 
         initCluster(metaStorageNodes, cmgNodes);
 
@@ -190,6 +190,33 @@ public class ItClusterManagerTest {
         assertThat(cluster.get(1).clusterManager().logicalTopology(), will(containsInAnyOrder(expectedTopology)));
     }
 
+    /**
+     * Tests executing the init command with incorrect node names.
+     */
+    @Test
+    void testInitInvalidNodes(TestInfo testInfo) throws Exception {
+        startCluster(2, testInfo);
+
+        ClusterManagementGroupManager clusterManager = cluster.get(0).clusterManager();
+
+        // non-existent node
+        assertThrowsWithCause(
+                () -> clusterManager.initCluster(List.of("wrong"), List.of(), "cluster"),
+                InitException.class,
+                "Node \"wrong\" is not present in the physical topology"
+        );
+
+        // successful init
+        clusterManager.initCluster(List.of(cluster.get(0).name()), List.of(), "cluster");
+
+        // different node
+        assertThrowsWithCause(
+                () -> clusterManager.initCluster(List.of(cluster.get(1).name()), List.of(), "cluster"),
+                InitException.class,
+                "Init CMG request denied, reason: CMG node names do not match."
+        );
+    }
+
     /**
      * Tests a scenario, when every node in a cluster gets restarted.
      */
@@ -198,11 +225,11 @@ public class ItClusterManagerTest {
         startCluster(3, testInfo);
 
         String[] cmgNodes = {
-                cluster.get(0).localMember().name(),
-                cluster.get(1).localMember().name()
+                cluster.get(0).name(),
+                cluster.get(1).name()
         };
 
-        String[] metaStorageNodes = { cluster.get(2).localMember().name() };
+        String[] metaStorageNodes = { cluster.get(2).name() };
 
         initCluster(cmgNodes, metaStorageNodes);
 
@@ -255,7 +282,7 @@ public class ItClusterManagerTest {
     void testNodeLeave(TestInfo testInfo) throws Exception {
         startCluster(2, testInfo);
 
-        String[] cmgNodes = { cluster.get(0).localMember().name() };
+        String[] cmgNodes = { cluster.get(0).name() };
 
         initCluster(cmgNodes, cmgNodes);
 
@@ -279,7 +306,7 @@ public class ItClusterManagerTest {
         // Start a cluster and initialize it
         startCluster(2, testInfo);
 
-        String[] cmgNodes = { cluster.get(0).localMember().name() };
+        String[] cmgNodes = { cluster.get(0).name() };
 
         initCluster(cmgNodes, cmgNodes);
 
@@ -361,8 +388,7 @@ public class ItClusterManagerTest {
 
     private String[] clusterNodeNames() {
         return cluster.stream()
-                .map(MockNode::localMember)
-                .map(ClusterNode::name)
+                .map(MockNode::name)
                 .toArray(String[]::new);
     }
 
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
index 4be44456e..81e8b1342 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/MockNode.java
@@ -148,6 +148,10 @@ public class MockNode {
         return clusterService.topologyService().localMember();
     }
 
+    public String name() {
+        return localMember().name();
+    }
+
     public ClusterManagementGroupManager clusterManager() {
         return clusterManager;
     }
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index 8ea630f1c..d825f1d6d 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -229,7 +229,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
 
                                             return failedFuture(new IllegalStateException("Cluster state is empty"));
                                         } else {
-                                            return onLeaderElected(service, state);
+                                            return onLeaderElected(service, state).thenApply(v -> service);
                                         }
                                     });
                         })
@@ -267,54 +267,50 @@ public class ClusterManagementGroupManager implements IgniteComponent {
             // Every node, that receives the init command, tries to initialize the CMG state. Raft listener will correctly
             // handle this case by applying only the first attempt and returning the actual cluster state for all other
             // attempts.
-            raftService = raftService.thenCompose(service -> doInit(service, msg, addr, correlationId)
-                    .handle((state, e) -> {
-                        if (e != null) {
-                            // Init failed, error response has been delivered to the user.
-                            return completedFuture(service);
-                        }
-
-                        var localState = new LocalState(state.cmgNodes(), state.clusterTag());
-
-                        return localStateStorage.saveLocalState(localState)
-                                .thenCompose(v -> joinCluster(service, state.clusterTag()))
-                                .thenCompose(v -> service.isCurrentNodeLeader()
-                                        .thenCompose(isLeader -> {
-                                            if (isLeader) {
-                                                return onLeaderElected(service, state);
-                                            } else {
-                                                return completedFuture(service);
-                                            }
-                                        }));
-                    })
-                    .thenCompose(Function.identity())
-            );
+            raftService = raftService
+                    .thenCompose(service -> doInit(service, msg)
+                            .handle((v, e) -> {
+                                NetworkMessage response;
+
+                                if (e == null) {
+                                    LOG.info("CMG initialized successfully");
+
+                                    response = msgFactory.initCompleteMessage().build();
+                                } else {
+                                    if (e instanceof CompletionException) {
+                                        e = e.getCause();
+                                    }
+
+                                    LOG.error("Error when initializing the CMG: {}", e, e.getMessage());
+
+                                    response = msgFactory.initErrorMessage()
+                                            .cause(e.getMessage())
+                                            .shouldCancel(!(e instanceof IllegalInitArgumentException))
+                                            .build();
+                                }
+
+                                clusterService.messagingService().respond(addr, response, correlationId);
+
+                                return service;
+                            }));
         }
     }
 
-    private CompletableFuture<ClusterState> doInit(CmgRaftService service, CmgInitMessage msg, NetworkAddress addr, long correlationId) {
+    private CompletableFuture<Void> doInit(CmgRaftService service, CmgInitMessage msg) {
         return service.initClusterState(clusterState(msg))
-                .whenComplete((state, e) -> {
-                    NetworkMessage response;
-
-                    if (e == null) {
-                        LOG.info("CMG initialized successfully");
-
-                        response = msgFactory.initCompleteMessage().build();
-                    } else {
-                        if (e instanceof CompletionException) {
-                            e = e.getCause();
-                        }
-
-                        LOG.error("Error when initializing the CMG: {}", e, e.getMessage());
-
-                        response = msgFactory.initErrorMessage()
-                                .cause(e.getMessage())
-                                .shouldCancel(!(e instanceof IllegalInitArgumentException))
-                                .build();
-                    }
+                .thenCompose(state -> {
+                    var localState = new LocalState(state.cmgNodes(), state.clusterTag());
 
-                    clusterService.messagingService().respond(addr, response, correlationId);
+                    return localStateStorage.saveLocalState(localState)
+                            .thenCompose(v -> joinCluster(service, state.clusterTag()))
+                            .thenCompose(v -> service.isCurrentNodeLeader()
+                                    .thenCompose(isLeader -> {
+                                        if (isLeader) {
+                                            return onLeaderElected(service, state);
+                                        } else {
+                                            return completedFuture(null);
+                                        }
+                                    }));
                 });
     }
 
@@ -334,12 +330,11 @@ public class ClusterManagementGroupManager implements IgniteComponent {
      *     <li>Broadcasts the current CMG state to all nodes in the physical topology.</li>
      * </ol>
      */
-    private CompletableFuture<CmgRaftService> onLeaderElected(CmgRaftService service, ClusterState state) {
+    private CompletableFuture<Void> onLeaderElected(CmgRaftService service, ClusterState state) {
         LOG.info("CMG leader has been elected, executing onLeaderElected callback");
 
         return updateLogicalTopology(service)
-                .thenApply(v -> service)
-                .whenComplete((s, e) -> {
+                .whenComplete((v, e) -> {
                     if (e == null) {
                         LOG.info("onLeaderElected callback executed successfully");
 
@@ -347,7 +342,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
                         TopologyService topologyService = clusterService.topologyService();
 
                         // TODO: remove listeners if leadership is lost, see https://issues.apache.org/jira/browse/IGNITE-16842
-                        topologyService.addEventHandler(cmgLeaderTopologyEventHandler(s));
+                        topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
 
                         // Send the ClusterStateMessage to all members of the physical topology. We do not wait for the send operation
                         // to being unable to send ClusterState messages should not fail the CMG service startup.