You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/06/06 08:33:12 UTC
[ignite-3] branch main updated: IGNITE-17100 Make JoinReady command idempotent (#853)
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0b683ddc7 IGNITE-17100 Make JoinReady command idempotent (#853)
0b683ddc7 is described below
commit 0b683ddc79a35b7c34c1a35d4d0cc8f6ee71d78d
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Mon Jun 6 11:33:07 2022 +0300
IGNITE-17100 Make JoinReady command idempotent (#853)
---
.../management/raft/ItCmgRaftServiceTest.java | 40 ++++++--
.../management/ClusterManagementGroupManager.java | 2 +-
.../management/raft/CmgRaftGroupListener.java | 6 ++
.../cluster/management/raft/CmgRaftService.java | 4 +
.../management/raft/RaftStorageManager.java | 17 +++-
.../cluster/management/raft/ValidationManager.java | 4 +-
.../management/raft/CmgRaftGroupListenerTest.java | 101 +++++++++++++++++++++
.../raft/ConcurrentMapClusterStateStorage.java | 4 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 12 ++-
.../ignite/internal/app/LifecycleManager.java | 2 +
10 files changed, 175 insertions(+), 17 deletions(-)
diff --git a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
index 92d6a89e7..534566ad2 100644
--- a/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
+++ b/modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/raft/ItCmgRaftServiceTest.java
@@ -37,6 +37,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.ClusterTag;
+import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.testframework.WorkDirectory;
@@ -91,6 +93,8 @@ public class ItCmgRaftServiceTest {
() -> new CmgRaftGroupListener(raftStorage)
);
+ assertThat(raftService, willCompleteSuccessfully());
+
this.raftService = new CmgRaftService(raftService.get(), clusterService);
}
@@ -361,7 +365,7 @@ public class ItCmgRaftServiceTest {
// Node has not passed validation.
String errMsg = String.format(
"JoinReady request denied, reason: Node \"%s\" has not yet passed the validation step",
- cluster.get(0).clusterService.topologyService().localMember().id()
+ cluster.get(0).clusterService.topologyService().localMember()
);
assertThrowsWithCause(
@@ -374,13 +378,6 @@ public class ItCmgRaftServiceTest {
// Everything is ok after the node has passed validation.
assertThat(raftService.completeJoinCluster(), willCompleteSuccessfully());
-
- // Validation state is cleared after the first successful attempt.
- assertThrowsWithCause(
- () -> raftService.completeJoinCluster().get(10, TimeUnit.SECONDS),
- IgniteInternalException.class,
- errMsg
- );
}
/**
@@ -471,4 +468,31 @@ public class ItCmgRaftServiceTest {
)
);
}
+
+ /**
+ * Tests that {@link JoinRequestCommand} and {@link JoinReadyCommand} are idempotent.
+ */
+ @Test
+ void testJoinCommandsIdempotence() {
+ ClusterState state = new ClusterState(
+ List.of("foo"),
+ List.of("bar"),
+ IgniteProductVersion.CURRENT_VERSION,
+ new ClusterTag("cluster")
+ );
+
+ assertThat(cluster.get(0).raftService.initClusterState(state), willCompleteSuccessfully());
+
+ CmgRaftService service = cluster.get(1).raftService;
+
+ assertThat(service.startJoinCluster(state.clusterTag()), willCompleteSuccessfully());
+
+ assertThat(service.startJoinCluster(state.clusterTag()), willCompleteSuccessfully());
+
+ assertThat(service.completeJoinCluster(), willCompleteSuccessfully());
+
+ assertThat(service.completeJoinCluster(), willCompleteSuccessfully());
+
+ assertThat(service.completeJoinCluster(), willCompleteSuccessfully());
+ }
}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
index d825f1d6d..c26653f8b 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java
@@ -256,7 +256,7 @@ public class ClusterManagementGroupManager implements IgniteComponent {
synchronized (raftServiceLock) {
if (raftService == null) {
// Raft service has not been started
- LOG.info("Init command received, starting the CMG: {}", msg);
+ LOG.info("Init command received, starting the CMG on: {}", msg.cmgNodes());
raftService = startCmgRaftService(msg.cmgNodes());
} else {
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
index 17e085e5a..5d138a1a8 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListener.java
@@ -43,6 +43,7 @@ import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* {@link RaftGroupListener} implementation for the CMG.
@@ -176,4 +177,9 @@ public class CmgRaftGroupListener implements RaftGroupListener {
public @Nullable CompletableFuture<Void> onBeforeApply(Command command) {
return null;
}
+
+ @TestOnly
+ public RaftStorageManager storage() {
+ return storage;
+ }
}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
index 338317a6f..255bdc06a 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java
@@ -129,6 +129,8 @@ public class CmgRaftService {
throw new JoinDeniedException("Join request denied, reason: " + ((ValidationErrorResponse) response).reason());
} else if (response != null) {
throw new IgniteInternalException("Unexpected response: " + response);
+ } else {
+ LOG.info("JoinRequest command executed successfully");
}
});
}
@@ -150,6 +152,8 @@ public class CmgRaftService {
+ ((ValidationErrorResponse) response).reason());
} else if (response != null) {
throw new IgniteInternalException("Unexpected response: " + response);
+ } else {
+ LOG.info("JoinReady command executed successfully");
}
});
}
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
index ef81d6c31..13520bd17 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RaftStorageManager.java
@@ -110,14 +110,21 @@ class RaftStorageManager {
storage.removeAll(keys);
}
+ /**
+ * Returns {@code true} if a given node is present in the logical topology or {@code false} otherwise.
+ */
+ boolean isNodeInLogicalTopology(ClusterNode node) {
+ byte[] value = storage.get(logicalTopologyKey(node));
+
+ return value != null;
+ }
+
private static byte[] logicalTopologyKey(ClusterNode node) {
return prefixedKey(LOGICAL_TOPOLOGY_PREFIX, node.id());
}
/**
- * Retrieves the validation token for a given node.
- *
- * @return Validation token or {@code null} if it does not exist.
+ * Returns {@code true} if a given node has been previously validated or {@code false} otherwise.
*/
boolean isNodeValidated(String nodeId) {
byte[] value = storage.get(validatedNodeKey(nodeId));
@@ -126,14 +133,14 @@ class RaftStorageManager {
}
/**
- * Saves the validation token for a given node.
+ * Marks the given node as validated.
*/
void putValidatedNode(String nodeId) {
storage.put(validatedNodeKey(nodeId), EMPTY_VALUE);
}
/**
- * Removes the validation token for a given node.
+ * Removes the given node from the validated node set.
*/
void removeValidatedNode(String nodeId) {
storage.remove(validatedNodeKey(nodeId));
diff --git a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
index 7abb2c5a9..79dcfb0d7 100644
--- a/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
+++ b/modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/ValidationManager.java
@@ -150,8 +150,8 @@ class ValidationManager implements AutoCloseable {
ValidationResult completeValidation(ClusterNode node) {
String nodeId = node.id();
- if (!storage.isNodeValidated(nodeId)) {
- return ValidationResult.errorResult(String.format("Node \"%s\" has not yet passed the validation step", nodeId));
+ if (!storage.isNodeValidated(nodeId) && !storage.isNodeInLogicalTopology(node)) {
+ return ValidationResult.errorResult(String.format("Node \"%s\" has not yet passed the validation step", node));
}
Future<?> cleanupFuture = cleanupFutures.remove(nodeId);
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
new file mode 100644
index 000000000..692abfe16
--- /dev/null
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftGroupListenerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.cluster.management.ClusterState;
+import org.apache.ignite.internal.cluster.management.ClusterTag;
+import org.apache.ignite.internal.cluster.management.raft.commands.InitCmgStateCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.JoinReadyCommand;
+import org.apache.ignite.internal.cluster.management.raft.commands.JoinRequestCommand;
+import org.apache.ignite.internal.properties.IgniteProductVersion;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for the {@link CmgRaftGroupListener}.
+ */
+public class CmgRaftGroupListenerTest {
+ private final ClusterStateStorage storage = new ConcurrentMapClusterStateStorage();
+
+ private final CmgRaftGroupListener listener = new CmgRaftGroupListener(storage);
+
+ @BeforeEach
+ void setUp() {
+ storage.start();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ storage.close();
+ }
+
+ /**
+ * Tests that validated node IDs are added to and removed from the storage.
+ */
+ @Test
+ void testValidatedNodeIds() {
+ var state = new ClusterState(
+ List.of("foo"),
+ List.of("bar"),
+ IgniteProductVersion.CURRENT_VERSION,
+ new ClusterTag("cluster")
+ );
+
+ var node = new ClusterNode("foo", "bar", new NetworkAddress("localhost", 666));
+
+ listener.onWrite(iterator(new InitCmgStateCommand(node, state)));
+
+ listener.onWrite(iterator(new JoinRequestCommand(node, state.igniteVersion(), state.clusterTag())));
+
+ assertThat(listener.storage().getValidatedNodeIds(), contains(node.id()));
+
+ listener.onWrite(iterator(new JoinReadyCommand(node)));
+
+ assertThat(listener.storage().getValidatedNodeIds(), is(empty()));
+ }
+
+ private static <T extends Command> Iterator<CommandClosure<T>> iterator(T obj) {
+ CommandClosure<T> closure = new CommandClosure<>() {
+ @Override
+ public T command() {
+ return obj;
+ }
+
+ @Override
+ public void result(@Nullable Serializable res) {
+ // no-op.
+ }
+ };
+
+ return List.of(closure).iterator();
+ }
+}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
index 185e9bbc4..ef89843ef 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
@@ -83,6 +83,10 @@ public class ConcurrentMapClusterStateStorage implements ClusterStateStorage {
.filter(e -> {
byte[] key = e.getKey().bytes();
+ if (key.length < prefix.length) {
+ return false;
+ }
+
return Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
})
.map(e -> entryTransformer.apply(e.getKey().bytes(), e.getValue()))
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 9fd9b5f69..354f22191 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -419,9 +419,13 @@ public class IgniteImpl implements Ignite {
cmgMgr
);
+ LOG.info("Components started, joining the cluster");
+
return cmgMgr.joinFuture()
// using the default executor to avoid blocking the CMG Manager threads
.thenRunAsync(() -> {
+ LOG.info("Join complete, starting the remaining components");
+
// Start all other components after the join request has completed and the node has been validated.
try {
lifecycleManager.startComponents(
@@ -441,6 +445,8 @@ public class IgniteImpl implements Ignite {
}
})
.thenCompose(v -> {
+ LOG.info("Components started, performing recovery");
+
// Recovery future must be created before configuration listeners are triggered.
CompletableFuture<Void> recoveryFuture = RecoveryCompletionFutureFactory.create(
clusterCfgMgr,
@@ -460,7 +466,11 @@ public class IgniteImpl implements Ignite {
});
})
// Signal that local recovery is complete and the node is ready to join the cluster.
- .thenCompose(v -> cmgMgr.onJoinReady())
+ .thenCompose(v -> {
+ LOG.info("Recovery complete, finishing join");
+
+ return cmgMgr.onJoinReady();
+ })
.thenRun(() -> {
try {
// Transfer the node to the STARTED state.
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java
index fe7563b99..a71d31dae 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/LifecycleManager.java
@@ -93,6 +93,8 @@ class LifecycleManager {
* @throws NodeStoppingException If node stopping intention was detected.
*/
void onStartComplete() throws NodeStoppingException {
+ LOG.info("Start complete, transferring to {} state", Status.STARTED);
+
Status currentStatus = status.compareAndExchange(Status.STARTING, Status.STARTED);
if (currentStatus == Status.STOPPING) {