You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/02/19 19:46:02 UTC

[GitHub] [ignite-3] sanpwc commented on a change in pull request #659: IGNITE-16528 Implement init command handling

sanpwc commented on a change in pull request #659:
URL: https://github.com/apache/ignite-3/pull/659#discussion_r810206420



##########
File path: examples/src/integrationTest/java/org/apache/ignite/example/AbstractExamplesTest.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.example;
+
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Base class for creating tests for examples.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class AbstractExamplesTest {

Review comment:
       There's a mess with our test hierarchy. There are IgniteAbstractTest and BaseIgniteAbstractTest with some test logging included. Did you consider extending given classes? 

##########
File path: examples/src/integrationTest/java/org/apache/ignite/example/sql/jdbc/ItSqlExamplesTest.java
##########
@@ -62,39 +49,4 @@ public void testSqlJdbcExample() throws Exception {
                         + "    Richard, Miles, St. Petersburg\n"
         );
     }
-
-    /**
-     * Start node.
-     *
-     * @param workDir Work directory for the started node. Must not be {@code null}.
-     */
-    @BeforeEach
-    public void startNode(@WorkDirectory Path workDir) throws IOException {
-        IgnitionManager.start(
-                "my-first-node",
-                Files.readString(Path.of("config", "ignite-config.json")),
-                workDir
-        );
-    }
-
-    /**
-     * Stop node.
-     */
-    @AfterEach
-    public void stopNode() {
-        IgnitionManager.stop("my-first-node");
-    }
-
-    /**
-     * Removes a work directory created by {@link SqlExamplesTest}.
-     */
-    @BeforeEach
-    @AfterEach
-    public void removeWorkDir() {

Review comment:
       Could you please explain why we don't need such method? Is there any built-in cleanup logic within WorkDirectoryExtension?

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/rest/InitCommandHandler.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join.rest;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.apache.ignite.internal.join.ClusterInitializer;
+import org.apache.ignite.internal.join.Leaders;
+import org.apache.ignite.internal.rest.ErrorResult;
+import org.apache.ignite.internal.rest.netty.RestApiHttpRequest;
+import org.apache.ignite.internal.rest.netty.RestApiHttpResponse;
+import org.apache.ignite.internal.rest.routes.RequestHandler;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * REST handler for the {@link InitCommand}.
+ */
+public class InitCommandHandler implements RequestHandler {
+    private static final IgniteLogger log = IgniteLogger.forClass(InitCommandHandler.class);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final ClusterInitializer clusterInitializer;
+
+    public InitCommandHandler(ClusterInitializer clusterInitializer) {
+        this.clusterInitializer = clusterInitializer;
+    }
+
+    @Override
+    public CompletableFuture<RestApiHttpResponse> handle(RestApiHttpRequest request, RestApiHttpResponse response) {
+        try {
+            InitCommand command = readContent(request);
+
+            if (log.isInfoEnabled()) {
+                log.info(
+                        "Received init command:\n\tMeta Storage nodes: {}\n\tCMG nodes: {}",
+                        command.metaStorageNodes(),
+                        command.cmgNodes()
+                );
+            }
+
+            return clusterInitializer.initCluster(command.metaStorageNodes(), command.cmgNodes())
+                    .thenApply(leaders -> successResponse(response, leaders))

Review comment:
       What's the point of returning leaders? I believe that we shouldn't expose such details of realization that imposes that we use raft. We might easily move to concept of leader -> leaseholder like cockroach does or use leader-less replication framework in addition to raft.

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
##########
@@ -409,27 +417,55 @@ public ConfigurationRegistry clusterConfiguration() {
     }
 
     /**
-     * Returns client handler module.
+     * Returns the id of the current node.
      */
-    public ClientHandlerModule clientHandlerModule() {
-        return clientHandlerModule;
+    public String id() {
+        return clusterSvc.topologyService().localMember().id();
     }
 
     /**
-     * Returns the id of the current node.
+     * Returns the local address of REST endpoints.
      */
-    public String id() {
-        return clusterSvc.topologyService().localMember().id();
+    public NetworkAddress restAddress() {
+        return NetworkAddress.from(restModule.localAddress());
+    }
+
+    /**
+     * Returns the local address of the Thin Client.
+     */
+    public NetworkAddress clientAddress() {

Review comment:
       - clientAddress() could throw IgniteInternalException - please add corresponding @throws clause
   - What's the proper way for client to retrieve rest/client and other ports, especially if they are specified within portRange? From my point of view we might add [rest/client/etc]Adresses to Ignite.localNode() **public** api and in that case we should wrap IgniteInternalExcpetion with proper public one.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/messages/MetastorageInitMessage.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join.messages;
+
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Message for initializing the Meta Storage.
+ */
+@Transferable(InitMessageGroup.METASTORAGE_INIT)
+public interface MetastorageInitMessage extends NetworkMessage {

Review comment:
       Well, I believe that we don't need MetastorageInitMessage for this purpose. 
   I rather prefer following flow:
   onCMGReady cmgLeader send clusterStateMessage to every node that it sees in topology. This message besides other properties contains metaStorageNodes that are used to start metastorage raft group.
   ```
   register onLeaderElected callback:
       onLeaderElected(() -> {
           ClusterState :: CmgGroup.run(
               new adjustClusterStateCmd(
                   vaultSvc.get("CMGPeers"),
                   vaultSvc.get("MSPeers")
               )
           );
           within clusterState
               [stopSeld || trigger CMG Leader actions] :: fullValidation();
                   register clusterSvc.onAppeared(node ->
   clusterSvc.msgSvc.invoke(
   node.id, clusterState
   )
   );
                   clusterSvc.topologySvc.forEach(node ->
   clusterSvc.msgSvc.invoke(
   node.id, clusterState
   )
   );
       });
   
   ``` 

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/Utils.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Various utility methods.
+ */
+class Utils {
+    /**
+     * Resolves given node names (consistent IDs) into {@link ClusterNode}s.
+     *
+     * @param nodeNames node names.
+     * @return list of resolved {@code ClusterNode}s.
+     * @throws InitException if any of the given nodes are not present in the physical topology.
+     */
+    static List<ClusterNode> resolveNodes(ClusterService clusterService, Collection<String> nodeNames) {

Review comment:
       I'd rather move given method to some general utils or cluster service utils in order to use it in MetastorageManager.
   ```
       private CompletableFuture<String> initializeMetaStorage(ClusterService clusterService, MetastorageInitMessage initMsg)
               throws NodeStoppingException {
           TopologyService topologyService = clusterService.topologyService();
   
           List<ClusterNode> metastorageNodes = Arrays.stream(initMsg.metastorageNodes())
                   .map(nodeId -> {
                       ClusterNode node = topologyService.getByConsistentId(nodeId);
   
                       if (node == null) {
                           throw new InitException("Node " + nodeId + " is not present in the physical topology");
                       }
   
                       return node;
                   })
                   .collect(Collectors.toList());
   ```

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
##########
@@ -409,27 +417,55 @@ public ConfigurationRegistry clusterConfiguration() {
     }
 
     /**
-     * Returns client handler module.
+     * Returns the id of the current node.
      */
-    public ClientHandlerModule clientHandlerModule() {
-        return clientHandlerModule;
+    public String id() {
+        return clusterSvc.topologyService().localMember().id();
     }
 
     /**
-     * Returns the id of the current node.
+     * Returns the local address of REST endpoints.
      */
-    public String id() {
-        return clusterSvc.topologyService().localMember().id();
+    public NetworkAddress restAddress() {

Review comment:
       Seems that we need better solution here.
   
   First of all we definitely need to distinguish cluster and node local attributes and methods, meaning that tables(), transactions() are cluster wide and name(), restAddress(), etc are node local. I believe that we should introduce an abstraction of Ignite.localNode() with corresponding name(), restAddress() and similar methods.

##########
File path: examples/src/integrationTest/java/org/apache/ignite/example/AbstractExamplesTest.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.example;
+
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Base class for creating tests for examples.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public abstract class AbstractExamplesTest {
+    private static final String TEST_NODE_NAME = "ignite-node";
+
+    /**
+     * Starts a node.
+     *
+     * @param workDir Work directory for the started node. Must not be {@code null}.
+     */
+    @BeforeEach
+    public void startNode(@WorkDirectory Path workDir) throws Exception {
+        IgniteImpl ignite = (IgniteImpl) IgnitionManager.start(
+                TEST_NODE_NAME,
+                Path.of("config", "ignite-config.json"),
+                workDir,
+                null
+        );
+
+        ignite.init(List.of(ignite.name()));

Review comment:
       I believe that we should also adjust examples in order to reflect that init phase was added.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/ClusterInitializer.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import static org.apache.ignite.internal.join.Utils.resolveNodes;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.join.messages.CancelInitMessage;
+import org.apache.ignite.internal.join.messages.CmgInitMessage;
+import org.apache.ignite.internal.join.messages.InitErrorMessage;
+import org.apache.ignite.internal.join.messages.InitMessagesFactory;
+import org.apache.ignite.internal.join.messages.LeaderElectedMessage;
+import org.apache.ignite.internal.join.messages.MetastorageInitMessage;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkMessage;
+
+/**
+ * Class for performing cluster initialization.
+ */
+public class ClusterInitializer {
+    private static final IgniteLogger log = IgniteLogger.forClass(ClusterInitializer.class);
+
+    private final ClusterService clusterService;
+
+    /** Constructor. */
+    ClusterInitializer(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /**
+     * Initializes the cluster that this node is present in.
+     *
+     * @param metaStorageNodeNames names of nodes that will host the Meta Storage. Cannot be empty.
+     * @param cmgNodeNames names of nodes that will host the Cluster Management Group. Can be empty,
+     *      in which case {@code metaStorageNodeNames} will be used instead.
+     * @return future that resolves into leader node IDs if completed successfully.
+     */
+    public CompletableFuture<Leaders> initCluster(Collection<String> metaStorageNodeNames, Collection<String> cmgNodeNames) {
+        try {
+            if (metaStorageNodeNames.isEmpty()) {
+                throw new IllegalArgumentException("List of metastorage nodes must no be empty");
+            }
+
+            cmgNodeNames = cmgNodeNames.isEmpty() ? metaStorageNodeNames : cmgNodeNames;
+
+            @SuppressWarnings("unused")
+            List<ClusterNode> metastorageNodes = resolveNodes(clusterService, metaStorageNodeNames);
+
+            @SuppressWarnings("unused")
+            List<ClusterNode> cmgNodes = resolveNodes(clusterService, cmgNodeNames);
+
+            // TODO: init message should only be sent to metastorageNodes and cmgNodes respectively,
+            //  https://issues.apache.org/jira/browse/IGNITE-16471
+            Collection<ClusterNode> allMembers = clusterService.topologyService().allMembers();
+
+            var msgFactory = new InitMessagesFactory();
+
+            MetastorageInitMessage metaStorageInitMessage = msgFactory.metastorageInitMessage()
+                    .metastorageNodes(metaStorageNodeNames.toArray(String[]::new))
+                    .build();
+
+            CompletableFuture<String> metaStorageInitFuture = invokeMessage(allMembers, metaStorageInitMessage);
+
+            CmgInitMessage cmgInitMessage = msgFactory.cmgInitMessage()
+                    .cmgNodes(cmgNodeNames.toArray(String[]::new))
+                    .build();
+
+            CompletableFuture<String> cmgInitFuture = invokeMessage(allMembers, cmgInitMessage);
+
+            return metaStorageInitFuture

Review comment:
       I believe that concept of clusterStateMessage-based metastorage init is more robust and simple. Please see my comment in MetastorageInitMessage.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/ClusterManagementGroupManager.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import static org.apache.ignite.internal.join.Utils.resolveNodes;
+
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.join.messages.CancelInitMessage;
+import org.apache.ignite.internal.join.messages.CmgInitMessage;
+import org.apache.ignite.internal.join.messages.InitMessageGroup;
+import org.apache.ignite.internal.join.messages.InitMessagesFactory;
+import org.apache.ignite.internal.join.rest.InitCommandHandler;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.rest.RestModule;
+import org.apache.ignite.internal.rest.routes.Route;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Peer;
+
+/**
+ * Ignite component responsible for cluster initialization and managing the Cluster Management Raft Group.
+ */
+public class ClusterManagementGroupManager implements IgniteComponent {
+    private static final IgniteLogger log = IgniteLogger.forClass(ClusterManagementGroupManager.class);
+
+    /** CMG Raft group name. */
+    private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";
+
+    /** Init REST endpoint path. */
+    private static final String REST_ENDPOINT = "/management/v1/cluster/init";
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    private final ClusterService clusterService;
+
+    private final Loza raftManager;
+
+    private final RestModule restModule;
+
+    /** Handles cluster initialization flow. */
+    private final ClusterInitializer clusterInitializer;
+
+    /** Constructor. */
+    public ClusterManagementGroupManager(ClusterService clusterService, Loza raftManager, RestModule restModule) {
+        this.clusterService = clusterService;
+        this.raftManager = raftManager;
+        this.restModule = restModule;
+        this.clusterInitializer = new ClusterInitializer(clusterService);
+
+        MessagingService messagingService = clusterService.messagingService();
+
+        var msgFactory = new InitMessagesFactory();
+
+        messagingService.addMessageHandler(InitMessageGroup.class, (msg, addr, correlationId) -> {
+            if (!busyLock.enterBusy()) {
+                if (correlationId != null) {
+                    messagingService.respond(addr, errorResponse(msgFactory, new NodeStoppingException()), correlationId);
+                }
+
+                return;
+            }
+
+            try {
+                if (msg instanceof CancelInitMessage) {
+                    log.info("CMG initialization cancelled, reason: " + ((CancelInitMessage) msg).reason());
+
+                    raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+
+                    // TODO: drop the Raft storage as well, https://issues.apache.org/jira/browse/IGNITE-16471
+                } else if (msg instanceof CmgInitMessage) {
+                    assert correlationId != null;
+
+                    String[] nodeIds = ((CmgInitMessage) msg).cmgNodes();
+
+                    List<ClusterNode> nodes = resolveNodes(clusterService, Arrays.asList(nodeIds));
+
+                    raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, CmgRaftGroupListener::new)

Review comment:
       We should also populate Vault with both cmgNodes and metaStorageNodes on init message in order to reuse given meta on cluster restart.
   ```
   if(!vault.cmgPeers.isEmpty) {
           loza.prepareRaftGroup(vault.cmgPeers);
   } else {
       restMgr.registerCmdHndl(
               initMsg,
               (initParams) ->{
                   vaultSvc.putAll(
   Map.of(
                           "CMGPeers", initMsg.cmgPeers,
                            "MSPeers", initMsg.initPeers)
                       );
                   loza.prepareRaftGroup(initParams.cmgPeers)
               }
       );
   }
   
   ```

##########
File path: modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -122,174 +115,233 @@
 
     /**
      * Future which will be completed with {@link IgniteUuid}, when aggregated watch will be successfully deployed. Can be resolved to
-     * {@link Optional#empty()} if no watch deployed at the moment.
-     */
-    private CompletableFuture<Optional<IgniteUuid>> deployFut = new CompletableFuture<>();
-
-    /**
-     * If true - all new watches will be deployed immediately.
+     * {@code null} if no watch deployed at the moment.
      *
-     * <p>If false - all new watches will be aggregated to one batch for further deploy by {@link MetaStorageManager#deployWatches()}
+     * <p>Multithreaded access is guarded by {@code this}.
      */
-    private boolean deployed;
-
-    /** Flag indicates if meta storage nodes were set on start. */
-    private boolean metaStorageNodesOnStart;
+    private CompletableFuture<IgniteUuid> deployFut = new CompletableFuture<>();
 
     /** Actual storage for the Metastorage. */
     private final KeyValueStorage storage;
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    /** Prevents double stopping the component. */
-    AtomicBoolean stopGuard = new AtomicBoolean();
+    /**
+     * If true - all new watches will be deployed immediately.
+     *
+     * <p>If false - all new watches will be aggregated to one batch for further deploy by {@link MetaStorageManager#deployWatches()}.
+     *
+     * <p>Multithreaded access is guarded by {@code this}.
+     */
+    private boolean areWatchesDeployed = false;
+
+    /**
+     * Flag that indicates that the Meta Storage has been initialized.
+     */
+    private final AtomicBoolean isInitialized = new AtomicBoolean();
+
+    /**
+     * Prevents double stopping the component.
+     *
+     * <p>Multithreaded access is guarded by {@code this}.
+     */
+    private boolean isStopped = false;
 
     /**
      * The constructor.
      *
-     * @param vaultMgr      Vault manager.
-     * @param locCfgMgr     Local configuration manager.
-     * @param clusterNetSvc Cluster network service.
-     * @param raftMgr       Raft manager.
-     * @param storage       Storage. This component owns this resource and will manage its lifecycle.
+     * @param vaultMgr Vault manager.
+     * @param clusterService Cluster network service.
+     * @param raftMgr Raft manager.
+     * @param storage Storage. This component owns this resource and will manage its lifecycle.
      */
     public MetaStorageManager(
             VaultManager vaultMgr,
-            ConfigurationManager locCfgMgr,
-            ClusterService clusterNetSvc,
+            ClusterService clusterService,
             Loza raftMgr,
             KeyValueStorage storage
     ) {
         this.vaultMgr = vaultMgr;
-        this.locCfgMgr = locCfgMgr;
-        this.clusterNetSvc = clusterNetSvc;
         this.raftMgr = raftMgr;
         this.storage = storage;
-    }
 
-    /** {@inheritDoc} */
-    @Override
-    public void start() {
-        String[] metastorageNodes = this.locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
-                .metastorageNodes().value();
+        MessagingService messagingService = clusterService.messagingService();
 
-        Predicate<ClusterNode> metaStorageNodesContainsLocPred =
-                clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
+        var msgFactory = new InitMessagesFactory();
 
-        if (metastorageNodes.length > 0) {
-            metaStorageNodesOnStart = true;
-
-            List<ClusterNode> metaStorageMembers = clusterNetSvc.topologyService().allMembers().stream()
-                    .filter(metaStorageNodesContainsLocPred)
-                    .collect(Collectors.toList());
+        messagingService.addMessageHandler(InitMessageGroup.class, (msg, addr, correlationId) -> {
+            if (!busyLock.enterBusy()) {
+                if (correlationId != null) {
+                    messagingService.respond(addr, errorResponse(msgFactory, new NodeStoppingException()), correlationId);
+                }
 
-            // TODO: This is temporary solution for providing human-readable error when you try to start single-node cluster
-            // without hosting metastorage, this will be rewritten in init phase https://issues.apache.org/jira/browse/IGNITE-15114
-            if (metaStorageMembers.isEmpty()) {
-                throw new IgniteException(
-                        "Cannot start meta storage manager because there is no node in the cluster that hosts meta storage.");
+                return;
             }
 
-            // TODO: This is temporary solution. We need to prohibit starting several metastorage nodes
-            // as far as we do not have mechanism of changing raft peers when new metastorage node is joining to cluster.
-            // This will be rewritten in init phase https://issues.apache.org/jira/browse/IGNITE-15114
-            if (metastorageNodes.length > 1) {
-                throw new IgniteException(
-                        "Cannot start meta storage manager because it is not allowed to start several metastorage nodes.");
-            }
+            try {
+                if (msg instanceof CancelInitMessage) {

Review comment:
       It's not needed here either ;)

##########
File path: modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -122,174 +115,233 @@
 
     /**
      * Future which will be completed with {@link IgniteUuid}, when aggregated watch will be successfully deployed. Can be resolved to
-     * {@link Optional#empty()} if no watch deployed at the moment.
-     */
-    private CompletableFuture<Optional<IgniteUuid>> deployFut = new CompletableFuture<>();
-
-    /**
-     * If true - all new watches will be deployed immediately.
+     * {@code null} if no watch deployed at the moment.
      *
-     * <p>If false - all new watches will be aggregated to one batch for further deploy by {@link MetaStorageManager#deployWatches()}
+     * <p>Multithreaded access is guarded by {@code this}.
      */
-    private boolean deployed;
-
-    /** Flag indicates if meta storage nodes were set on start. */
-    private boolean metaStorageNodesOnStart;
+    private CompletableFuture<IgniteUuid> deployFut = new CompletableFuture<>();
 
     /** Actual storage for the Metastorage. */
     private final KeyValueStorage storage;
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    /** Prevents double stopping the component. */
-    AtomicBoolean stopGuard = new AtomicBoolean();
+    /**
+     * If true - all new watches will be deployed immediately.
+     *
+     * <p>If false - all new watches will be aggregated to one batch for further deploy by {@link MetaStorageManager#deployWatches()}.
+     *
+     * <p>Multithreaded access is guarded by {@code this}.
+     */
+    private boolean areWatchesDeployed = false;
+
+    /**
+     * Flag that indicates that the Meta Storage has been initialized.
+     */
+    private final AtomicBoolean isInitialized = new AtomicBoolean();
+
+    /**
+     * Prevents double stopping the component.
+     *
+     * <p>Multithreaded access is guarded by {@code this}.
+     */
+    private boolean isStopped = false;
 
     /**
      * The constructor.
      *
-     * @param vaultMgr      Vault manager.
-     * @param locCfgMgr     Local configuration manager.
-     * @param clusterNetSvc Cluster network service.
-     * @param raftMgr       Raft manager.
-     * @param storage       Storage. This component owns this resource and will manage its lifecycle.
+     * @param vaultMgr Vault manager.
+     * @param clusterService Cluster network service.
+     * @param raftMgr Raft manager.
+     * @param storage Storage. This component owns this resource and will manage its lifecycle.
      */
     public MetaStorageManager(
             VaultManager vaultMgr,
-            ConfigurationManager locCfgMgr,
-            ClusterService clusterNetSvc,
+            ClusterService clusterService,
             Loza raftMgr,
             KeyValueStorage storage
     ) {
         this.vaultMgr = vaultMgr;
-        this.locCfgMgr = locCfgMgr;
-        this.clusterNetSvc = clusterNetSvc;
         this.raftMgr = raftMgr;
         this.storage = storage;
-    }
 
-    /** {@inheritDoc} */
-    @Override
-    public void start() {
-        String[] metastorageNodes = this.locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
-                .metastorageNodes().value();
+        MessagingService messagingService = clusterService.messagingService();
 
-        Predicate<ClusterNode> metaStorageNodesContainsLocPred =
-                clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
+        var msgFactory = new InitMessagesFactory();
 
-        if (metastorageNodes.length > 0) {
-            metaStorageNodesOnStart = true;
-
-            List<ClusterNode> metaStorageMembers = clusterNetSvc.topologyService().allMembers().stream()
-                    .filter(metaStorageNodesContainsLocPred)
-                    .collect(Collectors.toList());
+        messagingService.addMessageHandler(InitMessageGroup.class, (msg, addr, correlationId) -> {

Review comment:
       don't need it here, yep?

##########
File path: modules/join/pom.xml
##########
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>ignite-join</artifactId>

Review comment:
       I agree with adding new module, but it's definitely not only "join" because it will also handle logical topology, metastorage configuration and adjustment,  probably rolling upgrade, etc. So I'd rather use "ignite-cluster-management" or similar here.
   
   Please also adjust package with corresponding name.

##########
File path: examples/src/integrationTest/java/org/apache/ignite/example/sql/jdbc/ItSqlExamplesTest.java
##########
@@ -17,35 +17,22 @@
 
 package org.apache.ignite.example.sql.jdbc;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.example.AbstractExamplesTest;
 import org.apache.ignite.example.ExampleTestUtils;
-import org.apache.ignite.internal.testframework.WorkDirectory;
-import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
-import org.apache.ignite.internal.util.IgniteUtils;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
  * These tests check that all SQL JDBC examples pass correctly.
  */
-@ExtendWith(WorkDirectoryExtension.class)
-public class SqlExamplesTest {
-    /** Empty argument to invoke an example. */
-    protected static final String[] EMPTY_ARGS = new String[0];
-
+public class ItSqlExamplesTest extends AbstractExamplesTest {
     /**
      * Runs SqlJdbcExample and checks its output.
      *
      * @throws Exception If failed.
      */
     @Test
     public void testSqlJdbcExample() throws Exception {
-        ExampleTestUtils.assertConsoleOutputContains(SqlJdbcExample::main, EMPTY_ARGS,

Review comment:
       Here and below. EMPTY_ARGS are meaningful. I'd rather add such constant to AbstractExamplesTest in order to use wherever needed.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/rest/InitCommandHandler.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join.rest;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.apache.ignite.internal.join.ClusterInitializer;
+import org.apache.ignite.internal.join.Leaders;
+import org.apache.ignite.internal.rest.ErrorResult;
+import org.apache.ignite.internal.rest.netty.RestApiHttpRequest;
+import org.apache.ignite.internal.rest.netty.RestApiHttpResponse;
+import org.apache.ignite.internal.rest.routes.RequestHandler;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * REST handler for the {@link InitCommand}.
+ */
+public class InitCommandHandler implements RequestHandler {
+    private static final IgniteLogger log = IgniteLogger.forClass(InitCommandHandler.class);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final ClusterInitializer clusterInitializer;
+
+    public InitCommandHandler(ClusterInitializer clusterInitializer) {
+        this.clusterInitializer = clusterInitializer;
+    }
+
+    @Override
+    public CompletableFuture<RestApiHttpResponse> handle(RestApiHttpRequest request, RestApiHttpResponse response) {
+        try {
+            InitCommand command = readContent(request);
+
+            if (log.isInfoEnabled()) {
+                log.info(
+                        "Received init command:\n\tMeta Storage nodes: {}\n\tCMG nodes: {}",
+                        command.metaStorageNodes(),
+                        command.cmgNodes()
+                );
+            }
+
+            return clusterInitializer.initCluster(command.metaStorageNodes(), command.cmgNodes())
+                    .thenApply(leaders -> successResponse(response, leaders))
+                    .exceptionally(e -> errorResponse(response, e));
+        } catch (Exception e) {
+            return CompletableFuture.completedFuture(errorResponse(response, e));
+        }
+    }
+
+    private InitCommand readContent(RestApiHttpRequest restApiHttpRequest) throws IOException {
+        ByteBuf content = restApiHttpRequest.request().content();
+
+        try (InputStream is = new ByteBufInputStream(content)) {
+            return objectMapper.readValue(is, InitCommand.class);
+        }
+    }
+
+    private static RestApiHttpResponse successResponse(RestApiHttpResponse response, Leaders leaders) {
+        if (log.isInfoEnabled()) {
+            log.info(
+                    "Init command executed successfully.\n\tMeta Storage leader ID: {}\n\tCMG leader ID: {}",
+                    leaders.metaStorageLeader(),
+                    leaders.cmgLeader()
+            );
+        }
+
+        response.json(Map.of(
+                "metaStorageLeaderId", leaders.metaStorageLeader(),
+                "cmgLeaderId", leaders.cmgLeader()
+        ));
+
+        return response;
+    }
+
+    private static RestApiHttpResponse errorResponse(RestApiHttpResponse response, Throwable e) {
+        if (e instanceof CompletionException) {
+            e = e.getCause();

Review comment:
       What if e.getCause() is IgniteInternalException? Is it possible? If true, it'll be wrapped with Ignite(Public)Exception, isn't it?

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/ClusterManagementGroupManager.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import static org.apache.ignite.internal.join.Utils.resolveNodes;
+
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.join.messages.CancelInitMessage;
+import org.apache.ignite.internal.join.messages.CmgInitMessage;
+import org.apache.ignite.internal.join.messages.InitMessageGroup;
+import org.apache.ignite.internal.join.messages.InitMessagesFactory;
+import org.apache.ignite.internal.join.rest.InitCommandHandler;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.rest.RestModule;
+import org.apache.ignite.internal.rest.routes.Route;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Peer;
+
+/**
+ * Ignite component responsible for cluster initialization and managing the Cluster Management Raft Group.
+ */
+public class ClusterManagementGroupManager implements IgniteComponent {
+    private static final IgniteLogger log = IgniteLogger.forClass(ClusterManagementGroupManager.class);
+
+    /** CMG Raft group name. */
+    private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";
+
+    /** Init REST endpoint path. */
+    private static final String REST_ENDPOINT = "/management/v1/cluster/init";
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    private final ClusterService clusterService;
+
+    private final Loza raftManager;
+
+    private final RestModule restModule;
+
+    /** Handles cluster initialization flow. */
+    private final ClusterInitializer clusterInitializer;
+
+    /** Constructor. */
+    public ClusterManagementGroupManager(ClusterService clusterService, Loza raftManager, RestModule restModule) {
+        this.clusterService = clusterService;
+        this.raftManager = raftManager;
+        this.restModule = restModule;
+        this.clusterInitializer = new ClusterInitializer(clusterService);
+
+        MessagingService messagingService = clusterService.messagingService();
+
+        var msgFactory = new InitMessagesFactory();
+
+        messagingService.addMessageHandler(InitMessageGroup.class, (msg, addr, correlationId) -> {
+            if (!busyLock.enterBusy()) {
+                if (correlationId != null) {
+                    messagingService.respond(addr, errorResponse(msgFactory, new NodeStoppingException()), correlationId);
+                }
+
+                return;
+            }
+
+            try {
+                if (msg instanceof CancelInitMessage) {
+                    log.info("CMG initialization cancelled, reason: " + ((CancelInitMessage) msg).reason());
+
+                    raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+
+                    // TODO: drop the Raft storage as well, https://issues.apache.org/jira/browse/IGNITE-16471
+                } else if (msg instanceof CmgInitMessage) {
+                    assert correlationId != null;
+
+                    String[] nodeIds = ((CmgInitMessage) msg).cmgNodes();
+
+                    List<ClusterNode> nodes = resolveNodes(clusterService, Arrays.asList(nodeIds));

Review comment:
       It worth to mention that according to cluster lifecycle document initial idea assumed that cmgNodes and metaStorageNodes are public **addresses** and not names. Despite that fact, personally I believe that node names suites even better here.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/InitException.java
##########
@@ -15,8 +15,17 @@
  * limitations under the License.
  */
 
+package org.apache.ignite.internal.join;
+
 /**
- * Configuration schemas for Cluster node.
+ * Exception thrown when cluster initialization fails for some reason.
  */
+public class InitException extends RuntimeException {

Review comment:
       Please extend either IgniteInternalException or IgniteException if it's thrown from public api.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/rest/InitCommandHandler.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join.rest;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.apache.ignite.internal.join.ClusterInitializer;
+import org.apache.ignite.internal.join.Leaders;
+import org.apache.ignite.internal.rest.ErrorResult;
+import org.apache.ignite.internal.rest.netty.RestApiHttpRequest;
+import org.apache.ignite.internal.rest.netty.RestApiHttpResponse;
+import org.apache.ignite.internal.rest.routes.RequestHandler;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * REST handler for the {@link InitCommand}.
+ */
+public class InitCommandHandler implements RequestHandler {
+    private static final IgniteLogger log = IgniteLogger.forClass(InitCommandHandler.class);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final ClusterInitializer clusterInitializer;
+
+    public InitCommandHandler(ClusterInitializer clusterInitializer) {
+        this.clusterInitializer = clusterInitializer;
+    }
+
+    @Override
+    public CompletableFuture<RestApiHttpResponse> handle(RestApiHttpRequest request, RestApiHttpResponse response) {

Review comment:
       JavaDoc is missing.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/ClusterManagementGroupManager.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import static org.apache.ignite.internal.join.Utils.resolveNodes;
+
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.join.messages.CancelInitMessage;
+import org.apache.ignite.internal.join.messages.CmgInitMessage;
+import org.apache.ignite.internal.join.messages.InitMessageGroup;
+import org.apache.ignite.internal.join.messages.InitMessagesFactory;
+import org.apache.ignite.internal.join.rest.InitCommandHandler;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.rest.RestModule;
+import org.apache.ignite.internal.rest.routes.Route;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Peer;
+
+/**
+ * Ignite component responsible for cluster initialization and managing the Cluster Management Raft Group.
+ */
+public class ClusterManagementGroupManager implements IgniteComponent {
+    private static final IgniteLogger log = IgniteLogger.forClass(ClusterManagementGroupManager.class);
+
+    /** CMG Raft group name. */
+    private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";
+
+    /** Init REST endpoint path. */
+    private static final String REST_ENDPOINT = "/management/v1/cluster/init";
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    private final ClusterService clusterService;
+
+    private final Loza raftManager;
+
+    private final RestModule restModule;
+
+    /** Handles cluster initialization flow. */
+    private final ClusterInitializer clusterInitializer;
+
+    /** Constructor. */
+    public ClusterManagementGroupManager(ClusterService clusterService, Loza raftManager, RestModule restModule) {
+        this.clusterService = clusterService;
+        this.raftManager = raftManager;
+        this.restModule = restModule;
+        this.clusterInitializer = new ClusterInitializer(clusterService);
+
+        MessagingService messagingService = clusterService.messagingService();
+
+        var msgFactory = new InitMessagesFactory();
+
+        messagingService.addMessageHandler(InitMessageGroup.class, (msg, addr, correlationId) -> {

Review comment:
       The rule of thumb is to prefer start() to constructor for registering message handlers. Are there any practical reasons to do it in constructor in this particular case? 

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/rest/InitCommandHandler.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join.rest;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.apache.ignite.internal.join.ClusterInitializer;
+import org.apache.ignite.internal.join.Leaders;
+import org.apache.ignite.internal.rest.ErrorResult;
+import org.apache.ignite.internal.rest.netty.RestApiHttpRequest;
+import org.apache.ignite.internal.rest.netty.RestApiHttpResponse;
+import org.apache.ignite.internal.rest.routes.RequestHandler;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * REST handler for the {@link InitCommand}.
+ */
+public class InitCommandHandler implements RequestHandler {
+    private static final IgniteLogger log = IgniteLogger.forClass(InitCommandHandler.class);
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final ClusterInitializer clusterInitializer;
+
+    public InitCommandHandler(ClusterInitializer clusterInitializer) {

Review comment:
       JavaDoc is missing. By the way, why did our javadoc check miss this?

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/ClusterManagementGroupManager.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import static org.apache.ignite.internal.join.Utils.resolveNodes;
+
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.join.messages.CancelInitMessage;
+import org.apache.ignite.internal.join.messages.CmgInitMessage;
+import org.apache.ignite.internal.join.messages.InitMessageGroup;
+import org.apache.ignite.internal.join.messages.InitMessagesFactory;
+import org.apache.ignite.internal.join.rest.InitCommandHandler;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.rest.RestModule;
+import org.apache.ignite.internal.rest.routes.Route;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Peer;
+
+/**
+ * Ignite component responsible for cluster initialization and managing the Cluster Management Raft Group.
+ */
+public class ClusterManagementGroupManager implements IgniteComponent {
+    private static final IgniteLogger log = IgniteLogger.forClass(ClusterManagementGroupManager.class);
+
+    /** CMG Raft group name. */
+    private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";
+
+    /** Init REST endpoint path. */
+    private static final String REST_ENDPOINT = "/management/v1/cluster/init";
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    private final ClusterService clusterService;
+
+    private final Loza raftManager;
+
+    private final RestModule restModule;
+
+    /** Handles cluster initialization flow. */
+    private final ClusterInitializer clusterInitializer;
+
+    /** Constructor. */
+    public ClusterManagementGroupManager(ClusterService clusterService, Loza raftManager, RestModule restModule) {
+        this.clusterService = clusterService;
+        this.raftManager = raftManager;
+        this.restModule = restModule;
+        this.clusterInitializer = new ClusterInitializer(clusterService);
+
+        MessagingService messagingService = clusterService.messagingService();
+
+        var msgFactory = new InitMessagesFactory();
+
+        messagingService.addMessageHandler(InitMessageGroup.class, (msg, addr, correlationId) -> {
+            if (!busyLock.enterBusy()) {
+                if (correlationId != null) {
+                    messagingService.respond(addr, errorResponse(msgFactory, new NodeStoppingException()), correlationId);
+                }
+
+                return;
+            }
+
+            try {
+                if (msg instanceof CancelInitMessage) {
+                    log.info("CMG initialization cancelled, reason: " + ((CancelInitMessage) msg).reason());
+
+                    raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+
+                    // TODO: drop the Raft storage as well, https://issues.apache.org/jira/browse/IGNITE-16471
+                } else if (msg instanceof CmgInitMessage) {
+                    assert correlationId != null;
+
+                    String[] nodeIds = ((CmgInitMessage) msg).cmgNodes();
+
+                    List<ClusterNode> nodes = resolveNodes(clusterService, Arrays.asList(nodeIds));
+
+                    raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, CmgRaftGroupListener::new)
+                            .whenComplete((service, e) -> {
+                                if (e == null) {
+                                    Peer leader = service.leader();
+
+                                    assert leader != null;
+
+                                    messagingService.respond(addr, leaderElectedResponse(msgFactory, leader), correlationId);
+                                } else {
+                                    messagingService.respond(addr, errorResponse(msgFactory, e), correlationId);
+                                }
+                            });
+                }
+            } catch (Exception e) {
+                log.error("Exception when initializing the CMG", e);
+
+                if (correlationId != null) {
+                    messagingService.respond(addr, errorResponse(msgFactory, e), correlationId);
+                }
+            } finally {
+                busyLock.leaveBusy();
+            }
+        });
+    }
+
+    /**
+     * Initializes the cluster that this node is present in.
+     *
+     * @param metaStorageNodeNames names of nodes that will host the Meta Storage.
+     * @param cmgNodeNames names of nodes that will host the Cluster Management Group.
+     */
+    public Leaders initCluster(Collection<String> metaStorageNodeNames, Collection<String> cmgNodeNames) throws NodeStoppingException {
+        if (!busyLock.enterBusy()) {
+            throw new NodeStoppingException();
+        }
+
+        try {
+            return clusterInitializer.initCluster(metaStorageNodeNames, cmgNodeNames).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInternalException("Interrupted while initializing the cluster", e);
+        } catch (ExecutionException e) {
+            throw new IgniteInternalException("Unable to initialize the cluster", e.getCause());
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private static NetworkMessage errorResponse(InitMessagesFactory msgFactory, Throwable e) {
+        log.error("Exception when starting the CMG", e);
+
+        return msgFactory.initErrorMessage()
+                .cause(e.getMessage())
+                .build();
+    }
+
+    private NetworkMessage leaderElectedResponse(InitMessagesFactory msgFactory, Peer leader) {

Review comment:
       As was mentioned earlier leader election is a detail of realization, I believe that simple successfulResponse suites better here. 

##########
File path: modules/cli/src/integrationTest/java/org/apache/ignite/cli/ItConfigCommandTest.java
##########
@@ -32,74 +32,50 @@
 import io.micronaut.context.ApplicationContext;
 import io.micronaut.context.env.Environment;
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.PrintWriter;
-import java.net.ServerSocket;
 import java.nio.file.Path;
 import net.minidev.json.JSONObject;
 import net.minidev.json.JSONValue;
-import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.cli.spec.IgniteCliSpec;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
-import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.extension.ExtendWith;
 import picocli.CommandLine;
 
 /**
  * Integration test for {@code ignite config} commands.
  */
+@ExtendWith(WorkDirectoryExtension.class)
 public class ItConfigCommandTest extends AbstractCliTest {
     /** DI context. */
     private ApplicationContext ctx;
 
     /** stderr. */
-    private ByteArrayOutputStream err;
+    private final ByteArrayOutputStream err = new ByteArrayOutputStream();
 
     /** stdout. */
-    private ByteArrayOutputStream out;
-
-    /** Port for REST communication. */
-    private int restPort;
-
-    /** Port for thin client communication. */
-    private int clientPort;
-
-    /** Network port. */
-    private int networkPort;
+    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
 
     /** Node. */
-    private Ignite node;
+    private IgniteImpl node;
 
     @BeforeEach
-    void setup(@TempDir Path workDir, TestInfo testInfo) throws IOException {
-        // TODO: IGNITE-15131 Must be replaced by receiving the actual port configs from the started node.

Review comment:
       Please check my comment in IgniteImpl class and if you are going to fix it within current ticket please resolve IGNITE-15131.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/ClusterManagementGroupManager.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import static org.apache.ignite.internal.join.Utils.resolveNodes;
+
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.join.messages.CancelInitMessage;
+import org.apache.ignite.internal.join.messages.CmgInitMessage;
+import org.apache.ignite.internal.join.messages.InitMessageGroup;
+import org.apache.ignite.internal.join.messages.InitMessagesFactory;
+import org.apache.ignite.internal.join.rest.InitCommandHandler;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.rest.RestModule;
+import org.apache.ignite.internal.rest.routes.Route;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Peer;
+
+/**
+ * Ignite component responsible for cluster initialization and managing the Cluster Management Raft Group.
+ */
+public class ClusterManagementGroupManager implements IgniteComponent {
+    private static final IgniteLogger log = IgniteLogger.forClass(ClusterManagementGroupManager.class);
+
+    /** CMG Raft group name. */
+    private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";
+
+    /** Init REST endpoint path. */
+    private static final String REST_ENDPOINT = "/management/v1/cluster/init";
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    private final ClusterService clusterService;
+
+    private final Loza raftManager;
+
+    private final RestModule restModule;
+
+    /** Handles cluster initialization flow. */
+    private final ClusterInitializer clusterInitializer;
+
+    /** Constructor. */
+    public ClusterManagementGroupManager(ClusterService clusterService, Loza raftManager, RestModule restModule) {
+        this.clusterService = clusterService;
+        this.raftManager = raftManager;
+        this.restModule = restModule;
+        this.clusterInitializer = new ClusterInitializer(clusterService);
+
+        MessagingService messagingService = clusterService.messagingService();
+
+        var msgFactory = new InitMessagesFactory();
+
+        messagingService.addMessageHandler(InitMessageGroup.class, (msg, addr, correlationId) -> {
+            if (!busyLock.enterBusy()) {
+                if (correlationId != null) {
+                    messagingService.respond(addr, errorResponse(msgFactory, new NodeStoppingException()), correlationId);
+                }
+
+                return;
+            }
+
+            try {
+                if (msg instanceof CancelInitMessage) {
+                    log.info("CMG initialization cancelled, reason: " + ((CancelInitMessage) msg).reason());
+
+                    raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);
+
+                    // TODO: drop the Raft storage as well, https://issues.apache.org/jira/browse/IGNITE-16471
+                } else if (msg instanceof CmgInitMessage) {
+                    assert correlationId != null;
+
+                    String[] nodeIds = ((CmgInitMessage) msg).cmgNodes();
+
+                    List<ClusterNode> nodes = resolveNodes(clusterService, Arrays.asList(nodeIds));
+
+                    raftManager.prepareRaftGroup(CMG_RAFT_GROUP_NAME, nodes, CmgRaftGroupListener::new)
+                            .whenComplete((service, e) -> {
+                                if (e == null) {
+                                    Peer leader = service.leader();
+
+                                    assert leader != null;

Review comment:
       We definitely need more reliable leader await logic here. It's perfectly fine for raft group not not elect leader for some time, because leader election process includes some randomized invariant inside  and because CmgInitMessage might be processed in different time frames on different nodes. I mean that if service.leader is null after service creation we should call refreshLeader that has retry logic inside.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/messages/InitMessageGroup.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join.messages;
+
+import org.apache.ignite.network.annotations.MessageGroup;
+
+/**
+ * Message Group for cluster initialization and CMG management.
+ */
+@MessageGroup(groupType = 6, groupName = "InitMessages")
+public class InitMessageGroup {
+    /**
+     * Message type for {@link CmgInitMessage}.
+     */
+    public static final short CMG_INIT = 1;
+
+    /**
+     * Message type for {@link MetastorageInitMessage}.
+     */
+    public static final short METASTORAGE_INIT = 2;
+
+    /**
+     * Message type for {@link LeaderElectedMessage}.
+     */
+    public static final short LEADER_ELECTED = 3;

Review comment:
       Let's rename it to GROUP_READY

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/messages/LeaderElectedMessage.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join.messages;
+
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Successful response for initializing a Raft group.
+ */
+@Transferable(InitMessageGroup.LEADER_ELECTED)
+public interface LeaderElectedMessage extends NetworkMessage {

Review comment:
       Same as above, I believe that GROUP_READY is enough here, that actually means that we don't need exact leader name.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/ClusterInitializer.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import static org.apache.ignite.internal.join.Utils.resolveNodes;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.join.messages.CancelInitMessage;
+import org.apache.ignite.internal.join.messages.CmgInitMessage;
+import org.apache.ignite.internal.join.messages.InitErrorMessage;
+import org.apache.ignite.internal.join.messages.InitMessagesFactory;
+import org.apache.ignite.internal.join.messages.LeaderElectedMessage;
+import org.apache.ignite.internal.join.messages.MetastorageInitMessage;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkMessage;
+
+/**
+ * Class for performing cluster initialization.
+ */
+public class ClusterInitializer {
+    private static final IgniteLogger log = IgniteLogger.forClass(ClusterInitializer.class);
+
+    private final ClusterService clusterService;
+
+    /** Constructor. */
+    ClusterInitializer(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /**
+     * Initializes the cluster that this node is present in.
+     *
+     * @param metaStorageNodeNames names of nodes that will host the Meta Storage. Cannot be empty.
+     * @param cmgNodeNames names of nodes that will host the Cluster Management Group. Can be empty,
+     *      in which case {@code metaStorageNodeNames} will be used instead.
+     * @return future that resolves into leader node IDs if completed successfully.
+     */
+    public CompletableFuture<Leaders> initCluster(Collection<String> metaStorageNodeNames, Collection<String> cmgNodeNames) {
+        try {
+            if (metaStorageNodeNames.isEmpty()) {
+                throw new IllegalArgumentException("List of metastorage nodes must no be empty");
+            }
+
+            cmgNodeNames = cmgNodeNames.isEmpty() ? metaStorageNodeNames : cmgNodeNames;
+
+            @SuppressWarnings("unused")

Review comment:
       Could you please add a comment that it's a sort of validation, if it's really is.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/ClusterManagementGroupManager.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import static org.apache.ignite.internal.join.Utils.resolveNodes;
+
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMethod;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.join.messages.CancelInitMessage;
+import org.apache.ignite.internal.join.messages.CmgInitMessage;
+import org.apache.ignite.internal.join.messages.InitMessageGroup;
+import org.apache.ignite.internal.join.messages.InitMessagesFactory;
+import org.apache.ignite.internal.join.rest.InitCommandHandler;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.rest.RestModule;
+import org.apache.ignite.internal.rest.routes.Route;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.raft.client.Peer;
+
+/**
+ * Ignite component responsible for cluster initialization and managing the Cluster Management Raft Group.
+ */
+public class ClusterManagementGroupManager implements IgniteComponent {
+    private static final IgniteLogger log = IgniteLogger.forClass(ClusterManagementGroupManager.class);
+
+    /** CMG Raft group name. */
+    private static final String CMG_RAFT_GROUP_NAME = "cmg_raft_group";
+
+    /** Init REST endpoint path. */
+    private static final String REST_ENDPOINT = "/management/v1/cluster/init";
+
+    /** Busy lock to stop synchronously. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+    /** Prevents double stopping the component. */
+    private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+    private final ClusterService clusterService;
+
+    private final Loza raftManager;
+
+    private final RestModule restModule;
+
+    /** Handles cluster initialization flow. */
+    private final ClusterInitializer clusterInitializer;
+
+    /** Constructor. */
+    public ClusterManagementGroupManager(ClusterService clusterService, Loza raftManager, RestModule restModule) {
+        this.clusterService = clusterService;
+        this.raftManager = raftManager;
+        this.restModule = restModule;
+        this.clusterInitializer = new ClusterInitializer(clusterService);
+
+        MessagingService messagingService = clusterService.messagingService();
+
+        var msgFactory = new InitMessagesFactory();
+
+        messagingService.addMessageHandler(InitMessageGroup.class, (msg, addr, correlationId) -> {
+            if (!busyLock.enterBusy()) {
+                if (correlationId != null) {
+                    messagingService.respond(addr, errorResponse(msgFactory, new NodeStoppingException()), correlationId);
+                }
+
+                return;
+            }
+
+            try {
+                if (msg instanceof CancelInitMessage) {
+                    log.info("CMG initialization cancelled, reason: " + ((CancelInitMessage) msg).reason());
+
+                    raftManager.stopRaftGroup(CMG_RAFT_GROUP_NAME);

Review comment:
       We should also remove cmgNodes and msNodes from Vault here.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/Leaders.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import java.util.Objects;
+
+/**
+ * Data class containing consistent IDs of leaders of the Meta Storage and the CMG.
+ */
+public class Leaders {

Review comment:
       Seems that we don't need given class at all.

##########
File path: modules/metastorage/pom.xml
##########
@@ -38,6 +38,11 @@
             <artifactId>ignite-configuration</artifactId>
         </dependency>
 
+        <dependency>

Review comment:
       I actually don't think that we need it, cause ignite-runner will handle on clusterStateMessage that will include metaStorageNodes.

##########
File path: modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -122,174 +115,233 @@
 
     /**
      * Future which will be completed with {@link IgniteUuid}, when aggregated watch will be successfully deployed. Can be resolved to
-     * {@link Optional#empty()} if no watch deployed at the moment.
-     */
-    private CompletableFuture<Optional<IgniteUuid>> deployFut = new CompletableFuture<>();
-
-    /**
-     * If true - all new watches will be deployed immediately.
+     * {@code null} if no watch deployed at the moment.
      *
-     * <p>If false - all new watches will be aggregated to one batch for further deploy by {@link MetaStorageManager#deployWatches()}
+     * <p>Multithreaded access is guarded by {@code this}.
      */
-    private boolean deployed;
-
-    /** Flag indicates if meta storage nodes were set on start. */
-    private boolean metaStorageNodesOnStart;
+    private CompletableFuture<IgniteUuid> deployFut = new CompletableFuture<>();
 
     /** Actual storage for the Metastorage. */
     private final KeyValueStorage storage;
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    /** Prevents double stopping the component. */
-    AtomicBoolean stopGuard = new AtomicBoolean();

Review comment:
       Let's use consistent solution for all components and use either stopGuard or isStopped flag.
   Within CMGManager you've introduced stopGuard.

##########
File path: modules/join/src/main/java/org/apache/ignite/internal/join/ClusterInitializer.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.join;
+
+import static org.apache.ignite.internal.join.Utils.resolveNodes;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.join.messages.CancelInitMessage;
+import org.apache.ignite.internal.join.messages.CmgInitMessage;
+import org.apache.ignite.internal.join.messages.InitErrorMessage;
+import org.apache.ignite.internal.join.messages.InitMessagesFactory;
+import org.apache.ignite.internal.join.messages.LeaderElectedMessage;
+import org.apache.ignite.internal.join.messages.MetastorageInitMessage;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkMessage;
+
+/**
+ * Class for performing cluster initialization.
+ */
+public class ClusterInitializer {
+    private static final IgniteLogger log = IgniteLogger.forClass(ClusterInitializer.class);
+
+    private final ClusterService clusterService;
+
+    /** Constructor. */
+    ClusterInitializer(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    /**
+     * Initializes the cluster that this node is present in.
+     *
+     * @param metaStorageNodeNames names of nodes that will host the Meta Storage. Cannot be empty.
+     * @param cmgNodeNames names of nodes that will host the Cluster Management Group. Can be empty,
+     *      in which case {@code metaStorageNodeNames} will be used instead.
+     * @return future that resolves into leader node IDs if completed successfully.
+     */
+    public CompletableFuture<Leaders> initCluster(Collection<String> metaStorageNodeNames, Collection<String> cmgNodeNames) {
+        try {
+            if (metaStorageNodeNames.isEmpty()) {
+                throw new IllegalArgumentException("List of metastorage nodes must no be empty");
+            }
+
+            cmgNodeNames = cmgNodeNames.isEmpty() ? metaStorageNodeNames : cmgNodeNames;
+
+            @SuppressWarnings("unused")
+            List<ClusterNode> metastorageNodes = resolveNodes(clusterService, metaStorageNodeNames);
+
+            @SuppressWarnings("unused")
+            List<ClusterNode> cmgNodes = resolveNodes(clusterService, cmgNodeNames);
+
+            // TODO: init message should only be sent to metastorageNodes and cmgNodes respectively,
+            //  https://issues.apache.org/jira/browse/IGNITE-16471
+            Collection<ClusterNode> allMembers = clusterService.topologyService().allMembers();
+
+            var msgFactory = new InitMessagesFactory();
+
+            MetastorageInitMessage metaStorageInitMessage = msgFactory.metastorageInitMessage()
+                    .metastorageNodes(metaStorageNodeNames.toArray(String[]::new))
+                    .build();
+
+            CompletableFuture<String> metaStorageInitFuture = invokeMessage(allMembers, metaStorageInitMessage);
+
+            CmgInitMessage cmgInitMessage = msgFactory.cmgInitMessage()
+                    .cmgNodes(cmgNodeNames.toArray(String[]::new))
+                    .build();
+
+            CompletableFuture<String> cmgInitFuture = invokeMessage(allMembers, cmgInitMessage);
+
+            return metaStorageInitFuture
+                    .thenCombine(cmgInitFuture, Leaders::new)
+                    .whenComplete((leaders, e) -> {
+                        if (e != null) {
+                            log.error("Initialization failed, rolling back", e);
+
+                            CancelInitMessage cancelMessage = msgFactory.cancelInitMessage()
+                                    .reason(e.getMessage())
+                                    .build();
+
+                            cancelInit(allMembers, cancelMessage);
+                        }
+                    });
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    /**
+     * Sends a message to all provided nodes.
+     *
+     * @param nodes nodes to send message to.
+     * @param message message to send.
+     * @return future that either resolves to a leader node ID or fails if any of the nodes return an error response.
+     */
+    private CompletableFuture<String> invokeMessage(Collection<ClusterNode> nodes, NetworkMessage message) {

Review comment:
       As was mentioned above I don't think that we need leaderId, [ok] response if enough here. 

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
##########
@@ -143,11 +151,10 @@
     /**
      * The Constructor.
      *
-     * @param name                       Ignite node name.
-     * @param workDir                    Work directory for the started node. Must not be {@code null}.
-     * @param serviceProviderClassLoader The class loader to be used to load provider-configuration files and provider
-     *                                   classes, or {@code null} if the system class loader (or, failing that
-     *                                   the bootstrap class loader) is to be used
+     * @param name Ignite node name.
+     * @param workDir Work directory for the started node. Must not be {@code null}.
+     * @param serviceProviderClassLoader The class loader to be used to load provider-configuration files and provider classes, or {@code

Review comment:
       {@code
   null}
   Auf

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
##########
@@ -320,6 +327,7 @@ public void start(@Nullable String cfg) {
                     distributedTblMgr,
                     qryEngine,
                     restModule,
+                    cmgMgr,

Review comment:
       Order matters here, cmgMgr should start before metaStorageMgr.

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
##########
@@ -351,8 +359,8 @@ public void start(@Nullable String cfg) {
      */
     public void stop() {
         if (status.getAndSet(Status.STOPPING) == Status.STARTED) {
-            doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr, baselineMgr,
-                    distributedTblMgr, qryEngine, restModule, clientHandlerModule, nettyBootstrapFactory));
+            doStopNode(List.of(nettyBootstrapFactory, vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr,

Review comment:
       Please don't forget to adjust cmgMgr position here. 

##########
File path: modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
##########
@@ -122,174 +115,233 @@
 
     /**
      * Future which will be completed with {@link IgniteUuid}, when aggregated watch will be successfully deployed. Can be resolved to
-     * {@link Optional#empty()} if no watch deployed at the moment.
-     */
-    private CompletableFuture<Optional<IgniteUuid>> deployFut = new CompletableFuture<>();
-
-    /**
-     * If true - all new watches will be deployed immediately.
+     * {@code null} if no watch deployed at the moment.
      *
-     * <p>If false - all new watches will be aggregated to one batch for further deploy by {@link MetaStorageManager#deployWatches()}
+     * <p>Multithreaded access is guarded by {@code this}.
      */
-    private boolean deployed;
-
-    /** Flag indicates if meta storage nodes were set on start. */
-    private boolean metaStorageNodesOnStart;
+    private CompletableFuture<IgniteUuid> deployFut = new CompletableFuture<>();
 
     /** Actual storage for the Metastorage. */
     private final KeyValueStorage storage;
 
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
-    /** Prevents double stopping the component. */
-    AtomicBoolean stopGuard = new AtomicBoolean();
+    /**
+     * If true - all new watches will be deployed immediately.
+     *
+     * <p>If false - all new watches will be aggregated to one batch for further deploy by {@link MetaStorageManager#deployWatches()}.
+     *
+     * <p>Multithreaded access is guarded by {@code this}.
+     */
+    private boolean areWatchesDeployed = false;
+
+    /**
+     * Flag that indicates that the Meta Storage has been initialized.
+     */
+    private final AtomicBoolean isInitialized = new AtomicBoolean();
+
+    /**
+     * Prevents double stopping the component.
+     *
+     * <p>Multithreaded access is guarded by {@code this}.
+     */
+    private boolean isStopped = false;
 
     /**
      * The constructor.
      *
-     * @param vaultMgr      Vault manager.
-     * @param locCfgMgr     Local configuration manager.
-     * @param clusterNetSvc Cluster network service.
-     * @param raftMgr       Raft manager.
-     * @param storage       Storage. This component owns this resource and will manage its lifecycle.
+     * @param vaultMgr Vault manager.
+     * @param clusterService Cluster network service.
+     * @param raftMgr Raft manager.
+     * @param storage Storage. This component owns this resource and will manage its lifecycle.
      */
     public MetaStorageManager(
             VaultManager vaultMgr,
-            ConfigurationManager locCfgMgr,
-            ClusterService clusterNetSvc,
+            ClusterService clusterService,
             Loza raftMgr,
             KeyValueStorage storage
     ) {
         this.vaultMgr = vaultMgr;
-        this.locCfgMgr = locCfgMgr;
-        this.clusterNetSvc = clusterNetSvc;
         this.raftMgr = raftMgr;
         this.storage = storage;
-    }
 
-    /** {@inheritDoc} */
-    @Override
-    public void start() {
-        String[] metastorageNodes = this.locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
-                .metastorageNodes().value();
+        MessagingService messagingService = clusterService.messagingService();
 
-        Predicate<ClusterNode> metaStorageNodesContainsLocPred =
-                clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
+        var msgFactory = new InitMessagesFactory();

Review comment:
       As was mentioned earlier, I believe that we should use clusterState from within runner process to init metastorage.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org