You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/06/06 08:33:12 UTC

[ignite-3] branch main updated: IGNITE-17100 Make JoinReady command idempotent (#853)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b683ddc7 IGNITE-17100 Make JoinReady command idempotent (#853)
0b683ddc7 is described below

commit 0b683ddc79a35b7c34c1a35d4d0cc8f6ee71d78d
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Mon Jun 6 11:33:07 2022 +0300

    IGNITE-17100 Make JoinReady command idempotent (#853)
---
 .../management/raft/ItCmgRaftServiceTest.java      |  40 ++++++--
 .../management/ClusterManagementGroupManager.java  |   2 +-
 .../management/raft/CmgRaftGroupListener.java      |   6 ++
 .../cluster/management/raft/CmgRaftService.java    |   4 +
 .../management/raft/RaftStorageManager.java        |  17 +++-
 .../cluster/management/raft/ValidationManager.java |   4 +-
 .../management/raft/CmgRaftGroupListenerTest.java  | 101 +++++++++++++++++++++
 .../raft/ConcurrentMapClusterStateStorage.java     |   4 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |  12 ++-
 .../ignite/internal/app/LifecycleManager.java      |   2 +
 10 files changed, 175 insertions(+), 17 deletions(-)

diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 92d6a89e7..534566ad2 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -37,6 +37,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.cluster.management.ClusterState;
 import org.apache.ignite.internal.cluster.management.ClusterTag;
+import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
 import org.apache.ignite.internal.properties.IgniteProductVersion;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -91,6 +93,8 @@ public class ItCmgRaftServiceTest {
                     () -> new CmgRaftGroupListener(raftStorage)
             );
 
+            assertThat(raftService, willCompleteSuccessfully());
+
             this.raftService = new CmgRaftService(raftService.get(), clusterService);
         }
 
@@ -361,7 +365,7 @@ public class ItCmgRaftServiceTest {
         // Node has not passed validation.
         String errMsg = String.format(
                 "JoinReady request denied, reason: Node \"%s\" has not yet passed the validation step",
-                cluster.get(0).clusterService.topologyService().localMember().id()
+                cluster.get(0).clusterService.topologyService().localMember()
         );
 
         assertThrowsWithCause(
@@ -374,13 +378,6 @@ public class ItCmgRaftServiceTest {
 
         // Everything is ok after the node has passed validation.
         assertThat(raftService.completeJoinCluster(), willCompleteSuccessfully());
-
-        // Validation state is cleared after the first successful attempt.
-        assertThrowsWithCause(
-                () -> raftService.completeJoinCluster().get(10, TimeUnit.SECONDS),
-                IgniteInternalException.class,
-                errMsg
-        );
     }
 
     /**
@@ -471,4 +468,31 @@ public class ItCmgRaftServiceTest {
                 )
         );
     }
+
+    /**
+     * Tests that {@link JoinRequestCommand} and {@link JoinReadyCommand} are idempotent.
+     */
+    @Test
+    void testJoinCommandsIdempotence() {
+        ClusterState state = new ClusterState(
+                List.of("foo"),
+                List.of("bar"),
+                IgniteProductVersion.CURRENT_VERSION,
+                new ClusterTag("cluster")
+        );
+
+        assertThat(cluster.get(0).raftService.initClusterState(state), willCompleteSuccessfully());
+
+        CmgRaftService service = cluster.get(1).raftService;
+
+        assertThat(service.startJoinCluster(state.clusterTag()), willCompleteSuccessfully());
+
+        assertThat(service.startJoinCluster(state.clusterTag()), willCompleteSuccessfully());
+
+        assertThat(service.completeJoinCluster(), willCompleteSuccessfully());
+
+        assertThat(service.completeJoinCluster(), willCompleteSuccessfully());
+
+        assertThat(service.completeJoinCluster(), willCompleteSuccessfully());
+    }
 }
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index d825f1d6d..c26653f8b 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -256,7 +256,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
         synchronized (raftServiceLock) {
             if (raftService == null) {
                 // Raft service has not been started
-                LOG.info("Init command received, starting the CMG: {}", msg);
+                LOG.info("Init command received, starting the CMG on: {}", msg.cmgNodes());
 
                 raftService = startCmgRaftService(msg.cmgNodes());
             } else {
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 17e085e5a..5d138a1a8 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -43,6 +43,7 @@ import org.apache.ignite.raft.client.WriteCommand;
 import org.apache.ignite.raft.client.service.CommandClosure;
 import org.apache.ignite.raft.client.service.RaftGroupListener;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * {@link RaftGroupListener} implementation for the CMG.
@@ -176,4 +177,9 @@ public class CmgRaftGroupListener implements RaftGroupListener {
     public @Nullable CompletableFuture<Void> onBeforeApply(Command command) {
         return null;
     }
+
+    @TestOnly
+    public RaftStorageManager storage() {
+        return storage;
+    }
 }
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index 338317a6f..255bdc06a 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -129,6 +129,8 @@ public class CmgRaftService {
                         throw new JoinDeniedException("Join request denied, reason: " + ((ValidationErrorResponse) response).reason());
                     } else if (response != null) {
                         throw new IgniteInternalException("Unexpected response: " + response);
+                    }  else {
+                        LOG.info("JoinRequest command executed successfully");
                     }
                 });
     }
@@ -150,6 +152,8 @@ public class CmgRaftService {
                                 + ((ValidationErrorResponse) response).reason());
                     } else if (response != null) {
                         throw new IgniteInternalException("Unexpected response: " + response);
+                    } else {
+                        LOG.info("JoinReady command executed successfully");
                     }
                 });
     }
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
index ef81d6c31..13520bd17 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
@@ -110,14 +110,21 @@ class RaftStorageManager {
         storage.removeAll(keys);
     }
 
+    /**
+     * Returns {@code true} if a given node is present in the logical topology or {@code false} otherwise.
+     */
+    boolean isNodeInLogicalTopology(ClusterNode node) {
+        byte[] value = storage.get(logicalTopologyKey(node));
+
+        return value != null;
+    }
+
     private static byte[] logicalTopologyKey(ClusterNode node) {
         return prefixedKey(LOGICAL_TOPOLOGY_PREFIX, node.id());
     }
 
     /**
-     * Retrieves the validation token for a given node.
-     *
-     * @return Validation token or {@code null} if it does not exist.
+     * Returns {@code true} if a given node has been previously validated or {@code false} otherwise.
      */
     boolean isNodeValidated(String nodeId) {
         byte[] value = storage.get(validatedNodeKey(nodeId));
@@ -126,14 +133,14 @@ class RaftStorageManager {
     }
 
     /**
-     * Saves the validation token for a given node.
+     * Marks the given node as validated.
      */
     void putValidatedNode(String nodeId) {
         storage.put(validatedNodeKey(nodeId), EMPTY_VALUE);
     }
 
     /**
-     * Removes the validation token for a given node.
+     * Removes the given node from the validated node set.
      */
     void removeValidatedNode(String nodeId) {
         storage.remove(validatedNodeKey(nodeId));
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
index 7abb2c5a9..79dcfb0d7 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
@@ -150,8 +150,8 @@ class ValidationManager implements AutoCloseable {
     ValidationResult completeValidation(ClusterNode node) {
         String nodeId = node.id();
 
-        if (!storage.isNodeValidated(nodeId)) {
-            return ValidationResult.errorResult(String.format("Node \"%s\" has not yet passed the validation step", nodeId));
+        if (!storage.isNodeValidated(nodeId) && !storage.isNodeInLogicalTopology(node)) {
+            return ValidationResult.errorResult(String.format("Node \"%s\" has not yet passed the validation step", node));
         }
 
         Future<?> cleanupFuture = cleanupFutures.remove(nodeId);
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
new file mode 100644
index 000000000..692abfe16
--- /dev/null
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.cluster.management.ClusterTag;
+import org.apache.ignite.internal.cluster.management.raft.commands.InitCmgStateCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
+import org.apache.ignite.internal.properties.IgniteProductVersion;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the {@link CmgRaftGroupListener}.
+ */
+public class CmgRaftGroupListenerTest {
+    private final ClusterStateStorage storage = new ConcurrentMapClusterStateStorage();
+
+    private final CmgRaftGroupListener listener = new CmgRaftGroupListener(storage);
+
+    @BeforeEach
+    void setUp() {
+        storage.start();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        storage.close();
+    }
+
+    /**
+     * Tests that validated node IDs are added to and removed from the storage.
+     */
+    @Test
+    void testValidatedNodeIds() {
+        var state = new ClusterState(
+                List.of("foo"),
+                List.of("bar"),
+                IgniteProductVersion.CURRENT_VERSION,
+                new ClusterTag("cluster")
+        );
+
+        var node = new ClusterNode("foo", "bar", new NetworkAddress("localhost", 666));
+
+        listener.onWrite(iterator(new InitCmgStateCommand(node, state)));
+
+        listener.onWrite(iterator(new JoinRequestCommand(node, state.igniteVersion(), state.clusterTag())));
+
+        assertThat(listener.storage().getValidatedNodeIds(), contains(node.id()));
+
+        listener.onWrite(iterator(new JoinReadyCommand(node)));
+
+        assertThat(listener.storage().getValidatedNodeIds(), is(empty()));
+    }
+
+    private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj) {
+        CommandClosure<T> closure = new CommandClosure<>() {
+            @Override
+            public T command() {
+                return obj;
+            }
+
+            @Override
+            public void result(@Nullable Serializable res) {
+                // no-op.
+            }
+        };
+
+        return List.of(closure).iterator();
+    }
+}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
index 185e9bbc4..ef89843ef 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
@@ -83,6 +83,10 @@ public class ConcurrentMapClusterStateStorage implements ClusterStateStorage {
                 .filter(e -> {
                     byte[] key = e.getKey().bytes();
 
+                    if (key.length < prefix.length) {
+                        return false;
+                    }
+
                     return Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
                 })
                 .map(e -> entryTransformer.apply(e.getKey().bytes(), e.getValue()))
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 9fd9b5f69..354f22191 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -419,9 +419,13 @@ public class IgniteImpl implements Ignite {
                     cmgMgr
             );
 
+            LOG.info("Components started, joining the cluster");
+
             return cmgMgr.joinFuture()
                     // using the default executor to avoid blocking the CMG Manager threads
                     .thenRunAsync(() -> {
+                        LOG.info("Join complete, starting the remaining components");
+
                         // Start all other components after the join request has completed and the node has been validated.
                         try {
                             lifecycleManager.startComponents(
@@ -441,6 +445,8 @@ public class IgniteImpl implements Ignite {
                         }
                     })
                     .thenCompose(v -> {
+                        LOG.info("Components started, performing recovery");
+
                         // Recovery future must be created before configuration listeners are triggered.
                         CompletableFuture<Void> recoveryFuture = RecoveryCompletionFutureFactory.create(
                                 clusterCfgMgr,
@@ -460,7 +466,11 @@ public class IgniteImpl implements Ignite {
                                 });
                     })
                     // Signal that local recovery is complete and the node is ready to join the cluster.
-                    .thenCompose(v -> cmgMgr.onJoinReady())
+                    .thenCompose(v -> {
+                        LOG.info("Recovery complete, finishing join");
+
+                        return cmgMgr.onJoinReady();
+                    })
                     .thenRun(() -> {
                         try {
                             // Transfer the node to the STARTED state.
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java
index fe7563b99..a71d31dae 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java
@@ -93,6 +93,8 @@ class LifecycleManager {
      * @throws NodeStoppingException If node stopping intention was detected.
      */
     void onStartComplete() throws NodeStoppingException {
+        LOG.info("Start complete, transferring to {} state", Status.STARTED);
+
         Status currentStatus = status.compareAndExchange(Status.STARTING, Status.STARTED);
 
         if (currentStatus == Status.STOPPING) {