Posted to notifications@ignite.apache.org by "Flaugh24 (via GitHub)" <gi...@apache.org> on 2023/05/11 17:23:10 UTC

[GitHub] [ignite-3] Flaugh24 opened a new pull request, #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Flaugh24 opened a new pull request, #2066:
URL: https://github.com/apache/ignite-3/pull/2066

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1197500025


##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -31,37 +31,37 @@ public class DistributedConfigurationUpdater implements IgniteComponent {
 
     private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationUpdater.class);
 
-    private final CompletableFuture<ConfigurationPresentation<String>> clusterCfgPresentation = new CompletableFuture<>();
+    private final ClusterManagementGroupManager cmgMgr;
 
-    public void setDistributedConfigurationPresentation(ConfigurationPresentation<String> presentation) {
-        clusterCfgPresentation.complete(presentation);
-    }
+    private final ConfigurationPresentation<String> presentation;
 
-    /**
-     * Applies changes to the cluster configuration when {@link DistributedConfigurationUpdater#clusterCfgPresentation}
-     * is complete.
-     *
-     * @param configurationToApply Cluster configuration that should be applied.
-     * @return Future that will be completed when cluster configuration is updated.
-     */
-    public CompletableFuture<Void> updateConfiguration(String configurationToApply) {
-        return clusterCfgPresentation.thenCompose(presentation -> presentation.update(configurationToApply))
-                .whenComplete((v, e) -> {
-                    if (e != null) {
-                        LOG.error("Unable to update cluster configuration", e);
-                    } else {
-                        LOG.info("Cluster configuration updated successfully");
-                    }
-                });
+    public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, ConfigurationPresentation<String> presentation) {
+        this.cmgMgr = cmgMgr;
+        this.presentation = presentation;
     }
 
     @Override
     public void start() {
-
+        cmgMgr.clusterConfigurationToUpdate()
+                .thenApply(action -> {

Review Comment:
   This should be `thenAccept`, not `thenApply` (as I wrote in my original comment). What is the point of creating a future in the `else` branch?
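
The difference can be shown with a minimal sketch. It is not the PR's code: the class and method names are hypothetical, and a plain `String` stands in for the configuration action. With `thenAccept` the callback returns nothing, so the branch that has nothing to do needs no placeholder value or future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ThenAcceptSketch {
    // Hypothetical helper: records the configuration if one is present.
    static CompletableFuture<Void> applyIfPresent(CompletableFuture<String> source, List<String> applied) {
        // thenAccept takes a Consumer, so the resulting stage is CompletableFuture<Void>
        // and the "nothing to apply" case needs no else branch at all.
        return source.thenAccept(cfg -> {
            if (cfg != null) {
                applied.add(cfg);
            }
        });
    }

    public static void main(String[] args) {
        List<String> applied = new ArrayList<>();
        applyIfPresent(CompletableFuture.completedFuture("a:1"), applied).join();
        applyIfPresent(CompletableFuture.completedFuture(null), applied).join();
        System.out.println(applied); // prints [a:1]
    }
}
```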





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1195224735


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -351,6 +355,45 @@ void testLeaderChangeDuringJoin(TestInfo testInfo) throws Exception {
         assertThat(node.clusterManager().onJoinReady(), willCompleteSuccessfully());
     }
 
+    @Test
+    void testClusterConfigurationIsRemovedFromClusterStateAfterUpdating(TestInfo testInfo) throws Exception {
+        // Start a cluster of 3 nodes so that the CMG leader node could be stopped later.
+        startCluster(3, testInfo);
+
+        String[] cmgNodes = clusterNodeNames();
+
+        // Start the CMG on all 3 nodes.
+        String clusterConfiguration = "security.authentication.enabled:true";
+        initCluster(cmgNodes, cmgNodes, clusterConfiguration);
+
+        // Find the CMG leader and stop it.
+        MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
+
+        // Read cluster configuration from the cluster state and remove it.
+        UpdateDistributedConfigurationAction configurationAction = leaderNode.clusterManager()
+                .clusterConfigurationToUpdate()
+                .get();
+
+        assertThat(configurationAction.configuration(), is(clusterConfiguration));
+        configurationAction.nextAction().apply(CompletableFuture.completedFuture(null)).join();
+
+        // Stop the cluster leader.
+        stopNodes(List.of(leaderNode));

Review Comment:
   Please leave a clarifying comment





[GitHub] [ignite-3] Flaugh24 commented on pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "Flaugh24 (via GitHub)" <gi...@apache.org>.
Flaugh24 commented on PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#issuecomment-1544395558

   https://issues.apache.org/jira/browse/IGNITE-19464




[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1193112863


##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+        raftServiceAfterJoin().thenCompose(service ->
+                service.readClusterState()
+                        .thenAccept(state -> {

Review Comment:
   I think we need to use `whenComplete` here, so that `updateDistributedConfigurationActionFuture` is completed exceptionally if any of the previous operations fail.
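
A small sketch of why this matters, using hypothetical names and a `String` in place of the cluster state: a callback registered with `thenAccept` never runs when the source future fails, so a separately created target future stays pending, whereas `whenComplete` observes both outcomes and can forward the exception.

```java
import java.util.concurrent.CompletableFuture;

class WhenCompleteSketch {
    // thenAccept only runs on success, so a failed source leaves the target pending.
    static CompletableFuture<String> viaThenAccept(CompletableFuture<String> source) {
        CompletableFuture<String> target = new CompletableFuture<>();
        source.thenAccept(target::complete);
        return target;
    }

    // whenComplete sees both outcomes and can forward the exception to the target.
    static CompletableFuture<String> viaWhenComplete(CompletableFuture<String> source) {
        CompletableFuture<String> target = new CompletableFuture<>();
        source.whenComplete((state, e) -> {
            if (e != null) {
                target.completeExceptionally(e);
            } else {
                target.complete(state);
            }
        });
        return target;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("read failed"));
        System.out.println(viaThenAccept(failed).isDone());                     // prints false
        System.out.println(viaWhenComplete(failed).isCompletedExceptionally()); // prints true
    }
}
```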



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+        raftServiceAfterJoin().thenCompose(service ->
+                service.readClusterState()
+                        .thenAccept(state -> {
+                            updateDistributedConfigurationActionFuture.complete(
+                                    new UpdateDistributedConfigurationAction(
+                                            state.clusterConfigurationToApply(),
+                                            (result) -> removeClusterConfigFromClusterState(result, service))
+                            );
+                        }));
     }
 
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft service");
-                        return completedFuture(null);
+    private CompletableFuture<Void> removeClusterConfigFromClusterState(
+            CompletableFuture<Void> configurationAppliedFuture,
+            CmgRaftService service
+    ) {
+        return configurationAppliedFuture.thenCombine(
+                        service.readClusterState(),
+                        (ignored, state) -> {
+                            Collection<String> cmgNodes = state.cmgNodes();

Review Comment:
   These local variables look redundant; we can inline them.



##########
modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction;
+import org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DistributedConfigurationUpdaterTest {
+
+    @Mock
+    public ConfigurationPresentation<String> presentation;
+
+    @Mock
+    public ClusterManagementGroupManager cmgMgr;
+
+    @Test
+    public void nextActionIsCompletedAfterUpdatingConfiguration() {
+
+        // Set up mocks.
+        when(presentation.update(anyString())).thenReturn(CompletableFuture.completedFuture(null));
+
+        CompletableFuture<Void> nextAction = new CompletableFuture<>();
+        String configuration = "security.authentication.enabled:true";
+        UpdateDistributedConfigurationAction updateDistributedConfigurationAction =
+                new UpdateDistributedConfigurationAction(
+                        configuration,
+                        (result) -> result.whenComplete((v, e) -> nextAction.complete(null)));
+
+        when(cmgMgr.clusterConfigurationToUpdate())
+                .thenReturn(CompletableFuture.completedFuture(updateDistributedConfigurationAction));
+
+        // Run updater.
+        DistributedConfigurationUpdater distributedConfigurationUpdater = new DistributedConfigurationUpdater(

Review Comment:
   `distributedConfigurationUpdater` can be extracted into a field.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Composite action to update the distributed configuration.
+ */
+public class UpdateDistributedConfigurationAction {
+
+    /**
+     * Configuration that should be applied.
+     */
+    private final String configuration;
+
+    private final Function<CompletableFuture<Void>, CompletableFuture<Void>> nextAction;
+
+
+    /**
+     * Constructor.
+     *
+     * @param configuration Configuration that should be applied.
+     * @param nextAction The next action to be performed.
+     */
+    public UpdateDistributedConfigurationAction(
+            @Nullable String configuration,
+            Function<CompletableFuture<Void>, CompletableFuture<Void>> nextAction

Review Comment:
   Why do we need the first `CompletableFuture` parameter here? I can see that it is only used in `DistributedConfigurationUpdater` in order to pass an exception here. Why do we need that?



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+        raftServiceAfterJoin().thenCompose(service ->
+                service.readClusterState()
+                        .thenAccept(state -> {
+                            updateDistributedConfigurationActionFuture.complete(
+                                    new UpdateDistributedConfigurationAction(
+                                            state.clusterConfigurationToApply(),
+                                            (result) -> removeClusterConfigFromClusterState(result, service))
+                            );
+                        }));
     }
 
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft service");
-                        return completedFuture(null);
+    private CompletableFuture<Void> removeClusterConfigFromClusterState(
+            CompletableFuture<Void> configurationAppliedFuture,
+            CmgRaftService service
+    ) {
+        return configurationAppliedFuture.thenCombine(

Review Comment:
   This links to my previous question: I think `configurationAppliedFuture` is redundant. Moreover, if it completes with an exception, we will still read the cluster state, because `service.readClusterState()` is evaluated eagerly as an argument of `thenCombine`.
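
The eager read can be demonstrated in isolation. In this sketch (hypothetical names; a counter stands in for the Raft read), the argument of `thenCombine` is evaluated before the first future's outcome is known, while a function passed to `thenCompose` is skipped when the first future has failed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class ThenCombineSketch {
    // Hypothetical stand-in for a state read; counts how often it is invoked.
    static CompletableFuture<String> readState(AtomicInteger reads) {
        reads.incrementAndGet();
        return CompletableFuture.completedFuture("state");
    }

    static int readsWithThenCombine(CompletableFuture<Void> applied) {
        AtomicInteger reads = new AtomicInteger();
        // readState(...) is an ordinary method argument, so it always runs.
        applied.thenCombine(readState(reads), (ignored, state) -> state);
        return reads.get();
    }

    static int readsWithThenCompose(CompletableFuture<Void> applied) {
        AtomicInteger reads = new AtomicInteger();
        // The lambda is invoked only if 'applied' completes normally.
        applied.thenCompose(ignored -> readState(reads));
        return reads.get();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("update failed"));
        System.out.println(readsWithThenCombine(failed)); // prints 1
        System.out.println(readsWithThenCompose(failed)); // prints 0
    }
}
```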



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -31,37 +31,38 @@ public class DistributedConfigurationUpdater implements IgniteComponent {
 
     private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationUpdater.class);
 
-    private final CompletableFuture<ConfigurationPresentation<String>> clusterCfgPresentation = new CompletableFuture<>();
+    private final ClusterManagementGroupManager cmgMgr;
 
-    public void setDistributedConfigurationPresentation(ConfigurationPresentation<String> presentation) {
-        clusterCfgPresentation.complete(presentation);
+    private final ConfigurationPresentation<String> presentation;
+
+    public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, ConfigurationPresentation<String> presentation) {
+        this.cmgMgr = cmgMgr;
+        this.presentation = presentation;
     }
 
-    /**
-     * Applies changes to the cluster configuration when {@link DistributedConfigurationUpdater#clusterCfgPresentation}
-     * is complete.
-     *
-     * @param configurationToApply Cluster configuration that should be applied.
-     * @return Future that will be completed when cluster configuration is updated.
-     */
-    public CompletableFuture<Void> updateConfiguration(String configurationToApply) {
-        return clusterCfgPresentation.thenCompose(presentation -> presentation.update(configurationToApply))
-                .whenComplete((v, e) -> {
+    @Override
+    public void start() {
+        cmgMgr.clusterConfigurationToUpdate()

Review Comment:
   How is this going to work? This future is only completed on the CMG leader. What will happen on all other nodes? Will this future be stuck forever? I think we need a mechanism to cancel it on all nodes apart from the CMG leader.
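
A sketch of the concern, under the assumption that cancellation is the chosen mechanism (the PR may well settle on something else; all names here are hypothetical): a stage that depends on a never-completed future stays pending, and cancelling the source future releases it.

```java
import java.util.concurrent.CompletableFuture;

class LeaderOnlyFutureSketch {
    // Returns whether a dependent stage has finished, with or without cancelling the source.
    static boolean dependentFinished(boolean cancelOnNonLeader) {
        // Stand-in for the future that only the CMG leader would complete.
        CompletableFuture<String> actionFuture = new CompletableFuture<>();
        CompletableFuture<Void> dependent = actionFuture.thenAccept(action -> { });

        if (cancelOnNonLeader) {
            // Assumed mechanism: a non-leader node cancels the future explicitly.
            actionFuture.cancel(true);
        }
        return dependent.isDone();
    }

    public static void main(String[] args) {
        System.out.println(dependentFinished(false)); // prints false: stuck until someone completes it
        System.out.println(dependentFinished(true));  // prints true: completed exceptionally by the cancellation
    }
}
```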



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+        raftServiceAfterJoin().thenCompose(service ->
+                service.readClusterState()
+                        .thenAccept(state -> {
+                            updateDistributedConfigurationActionFuture.complete(
+                                    new UpdateDistributedConfigurationAction(
+                                            state.clusterConfigurationToApply(),
+                                            (result) -> removeClusterConfigFromClusterState(result, service))
+                            );
+                        }));
     }
 
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft service");
-                        return completedFuture(null);
+    private CompletableFuture<Void> removeClusterConfigFromClusterState(
+            CompletableFuture<Void> configurationAppliedFuture,
+            CmgRaftService service
+    ) {
+        return configurationAppliedFuture.thenCombine(
+                        service.readClusterState(),
+                        (ignored, state) -> {
+                            Collection<String> cmgNodes = state.cmgNodes();
+                            Collection<String> msNodes = state.metaStorageNodes();
+                            IgniteProductVersion igniteVersion = state.igniteVersion();
+                            ClusterTag clusterTag = state.clusterTag();
+                            return msgFactory.clusterState()
+                                    .cmgNodes(Set.copyOf(cmgNodes))
+                                    .metaStorageNodes(Set.copyOf(msNodes))
+                                    .version(igniteVersion.toString())
+                                    .clusterTag(clusterTag)
+                                    .build();
+                        })
+                .thenCompose(service::updateClusterState)
+                .whenComplete((v2, e2) -> {

Review Comment:
   `v2` and `e2`?



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -31,37 +31,38 @@ public class DistributedConfigurationUpdater implements IgniteComponent {
 
     private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationUpdater.class);
 
-    private final CompletableFuture<ConfigurationPresentation<String>> clusterCfgPresentation = new CompletableFuture<>();
+    private final ClusterManagementGroupManager cmgMgr;
 
-    public void setDistributedConfigurationPresentation(ConfigurationPresentation<String> presentation) {
-        clusterCfgPresentation.complete(presentation);
+    private final ConfigurationPresentation<String> presentation;
+
+    public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, ConfigurationPresentation<String> presentation) {
+        this.cmgMgr = cmgMgr;
+        this.presentation = presentation;
     }
 
-    /**
-     * Applies changes to the cluster configuration when {@link DistributedConfigurationUpdater#clusterCfgPresentation}
-     * is complete.
-     *
-     * @param configurationToApply Cluster configuration that should be applied.
-     * @return Future that will be completed when cluster configuration is updated.
-     */
-    public CompletableFuture<Void> updateConfiguration(String configurationToApply) {
-        return clusterCfgPresentation.thenCompose(presentation -> presentation.update(configurationToApply))
-                .whenComplete((v, e) -> {
+    @Override
+    public void start() {
+        cmgMgr.clusterConfigurationToUpdate()
+                .thenApply(action -> {
+                    if (action.configuration() != null) {
+                        presentation.update(action.configuration());

Review Comment:
   `presentation.update` returns a future, which we ignore here. That's a bug.
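
To make the bug concrete, here is a sketch with a hypothetical `failingUpdate` method standing in for `presentation.update`: when the returned future is dropped, the outer chain completes normally and the failure is lost; chaining it with `thenCompose` surfaces the failure.

```java
import java.util.concurrent.CompletableFuture;

class IgnoredFutureSketch {
    // Hypothetical stand-in for an asynchronous update that fails.
    static CompletableFuture<Void> failingUpdate(String cfg) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        result.completeExceptionally(new IllegalArgumentException("bad config: " + cfg));
        return result;
    }

    static CompletableFuture<Void> ignoringResult(String cfg) {
        return CompletableFuture.completedFuture(cfg).thenAccept(c -> {
            failingUpdate(c); // the returned future is dropped, so its failure is invisible
        });
    }

    static CompletableFuture<Void> chainingResult(String cfg) {
        // thenCompose makes the outer stage wait for, and fail with, the inner future.
        return CompletableFuture.completedFuture(cfg).thenCompose(IgnoredFutureSketch::failingUpdate);
    }

    public static void main(String[] args) {
        System.out.println(ignoringResult("x").isCompletedExceptionally()); // prints false
        System.out.println(chainingResult("x").isCompletedExceptionally()); // prints true
    }
}
```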



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -351,6 +355,45 @@ void testLeaderChangeDuringJoin(TestInfo testInfo) throws Exception {
         assertThat(node.clusterManager().onJoinReady(), willCompleteSuccessfully());
     }
 
+    @Test
+    void testClusterConfigurationIsRemovedFromClusterStateAfterUpdating(TestInfo testInfo) throws Exception {
+        // Start a cluster of 3 nodes so that the CMG leader node could be stopped later.
+        startCluster(3, testInfo);
+
+        String[] cmgNodes = clusterNodeNames();
+
+        // Start the CMG on all 3 nodes.
+        String clusterConfiguration = "security.authentication.enabled:true";
+        initCluster(cmgNodes, cmgNodes, clusterConfiguration);
+
+        // Find the CMG leader and stop it.
+        MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
+
+        // Read cluster configuration from the cluster state and remove it.
+        UpdateDistributedConfigurationAction configurationAction = leaderNode.clusterManager()
+                .clusterConfigurationToUpdate()
+                .get();
+
+        assertThat(configurationAction.configuration(), is(clusterConfiguration));
+        configurationAction.nextAction().apply(CompletableFuture.completedFuture(null)).join();
+
+        // Stop the cluster leader.
+        stopNodes(List.of(leaderNode));

Review Comment:
   What scenario are you testing here? Why do we need to stop the leader?



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -440,11 +495,17 @@ private void waitForLogicalTopology() throws InterruptedException {
         }, 10000));
     }
 
+
     private void initCluster(String[] metaStorageNodes, String[] cmgNodes) throws NodeStoppingException {
+        initCluster(metaStorageNodes, cmgNodes, null);
+    }
+
+    private void initCluster(String[] metaStorageNodes, String[] cmgNodes, String clusterConfiguration) throws NodeStoppingException {

Review Comment:
   `clusterConfiguration` should be annotated as `@Nullable`



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -414,6 +457,18 @@ void nonCmgNodeAddedLaterGetsLogicalTopologyChanges(TestInfo testInfo) throws Ex
         assertTrue(waitForCondition(() -> nonCmgTopology.getLogicalTopology().nodes().size() == 2, 10_000));
     }
 
+    private Optional<MockNode> findLeaderNode(List<MockNode> cluster) {

Review Comment:
   Since you have extracted a method, please use it in other places in this class as well





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1195257941


##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,46 +387,45 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+        raftServiceAfterJoin().whenComplete((service, e) -> {
+            if (e != null) {
+                LOG.error("Error when joining to the raft service", e);
+                updateDistributedConfigurationActionFuture.completeExceptionally(e);
+            } else {
+                service.readClusterState()
+                        .thenAccept(state -> updateDistributedConfigurationActionFuture.complete(

Review Comment:
   Why do we read the state here, then ignore it and read it again in `updateClusterConfigurationAndRemoveFromState`?



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -31,37 +30,27 @@ public class DistributedConfigurationUpdater implements IgniteComponent {
 
     private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationUpdater.class);
 
-    private final CompletableFuture<ConfigurationPresentation<String>> clusterCfgPresentation = new CompletableFuture<>();
+    private final ClusterManagementGroupManager cmgMgr;
 
-    public void setDistributedConfigurationPresentation(ConfigurationPresentation<String> presentation) {
-        clusterCfgPresentation.complete(presentation);
+    private final ConfigurationPresentation<String> presentation;
+
+    public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, ConfigurationPresentation<String> presentation) {
+        this.cmgMgr = cmgMgr;
+        this.presentation = presentation;
     }
 
-    /**
-     * Applies changes to the cluster configuration when {@link DistributedConfigurationUpdater#clusterCfgPresentation}
-     * is complete.
-     *
-     * @param configurationToApply Cluster configuration that should be applied.
-     * @return Future that will be completed when cluster configuration is updated.
-     */
-    public CompletableFuture<Void> updateConfiguration(String configurationToApply) {
-        return clusterCfgPresentation.thenCompose(presentation -> presentation.update(configurationToApply))
+    @Override
+    public void start() {
+        cmgMgr.clusterConfigurationToUpdate()
+                .thenApply(action -> action.execute(presentation::update))

Review Comment:
   This must be `thenCompose`, because `action.execute` itself returns a future.
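
A sketch of the difference, with hypothetical names and an inner future that is deliberately left pending: `thenApply` wraps the inner future and reports completion as soon as the inner future has been created, while `thenCompose` flattens it and completes only when the inner future does.

```java
import java.util.concurrent.CompletableFuture;

class ThenComposeSketch {
    // Returns {nested done before inner, flat done before inner, flat done after inner}.
    static boolean[] observe() {
        CompletableFuture<Void> inner = new CompletableFuture<>(); // still pending

        // thenApply: the result is a future of a future.
        CompletableFuture<CompletableFuture<Void>> nested =
                CompletableFuture.completedFuture("cfg").thenApply(c -> inner);
        // thenCompose: the result tracks the inner future.
        CompletableFuture<Void> flat =
                CompletableFuture.completedFuture("cfg").thenCompose(c -> inner);

        boolean nestedDoneEarly = nested.isDone();
        boolean flatDoneEarly = flat.isDone();
        inner.complete(null);
        return new boolean[] {nestedDoneEarly, flatDoneEarly, flat.isDone()};
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints true false true
    }
}
```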



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Action to update the distributed configuration.
+ */
+public class UpdateDistributedConfigurationAction {
+
+    /**
+     * Action that should be executed.
+     */
+    private final Function<Function<String, CompletableFuture<Void>>, CompletableFuture<Void>> action;

Review Comment:
   Well, I think the code has become worse since the last time =) It's too complicated now. I would suggest the following:
   1. Have a `String` field, just as you had before.
   2. Change the action type to `Supplier<CompletableFuture<Void>>`, which will be used by the CMG manager to clean up the configuration.
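
One possible reading of this suggestion, as a sketch only: the class name `UpdateActionSketch` and the `run` helper are hypothetical and are not taken from the PR. The action carries the configuration string plus a cleanup step, and the caller decides when to invoke the cleanup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

class UpdateActionSketch {
    private final String configuration; // may be null when there is nothing to apply
    private final Supplier<CompletableFuture<Void>> nextAction;

    UpdateActionSketch(String configuration, Supplier<CompletableFuture<Void>> nextAction) {
        this.configuration = configuration;
        this.nextAction = nextAction;
    }

    String configuration() {
        return configuration;
    }

    Supplier<CompletableFuture<Void>> nextAction() {
        return nextAction;
    }

    // Hypothetical caller: apply the configuration, then run the cleanup step.
    static CompletableFuture<Void> run(UpdateActionSketch action, Function<String, CompletableFuture<Void>> update) {
        if (action.configuration() == null) {
            return CompletableFuture.completedFuture(null);
        }
        return update.apply(action.configuration()).thenCompose(v -> action.nextAction().get());
    }

    public static void main(String[] args) {
        StringBuilder trace = new StringBuilder();
        UpdateActionSketch action = new UpdateActionSketch("a:1", () -> {
            trace.append("cleanup");
            return CompletableFuture.completedFuture(null);
        });
        run(action, cfg -> {
            trace.append("update(").append(cfg).append(") ");
            return CompletableFuture.completedFuture(null);
        }).join();
        System.out.println(trace); // prints update(a:1) cleanup
    }
}
```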





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1192116839


##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -35,9 +43,15 @@ public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, Con
     public void start() {
         HoconPresentation presentation = new HoconPresentation(clusterCfgMgr.configurationRegistry());
         cmgMgr.clusterConfigurationToUpdate().thenAccept(action -> {
-            if (action.currentAction() != null) {
-                presentation.update(action.currentAction())
-                        .thenApply(v -> action.nextAction());
+            if (action.configuration() != null) {
+                presentation.update(action.configuration())
+                        .handle((v, e) -> {
+                            if (e != null) {
+                                LOG.error("Failed to update the distributed configuration", e);
+                            }
+                            return action.nextAction();

Review Comment:
   Will `nextAction` nullify the configuration?
   If so, the configuration has not been applied when the update fails, right? You shouldn't return the next action in that case.
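
   One possible way to express this, as a sketch only: chain the follow-up with `thenCompose`, which is skipped when the update future completes exceptionally. The class and method names below are hypothetical, and the `update` future stands in for `presentation.update(...)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class SkipCleanupOnFailureSketch {
    /** Runs the cleanup only if the update succeeded; returns whether the cleanup ran. */
    static boolean run(CompletableFuture<Void> update) {
        AtomicBoolean cleanedUp = new AtomicBoolean();

        // Hypothetical cleanup step that records that it was invoked.
        Supplier<CompletableFuture<Void>> nextAction = () -> {
            cleanedUp.set(true);
            return CompletableFuture.completedFuture(null);
        };

        update
                // Not invoked when the update completed exceptionally.
                .thenCompose(v -> nextAction.get())
                .whenComplete((v, e) -> {
                    if (e != null) {
                        System.err.println("Failed to update the distributed configuration: " + e);
                    }
                })
                // Swallow the failure here only so that the demo can join() safely.
                .exceptionally(e -> null)
                .join();

        return cleanedUp.get();
    }

    public static void main(String[] args) {
        System.out.println(run(CompletableFuture.completedFuture(null)));
        System.out.println(run(CompletableFuture.failedFuture(new RuntimeException("boom"))));
    }
}
```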



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,25 +387,17 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
-    }
-
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft service");
-                        return completedFuture(null);
-                    } else {
-                        LOG.info("Cluster configuration is found in the Raft service, going to apply it");
-                        return distributedConfigurationUpdater.updateConfiguration(state.clusterConfigurationToApply())
-                                .thenCompose(unused -> removeClusterConfigFromClusterState(service));
-                    }
-                });
+        raftServiceAfterJoin().thenAccept(service -> {
+            service.readClusterState()
+                    .thenAccept(state -> {
+                        updateDistributedConfigurationActionFuture.complete(
+                                new UpdateDistributedConfigurationAction(
+                                        state.clusterConfigurationToApply(),
+                                        removeClusterConfigFromClusterState(service)

Review Comment:
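   `removeClusterConfigFromClusterState(service)` is invoked right here, while the action is being constructed, so it looks like we start deleting the configuration before it has been applied. This is clearly a bug.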
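
   To illustrate the ordering problem, here is a small self-contained sketch. The `remove` method is a hypothetical stand-in for `removeClusterConfigFromClusterState`; it only records when it is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class EagerVsLazySketch {
    /** Hypothetical stand-in for the removal step: records the moment it is invoked. */
    static CompletableFuture<Void> remove(List<String> log) {
        log.add("remove");
        return CompletableFuture.completedFuture(null);
    }

    /** Eager: the removal starts as soon as the expression is evaluated, before "apply". */
    static List<String> eager() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> removal = remove(log); // already started here
        log.add("apply");
        removal.join();
        return log;
    }

    /** Lazy: the removal is wrapped in a Supplier and starts only after "apply". */
    static List<String> lazy() {
        List<String> log = new ArrayList<>();
        Supplier<CompletableFuture<Void>> removal = () -> remove(log);
        log.add("apply");
        removal.get().join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(eager()); // prints [remove, apply]
        System.out.println(lazy());  // prints [apply, remove]
    }
}
```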



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -35,9 +43,15 @@ public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, Con
     public void start() {
         HoconPresentation presentation = new HoconPresentation(clusterCfgMgr.configurationRegistry());
         cmgMgr.clusterConfigurationToUpdate().thenAccept(action -> {
-            if (action.currentAction() != null) {
-                presentation.update(action.currentAction())
-                        .thenApply(v -> action.nextAction());
+            if (action.configuration() != null) {
+                presentation.update(action.configuration())
+                        .handle((v, e) -> {
+                            if (e != null) {
+                                LOG.error("Failed to update the distributed configuration", e);
+                            }
+                            return action.nextAction();
+                        })
+                        .thenApply(v -> v);

Review Comment:
   Please explain why we need this code. It looks out of place.





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1196376153


##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -31,37 +31,37 @@ public class DistributedConfigurationUpdater implements IgniteComponent {
 
     private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationUpdater.class);
 
-    private final CompletableFuture<ConfigurationPresentation<String>> clusterCfgPresentation = new CompletableFuture<>();
+    private final ClusterManagementGroupManager cmgMgr;
 
-    public void setDistributedConfigurationPresentation(ConfigurationPresentation<String> presentation) {
-        clusterCfgPresentation.complete(presentation);
-    }
+    private final ConfigurationPresentation<String> presentation;
 
-    /**
-     * Applies changes to the cluster configuration when {@link DistributedConfigurationUpdater#clusterCfgPresentation}
-     * is complete.
-     *
-     * @param configurationToApply Cluster configuration that should be applied.
-     * @return Future that will be completed when cluster configuration is updated.
-     */
-    public CompletableFuture<Void> updateConfiguration(String configurationToApply) {
-        return clusterCfgPresentation.thenCompose(presentation -> presentation.update(configurationToApply))
-                .whenComplete((v, e) -> {
-                    if (e != null) {
-                        LOG.error("Unable to update cluster configuration", e);
-                    } else {
-                        LOG.info("Cluster configuration updated successfully");
-                    }
-                });
+    public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, ConfigurationPresentation<String> presentation) {
+        this.cmgMgr = cmgMgr;
+        this.presentation = presentation;
     }
 
     @Override
     public void start() {
-
+        cmgMgr.clusterConfigurationToUpdate()
+                .thenCompose(action -> {

Review Comment:
   Why do you need to use `thenCompose` here? This future is not returned anywhere. `thenAccept` looks more suitable.
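
   For reference, a small sketch of the difference between the two methods, using plain JDK futures and hypothetical names: `thenCompose` is for chaining another asynchronous step whose result is needed downstream, while `thenAccept` is a terminal side effect.

```java
import java.util.concurrent.CompletableFuture;

class ComposeVsAcceptSketch {
    /** thenCompose: the returned future tracks the inner asynchronous step and carries its value. */
    static String composed() {
        return CompletableFuture.completedFuture("cfg")
                .thenCompose(cfg -> CompletableFuture.supplyAsync(() -> cfg + ":applied"))
                .join();
    }

    /** thenAccept: a terminal side effect; the resulting future carries no value. */
    static String accepted() {
        StringBuilder sink = new StringBuilder();
        CompletableFuture<Void> done = CompletableFuture.completedFuture("cfg")
                .thenAccept(cfg -> sink.append(cfg).append(":applied"));
        done.join();
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.println(composed());
        System.out.println(accepted());
    }
}
```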



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -388,44 +388,47 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().whenComplete((service, e) -> {
-            if (e != null) {
-                LOG.error("Error when joining to the raft service", e);
-                updateDistributedConfigurationActionFuture.completeExceptionally(e);
-            } else {
-                service.readClusterState()
-                        .thenAccept(state -> {
-                            String configuration = state.clusterConfigurationToApply();
-                            if (configuration != null) {
-                                updateDistributedConfigurationActionFuture.complete(
-                                        new UpdateDistributedConfigurationAction(
-                                                configuration,
-                                                () -> removeClusterConfigFromClusterState(service)
-                                        ));
-                            } else {
-                                updateDistributedConfigurationActionFuture.cancel(true);
-                            }
-                        });
-            }
-        });
+        raftServiceAfterJoin().thenCompose(service -> service.readClusterState()
+                .thenAccept(state -> {

Review Comment:
   `thenAccept` and `whenComplete` can be combined into a single `whenComplete` here.



##########
modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction;
+import org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DistributedConfigurationUpdaterTest {
+
+    @Mock
+    public ConfigurationPresentation<String> presentation;
+
+    @Mock
+    public ClusterManagementGroupManager cmgMgr;
+
+    @Test
+    public void nextActionIsCompletedAfterUpdatingConfiguration() {
+
+        // Set up mocks.
+        when(presentation.update(anyString())).thenReturn(completedFuture(null));
+
+        CompletableFuture<Void> nextAction = new CompletableFuture<>();
+        String configuration = "security.authentication.enabled:true";
+        UpdateDistributedConfigurationAction updateDistributedConfigurationAction =
+                new UpdateDistributedConfigurationAction(
+                        configuration,
+                        () -> {
+                            nextAction.complete(null);
+                            return null;
+                        }
+                );
+
+        when(cmgMgr.clusterConfigurationToUpdate())
+                .thenReturn(completedFuture(updateDistributedConfigurationAction));
+
+        // Run updater.
+        DistributedConfigurationUpdater distributedConfigurationUpdater = new DistributedConfigurationUpdater(
+                cmgMgr,
+                presentation
+        );
+
+        distributedConfigurationUpdater.start();
+
+        // Verify that configuration was updated.
+        verify(presentation, times(1)).update(configuration);
+
+        // Verify that next action is completed.
+        nextAction.join();

Review Comment:
   This comment was not addressed.





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1191994722


##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -828,6 +818,10 @@ public CompletableFuture<Void> onJoinReady() {
         }
     }
 
+    public CompletableFuture<UpdateDistributedConfigurationAction> clusterConfigurationToUpdate() {
+        return updateDistributedConfigurationActionFuture;

Review Comment:
   We should complete this future on node stop. What if somebody is waiting on it and we stop the node too early?
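
   A rough sketch of the idea, assuming a `stop()` hook on the owning component. The class name is hypothetical, and `IllegalStateException` is only a placeholder for whatever exception the project uses to signal a stopping node.

```java
import java.util.concurrent.CompletableFuture;

class StopAwareHolderSketch {
    /** Future handed out to other components; completed later, or failed on stop. */
    private final CompletableFuture<String> actionFuture = new CompletableFuture<>();

    CompletableFuture<String> actionFuture() {
        return actionFuture;
    }

    /** Hypothetical stop hook: releases waiters. It has no effect if the future is already completed. */
    void stop() {
        actionFuture.completeExceptionally(new IllegalStateException("Node is stopping"));
    }

    public static void main(String[] args) {
        StopAwareHolderSketch holder = new StopAwareHolderSketch();
        holder.stop();
        System.out.println(holder.actionFuture().isCompletedExceptionally());
    }
}
```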



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import java.util.concurrent.CompletableFuture;
+import org.jetbrains.annotations.Nullable;
+
+public class UpdateDistributedConfigurationAction implements Action<String, CompletableFuture<Void>> {
+
+    private final String cfgToUpdate;

Review Comment:
   I know this will sound picky, but I don't think that it's correct to name this thing "configuration to update", because "object to update" usually means that the object itself will be updated, not that it contains the update for some other object.
   Could it simply be "configurationString" or "configuration"? I'd be less confused in that case.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/Action.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+public interface Action<A1, A2> {

Review Comment:
   I would love to see a Javadoc. Looks like some primitive version of a state machine, I wonder what it's for.





[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "sashapolo (via GitHub)" <gi...@apache.org>.
sashapolo commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1196143551


##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +388,44 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
-    }
-
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft service");
-                        return completedFuture(null);
-                    } else {
-                        LOG.info("Cluster configuration is found in the Raft service, going to apply it");
-                        return distributedConfigurationUpdater.updateConfiguration(state.clusterConfigurationToApply())
-                                .thenCompose(unused -> removeClusterConfigFromClusterState(service));
-                    }
-                });
+        raftServiceAfterJoin().whenComplete((service, e) -> {
+            if (e != null) {
+                LOG.error("Error when joining to the raft service", e);

Review Comment:
   `joining to the raft service`? What does that mean?



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +388,44 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
-    }
-
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft service");
-                        return completedFuture(null);
-                    } else {
-                        LOG.info("Cluster configuration is found in the Raft service, going to apply it");
-                        return distributedConfigurationUpdater.updateConfiguration(state.clusterConfigurationToApply())
-                                .thenCompose(unused -> removeClusterConfigFromClusterState(service));
-                    }
-                });
+        raftServiceAfterJoin().whenComplete((service, e) -> {
+            if (e != null) {
+                LOG.error("Error when joining to the raft service", e);
+                updateDistributedConfigurationActionFuture.completeExceptionally(e);
+            } else {
+                service.readClusterState()
+                        .thenAccept(state -> {

Review Comment:
   I'll copy my previous comment here: I think we need to use `whenComplete` here to complete `updateDistributedConfigurationActionFuture` exceptionally in case the previous operations fail.
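
   A sketch of why this matters, with hypothetical names: with `thenAccept` the callback is skipped on failure, so the exposed future is never completed, whereas `whenComplete` sees both outcomes and can forward the exception.

```java
import java.util.concurrent.CompletableFuture;

class PropagateFailureSketch {
    /** thenAccept: the callback is skipped on failure, so the exposed future stays incomplete. */
    static CompletableFuture<String> viaThenAccept(CompletableFuture<String> readState) {
        CompletableFuture<String> exposed = new CompletableFuture<>();
        readState.thenAccept(exposed::complete);
        return exposed;
    }

    /** whenComplete: both outcomes are observed and forwarded to the exposed future. */
    static CompletableFuture<String> viaWhenComplete(CompletableFuture<String> readState) {
        CompletableFuture<String> exposed = new CompletableFuture<>();
        readState.whenComplete((state, e) -> {
            if (e != null) {
                exposed.completeExceptionally(e);
            } else {
                exposed.complete(state);
            }
        });
        return exposed;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = CompletableFuture.failedFuture(new RuntimeException("read failed"));
        System.out.println(viaThenAccept(failed).isDone());
        System.out.println(viaWhenComplete(failed).isCompletedExceptionally());
    }
}
```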



##########
modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction;
+import org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DistributedConfigurationUpdaterTest {
+
+    @Mock
+    public ConfigurationPresentation<String> presentation;
+
+    @Mock
+    public ClusterManagementGroupManager cmgMgr;
+
+    @Test
+    public void nextActionIsCompletedAfterUpdatingConfiguration() {
+
+        // Set up mocks.
+        when(presentation.update(anyString())).thenReturn(completedFuture(null));
+
+        CompletableFuture<Void> nextAction = new CompletableFuture<>();
+        String configuration = "security.authentication.enabled:true";
+        UpdateDistributedConfigurationAction updateDistributedConfigurationAction =
+                new UpdateDistributedConfigurationAction(
+                        configuration,
+                        () -> {
+                            nextAction.complete(null);
+                            return null;

Review Comment:
   Wouldn't this lead to an NPE? Why not return `nextAction`?
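
   A small sketch of the concern, assuming the supplier's result is consumed through `thenCompose` (as in the updater's `action.nextAction().get()` call). When the function passed to `thenCompose` returns `null`, the resulting future completes exceptionally with a `NullPointerException` as the cause. The names below are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

class NullFromSupplierSketch {
    /** Returns true if chaining the supplier through thenCompose fails with an NPE. */
    static boolean failsWithNpe(Supplier<CompletableFuture<Void>> nextAction) {
        CompletableFuture<Void> chained = CompletableFuture.<Void>completedFuture(null)
                .thenCompose(v -> nextAction.get());
        try {
            chained.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof NullPointerException;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithNpe(() -> null));
        System.out.println(failsWithNpe(() -> CompletableFuture.completedFuture(null)));
    }
}
```

   Returning the `nextAction` future from the lambda, as suggested, avoids the problem and lets the test observe the chain's completion.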



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -31,37 +31,34 @@ public class DistributedConfigurationUpdater implements IgniteComponent {
 
     private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationUpdater.class);
 
-    private final CompletableFuture<ConfigurationPresentation<String>> clusterCfgPresentation = new CompletableFuture<>();
+    private final ClusterManagementGroupManager cmgMgr;
 
-    public void setDistributedConfigurationPresentation(ConfigurationPresentation<String> presentation) {
-        clusterCfgPresentation.complete(presentation);
+    private final ConfigurationPresentation<String> presentation;
+
+    public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, ConfigurationPresentation<String> presentation) {
+        this.cmgMgr = cmgMgr;
+        this.presentation = presentation;
     }
 
-    /**
-     * Applies changes to the cluster configuration when {@link DistributedConfigurationUpdater#clusterCfgPresentation}
-     * is complete.
-     *
-     * @param configurationToApply Cluster configuration that should be applied.
-     * @return Future that will be completed when cluster configuration is updated.
-     */
-    public CompletableFuture<Void> updateConfiguration(String configurationToApply) {
-        return clusterCfgPresentation.thenCompose(presentation -> presentation.update(configurationToApply))
+    @Override
+    public void start() {
+        cmgMgr.clusterConfigurationToUpdate()
+                .thenCompose(action -> {
+                    if (action.configuration() != null) {
+                        return presentation.update(action.configuration()).thenApply(ignored -> action);
+                    } else {
+                        return CompletableFuture.completedFuture(action);
+                    }
+                })
+                .thenCompose(action -> action.nextAction().get())

Review Comment:
   Why do we need to run the `nextAction` even if no configuration was provided in the previous step?
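
   One way to avoid that, sketched with hypothetical names: run the follow-up only on the branch where a configuration was actually applied. The already-completed future stands in for `presentation.update(configuration)`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RunNextOnlyWhenConfiguredSketch {
    /** Returns how many times the follow-up ran for the given (possibly null) configuration. */
    static int followUps(String configuration) {
        AtomicInteger followUps = new AtomicInteger();

        // Hypothetical follow-up that counts its invocations.
        Supplier<CompletableFuture<Void>> nextAction = () -> {
            followUps.incrementAndGet();
            return CompletableFuture.completedFuture(null);
        };

        CompletableFuture<Void> result;
        if (configuration == null) {
            // Nothing was applied, so there is nothing to clean up.
            result = CompletableFuture.completedFuture(null);
        } else {
            // Stand-in for the update call, followed by the cleanup.
            CompletableFuture<Void> update = CompletableFuture.completedFuture(null);
            result = update.thenCompose(v -> nextAction.get());
        }

        result.join();
        return followUps.get();
    }

    public static void main(String[] args) {
        System.out.println(followUps("security.authentication.enabled:true"));
        System.out.println(followUps(null));
    }
}
```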



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +388,44 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
-    }
-
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft service");
-                        return completedFuture(null);
-                    } else {
-                        LOG.info("Cluster configuration is found in the Raft service, going to apply it");
-                        return distributedConfigurationUpdater.updateConfiguration(state.clusterConfigurationToApply())
-                                .thenCompose(unused -> removeClusterConfigFromClusterState(service));
-                    }
-                });
+        raftServiceAfterJoin().whenComplete((service, e) -> {
+            if (e != null) {
+                LOG.error("Error when joining to the raft service", e);
+                updateDistributedConfigurationActionFuture.completeExceptionally(e);
+            } else {
+                service.readClusterState()
+                        .thenAccept(state -> {
+                            String configuration = state.clusterConfigurationToApply();
+                            if (configuration != null) {
+                                updateDistributedConfigurationActionFuture.complete(
+                                        new UpdateDistributedConfigurationAction(
+                                                configuration,
+                                                () -> removeClusterConfigFromClusterState(service)
+                                        ));
+                            } else {
+                                updateDistributedConfigurationActionFuture.cancel(true);
+                            }
+                        });
+            }
+        });
     }
 
     private CompletableFuture<Void> removeClusterConfigFromClusterState(CmgRaftService service) {
         return service.readClusterState()
                 .thenCompose(state -> {

Review Comment:
   I think we should check that the configuration is actually present in the cluster state (i.e. it is not null) and only then update it.
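
   A simplified sketch of such a guard. `State` here is a hypothetical stand-in for the cluster state, and the `writes` list stands in for the Raft update; neither reflects the real types.

```java
import java.util.ArrayList;
import java.util.List;

class RemoveIfPresentSketch {
    /** Hypothetical, simplified cluster state with an optional pending configuration. */
    static final class State {
        final String clusterConfigurationToApply;

        State(String clusterConfigurationToApply) {
            this.clusterConfigurationToApply = clusterConfigurationToApply;
        }
    }

    /** Issues a write only when there is a configuration to remove; returns whether it did. */
    static boolean removeConfigIfPresent(State current, List<State> writes) {
        if (current == null || current.clusterConfigurationToApply == null) {
            return false; // nothing to remove, skip the update
        }
        writes.add(new State(null));
        return true;
    }

    public static void main(String[] args) {
        List<State> writes = new ArrayList<>();
        System.out.println(removeConfigIfPresent(new State("k:v"), writes));
        System.out.println(removeConfigIfPresent(new State(null), writes));
        System.out.println(writes.size());
    }
}
```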



##########
modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction;
+import org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DistributedConfigurationUpdaterTest {
+
+    @Mock
+    public ConfigurationPresentation<String> presentation;
+
+    @Mock
+    public ClusterManagementGroupManager cmgMgr;
+
+    @Test
+    public void nextActionIsCompletedAfterUpdatingConfiguration() {
+
+        // Set up mocks.
+        when(presentation.update(anyString())).thenReturn(completedFuture(null));
+
+        CompletableFuture<Void> nextAction = new CompletableFuture<>();
+        String configuration = "security.authentication.enabled:true";
+        UpdateDistributedConfigurationAction updateDistributedConfigurationAction =
+                new UpdateDistributedConfigurationAction(
+                        configuration,
+                        () -> {
+                            nextAction.complete(null);
+                            return null;
+                        }
+                );
+
+        when(cmgMgr.clusterConfigurationToUpdate())
+                .thenReturn(completedFuture(updateDistributedConfigurationAction));
+
+        // Run updater.
+        DistributedConfigurationUpdater distributedConfigurationUpdater = new DistributedConfigurationUpdater(
+                cmgMgr,
+                presentation
+        );
+
+        distributedConfigurationUpdater.start();
+
+        // Verify that configuration was updated.
+        verify(presentation, times(1)).update(configuration);
+
+        // Verify that next action is completed.
+        nextAction.join();

Review Comment:
   Please use `willCompleteSuccessfully()`.
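
   `willCompleteSuccessfully()` is a matcher from the project's test utilities and is not reproduced here. Assuming the intent is a bounded wait instead of an unbounded `join()`, a plain-JDK approximation could look like the sketch below; the helper name `completesWithin` is hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {
    /** Returns true only if the future completes normally within the given timeout. */
    static boolean completesWithin(CompletableFuture<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException | CancellationException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(completesWithin(CompletableFuture.completedFuture(null), 100));
        System.out.println(completesWithin(new CompletableFuture<>(), 50));
    }
}
```

   Unlike `join()`, a bounded wait makes a test fail instead of hanging when the future is never completed.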



##########
modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction;
+import org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DistributedConfigurationUpdaterTest {
+
+    @Mock
+    public ConfigurationPresentation<String> presentation;
+
+    @Mock
+    public ClusterManagementGroupManager cmgMgr;
+
+    @Test
+    public void nextActionIsCompletedAfterUpdatingConfiguration() {
+
+        // Set up mocks.
+        when(presentation.update(anyString())).thenReturn(completedFuture(null));
+
+        CompletableFuture<Void> nextAction = new CompletableFuture<>();
+        String configuration = "security.authentication.enabled:true";
+        UpdateDistributedConfigurationAction updateDistributedConfigurationAction =
+                new UpdateDistributedConfigurationAction(
+                        configuration,
+                        () -> {
+                            nextAction.complete(null);
+                            return null;
+                        }
+                );
+
+        when(cmgMgr.clusterConfigurationToUpdate())
+                .thenReturn(completedFuture(updateDistributedConfigurationAction));
+
+        // Run updater.
+        DistributedConfigurationUpdater distributedConfigurationUpdater = new DistributedConfigurationUpdater(
+                cmgMgr,
+                presentation
+        );
+
+        distributedConfigurationUpdater.start();
+
+        // Verify that configuration was updated.
+        verify(presentation, times(1)).update(configuration);
+
+        // Verify that next action is completed.
+        nextAction.join();
+        assertThat(nextAction.isDone(), is(true));
+    }
+
+    @Test
+    public void nextActionIsCompletedIfConfigurationNull() {
+
+        // Set up mocks.
+        CompletableFuture<Void> nextAction = new CompletableFuture<>();
+        UpdateDistributedConfigurationAction updateDistributedConfigurationAction =
+                new UpdateDistributedConfigurationAction(
+                        null,
+                        () -> {
+                            nextAction.complete(null);
+                            return null;
+                        }
+                );
+
+        when(cmgMgr.clusterConfigurationToUpdate())
+                .thenReturn(completedFuture(updateDistributedConfigurationAction));
+
+        // Run updater.
+        DistributedConfigurationUpdater distributedConfigurationUpdater = new DistributedConfigurationUpdater(
+                cmgMgr,
+                presentation
+        );
+
+        distributedConfigurationUpdater.start();
+
+        // Verify that configuration wasn't updated.
+        verify(presentation, never()).update(any());
+
+        // Verify that next action is completed.
+        nextAction.join();

Review Comment:
   Please use `assertThat(nextAction, willCompleteSuccessfully())` here instead of a bare `join()`.



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -351,6 +355,45 @@ void testLeaderChangeDuringJoin(TestInfo testInfo) throws Exception {
         assertThat(node.clusterManager().onJoinReady(), willCompleteSuccessfully());
     }
 
+    @Test
+    void testClusterConfigurationIsRemovedFromClusterStateAfterUpdating(TestInfo testInfo) throws Exception {
+        // Start a cluster of 3 nodes so that the CMG leader node could be stopped later.
+        startCluster(3, testInfo);
+
+        String[] cmgNodes = clusterNodeNames();
+
+        // Start the CMG on all 3 nodes.
+        String clusterConfiguration = "security.authentication.enabled:true";
+        initCluster(cmgNodes, cmgNodes, clusterConfiguration);
+
+        // Find the CMG leader and stop it.
+        MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
+
+        // Read cluster configuration from the cluster state and remove it.
+        UpdateDistributedConfigurationAction configurationAction = leaderNode.clusterManager()
+                .clusterConfigurationToUpdate()
+                .get();
+
+        assertThat(configurationAction.configuration(), is(clusterConfiguration));
+        configurationAction.nextAction().apply(CompletableFuture.completedFuture(null)).join();
+
+        // Stop the cluster leader.
+        stopNodes(List.of(leaderNode));

Review Comment:
   This comment has not been fixed





[GitHub] [ignite-3] PakhomovAlexander merged pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "PakhomovAlexander (via GitHub)" <gi...@apache.org>.
PakhomovAlexander merged PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066




[GitHub] [ignite-3] Flaugh24 commented on a diff in pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

Posted by "Flaugh24 (via GitHub)" <gi...@apache.org>.
Flaugh24 commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1195158045


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -351,6 +355,45 @@ void testLeaderChangeDuringJoin(TestInfo testInfo) throws Exception {
         assertThat(node.clusterManager().onJoinReady(), willCompleteSuccessfully());
     }
 
+    @Test
+    void testClusterConfigurationIsRemovedFromClusterStateAfterUpdating(TestInfo testInfo) throws Exception {
+        // Start a cluster of 3 nodes so that the CMG leader node could be stopped later.
+        startCluster(3, testInfo);
+
+        String[] cmgNodes = clusterNodeNames();
+
+        // Start the CMG on all 3 nodes.
+        String clusterConfiguration = "security.authentication.enabled:true";
+        initCluster(cmgNodes, cmgNodes, clusterConfiguration);
+
+        // Find the CMG leader and stop it.
+        MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
+
+        // Read cluster configuration from the cluster state and remove it.
+        UpdateDistributedConfigurationAction configurationAction = leaderNode.clusterManager()
+                .clusterConfigurationToUpdate()
+                .get();
+
+        assertThat(configurationAction.configuration(), is(clusterConfiguration));
+        configurationAction.nextAction().apply(CompletableFuture.completedFuture(null)).join();
+
+        // Stop the cluster leader.
+        stopNodes(List.of(leaderNode));

Review Comment:
   To verify that the new leader doesn't have a configuration to update.


