You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "sashapolo (via GitHub)" <gi...@apache.org> on 2023/05/14 10:12:19 UTC

[GitHub] [ignite-3] sashapolo commented on a diff in pull request #2066: IGNITE-19464 Move out logic of applying cluster configuration from ClusterManagementGroupManager

sashapolo commented on code in PR #2066:
URL: https://github.com/apache/ignite-3/pull/2066#discussion_r1193112863


##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+        raftServiceAfterJoin().thenCompose(service ->
+                service.readClusterState()
+                        .thenAccept(state -> {

Review Comment:
   I think we need to use `whenComplete` here to complete `updateDistributedConfigurationActionFuture` with exceptions in case previous operations fail



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+        raftServiceAfterJoin().thenCompose(service ->
+                service.readClusterState()
+                        .thenAccept(state -> {
+                            updateDistributedConfigurationActionFuture.complete(
+                                    new UpdateDistributedConfigurationAction(
+                                            state.clusterConfigurationToApply(),
+                                            (result) -> removeClusterConfigFromClusterState(result, service))
+                            );
+                        }));
     }
 
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft service");
-                        return completedFuture(null);
+    private CompletableFuture<Void> removeClusterConfigFromClusterState(
+            CompletableFuture<Void> configurationAppliedFuture,
+            CmgRaftService service
+    ) {
+        return configurationAppliedFuture.thenCombine(
+                        service.readClusterState(),
+                        (ignored, state) -> {
+                            Collection<String> cmgNodes = state.cmgNodes();

Review Comment:
   These local variables look redundant, we can inline them



##########
modules/runner/src/test/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdaterTest.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import org.apache.ignite.internal.cluster.management.UpdateDistributedConfigurationAction;
+import org.apache.ignite.internal.configuration.presentation.ConfigurationPresentation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class DistributedConfigurationUpdaterTest {
+
+    @Mock
+    public ConfigurationPresentation<String> presentation;
+
+    @Mock
+    public ClusterManagementGroupManager cmgMgr;
+
+    @Test
+    public void nextActionIsCompletedAfterUpdatingConfiguration() {
+
+        // Set up mocks.
+        when(presentation.update(anyString())).thenReturn(CompletableFuture.completedFuture(null));
+
+        CompletableFuture<Void> nextAction = new CompletableFuture<>();
+        String configuration = "security.authentication.enabled:true";
+        UpdateDistributedConfigurationAction updateDistributedConfigurationAction =
+                new UpdateDistributedConfigurationAction(
+                        configuration,
+                        (result) -> result.whenComplete((v, e) -> nextAction.complete(null)));
+
+        when(cmgMgr.clusterConfigurationToUpdate())
+                .thenReturn(CompletableFuture.completedFuture(updateDistributedConfigurationAction));
+
+        // Run updater.
+        DistributedConfigurationUpdater distributedConfigurationUpdater = new DistributedConfigurationUpdater(

Review Comment:
   `distributedConfigurationUpdater` can be extracted into a field 



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/UpdateDistributedConfigurationAction.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Composite action to update the distributed configuration.
+ */
+public class UpdateDistributedConfigurationAction {
+
+    /**
+     * Configuration that should be applied.
+     */
+    private final String configuration;
+
+    private final Function<CompletableFuture<Void>, CompletableFuture<Void>> nextAction;
+
+
+    /**
+     * Constructor.
+     *
+     * @param configuration Configuration that should be applied.
+     * @param nextAction The next action to be performed.
+     */
+    public UpdateDistributedConfigurationAction(
+            @Nullable String configuration,
+            Function<CompletableFuture<Void>, CompletableFuture<Void>> nextAction

Review Comment:
   Why do we need the first `CompletableFuture` parameter here? I can see that it is only used in `DistributedConfigurationUpdater` in order to pass an exception here. Why do we need that?



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+        raftServiceAfterJoin().thenCompose(service ->
+                service.readClusterState()
+                        .thenAccept(state -> {
+                            updateDistributedConfigurationActionFuture.complete(
+                                    new UpdateDistributedConfigurationAction(
+                                            state.clusterConfigurationToApply(),
+                                            (result) -> removeClusterConfigFromClusterState(result, service))
+                            );
+                        }));
     }
 
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft service");
-                        return completedFuture(null);
+    private CompletableFuture<Void> removeClusterConfigFromClusterState(
+            CompletableFuture<Void> configurationAppliedFuture,
+            CmgRaftService service
+    ) {
+        return configurationAppliedFuture.thenCombine(

Review Comment:
   This links to my previous question: I think `configurationAppliedFuture` is redundant. Moreover, in case of an exception we will still read the cluster state for some reason



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -31,37 +31,38 @@ public class DistributedConfigurationUpdater implements IgniteComponent {
 
     private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationUpdater.class);
 
-    private final CompletableFuture<ConfigurationPresentation<String>> clusterCfgPresentation = new CompletableFuture<>();
+    private final ClusterManagementGroupManager cmgMgr;
 
-    public void setDistributedConfigurationPresentation(ConfigurationPresentation<String> presentation) {
-        clusterCfgPresentation.complete(presentation);
+    private final ConfigurationPresentation<String> presentation;
+
+    public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, ConfigurationPresentation<String> presentation) {
+        this.cmgMgr = cmgMgr;
+        this.presentation = presentation;
     }
 
-    /**
-     * Applies changes to the cluster configuration when {@link DistributedConfigurationUpdater#clusterCfgPresentation}
-     * is complete.
-     *
-     * @param configurationToApply Cluster configuration that should be applied.
-     * @return Future that will be completed when cluster configuration is updated.
-     */
-    public CompletableFuture<Void> updateConfiguration(String configurationToApply) {
-        return clusterCfgPresentation.thenCompose(presentation -> presentation.update(configurationToApply))
-                .whenComplete((v, e) -> {
+    @Override
+    public void start() {
+        cmgMgr.clusterConfigurationToUpdate()

Review Comment:
   How is this going to work? This future is only completed on the CMG leader, what will happen on all other nodes? Will this future be stuck forever? I think we need to have a mechanism to cancel it on all nodes apart from the CMG leader



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -389,45 +387,41 @@ private void onElectedAsLeader(long term) {
                     }
                 });
 
-        raftServiceAfterJoin().thenCompose(this::pushClusterConfigToCluster);
+        raftServiceAfterJoin().thenCompose(service ->
+                service.readClusterState()
+                        .thenAccept(state -> {
+                            updateDistributedConfigurationActionFuture.complete(
+                                    new UpdateDistributedConfigurationAction(
+                                            state.clusterConfigurationToApply(),
+                                            (result) -> removeClusterConfigFromClusterState(result, service))
+                            );
+                        }));
     }
 
-    private CompletableFuture<Void> pushClusterConfigToCluster(CmgRaftService service) {
-        return service.readClusterState()
-                .thenCompose(state -> {
-                    if (state == null) {
-                        LOG.info("No CMG state found in the Raft service");
-                        return completedFuture(null);
-                    } else if (state.clusterConfigurationToApply() == null) {
-                        // Config was applied or wasn't provided
-                        LOG.info("No cluster configuration found in the Raft service");
-                        return completedFuture(null);
+    private CompletableFuture<Void> removeClusterConfigFromClusterState(
+            CompletableFuture<Void> configurationAppliedFuture,
+            CmgRaftService service
+    ) {
+        return configurationAppliedFuture.thenCombine(
+                        service.readClusterState(),
+                        (ignored, state) -> {
+                            Collection<String> cmgNodes = state.cmgNodes();
+                            Collection<String> msNodes = state.metaStorageNodes();
+                            IgniteProductVersion igniteVersion = state.igniteVersion();
+                            ClusterTag clusterTag = state.clusterTag();
+                            return msgFactory.clusterState()
+                                    .cmgNodes(Set.copyOf(cmgNodes))
+                                    .metaStorageNodes(Set.copyOf(msNodes))
+                                    .version(igniteVersion.toString())
+                                    .clusterTag(clusterTag)
+                                    .build();
+                        })
+                .thenCompose(service::updateClusterState)
+                .whenComplete((v2, e2) -> {

Review Comment:
   `v2` and `e2`?



##########
modules/runner/src/main/java/org/apache/ignite/internal/configuration/DistributedConfigurationUpdater.java:
##########
@@ -31,37 +31,38 @@ public class DistributedConfigurationUpdater implements IgniteComponent {
 
     private static final IgniteLogger LOG = Loggers.forClass(DistributedConfigurationUpdater.class);
 
-    private final CompletableFuture<ConfigurationPresentation<String>> clusterCfgPresentation = new CompletableFuture<>();
+    private final ClusterManagementGroupManager cmgMgr;
 
-    public void setDistributedConfigurationPresentation(ConfigurationPresentation<String> presentation) {
-        clusterCfgPresentation.complete(presentation);
+    private final ConfigurationPresentation<String> presentation;
+
+    public DistributedConfigurationUpdater(ClusterManagementGroupManager cmgMgr, ConfigurationPresentation<String> presentation) {
+        this.cmgMgr = cmgMgr;
+        this.presentation = presentation;
     }
 
-    /**
-     * Applies changes to the cluster configuration when {@link DistributedConfigurationUpdater#clusterCfgPresentation}
-     * is complete.
-     *
-     * @param configurationToApply Cluster configuration that should be applied.
-     * @return Future that will be completed when cluster configuration is updated.
-     */
-    public CompletableFuture<Void> updateConfiguration(String configurationToApply) {
-        return clusterCfgPresentation.thenCompose(presentation -> presentation.update(configurationToApply))
-                .whenComplete((v, e) -> {
+    @Override
+    public void start() {
+        cmgMgr.clusterConfigurationToUpdate()
+                .thenApply(action -> {
+                    if (action.configuration() != null) {
+                        presentation.update(action.configuration());

Review Comment:
   `presentation.update` returns a future, which we ignore here, that's a bug



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -351,6 +355,45 @@ void testLeaderChangeDuringJoin(TestInfo testInfo) throws Exception {
         assertThat(node.clusterManager().onJoinReady(), willCompleteSuccessfully());
     }
 
+    @Test
+    void testClusterConfigurationIsRemovedFromClusterStateAfterUpdating(TestInfo testInfo) throws Exception {
+        // Start a cluster of 3 nodes so that the CMG leader node could be stopped later.
+        startCluster(3, testInfo);
+
+        String[] cmgNodes = clusterNodeNames();
+
+        // Start the CMG on all 3 nodes.
+        String clusterConfiguration = "security.authentication.enabled:true";
+        initCluster(cmgNodes, cmgNodes, clusterConfiguration);
+
+        // Find the CMG leader and stop it.
+        MockNode leaderNode = findLeaderNode(cluster).orElseThrow();
+
+        // Read cluster configuration from the cluster state and remove it.
+        UpdateDistributedConfigurationAction configurationAction = leaderNode.clusterManager()
+                .clusterConfigurationToUpdate()
+                .get();
+
+        assertThat(configurationAction.configuration(), is(clusterConfiguration));
+        configurationAction.nextAction().apply(CompletableFuture.completedFuture(null)).join();
+
+        // Stop the cluster leader.
+        stopNodes(List.of(leaderNode));

Review Comment:
   What scenario are you testing here? Why do we need to stop the leader?



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -440,11 +495,17 @@ private void waitForLogicalTopology() throws InterruptedException {
         }, 10000));
     }
 
+
     private void initCluster(String[] metaStorageNodes, String[] cmgNodes) throws NodeStoppingException {
+        initCluster(metaStorageNodes, cmgNodes, null);
+    }
+
+    private void initCluster(String[] metaStorageNodes, String[] cmgNodes, String clusterConfiguration) throws NodeStoppingException {

Review Comment:
   `clusterConfiguration` should be annotated as `@Nullable`



##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -414,6 +457,18 @@ void nonCmgNodeAddedLaterGetsLogicalTopologyChanges(TestInfo testInfo) throws Ex
         assertTrue(waitForCondition(() -> nonCmgTopology.getLogicalTopology().nodes().size() == 2, 10_000));
     }
 
+    private Optional<MockNode> findLeaderNode(List<MockNode> cluster) {

Review Comment:
   Since you have extracted a method, please use it in other places in this class as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org