You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/06/08 14:43:31 UTC
[ignite-3] branch main updated: IGNITE-19685 Treat Scalecube's LEAVING event as 'node left' in the physical topology (#2168)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d6735c262d IGNITE-19685 Treat Scalecube's LEAVING event as 'node left' in the physical topology (#2168)
d6735c262d is described below
commit d6735c262d364615897f2579747f36d170523cab
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Jun 8 18:43:26 2023 +0400
IGNITE-19685 Treat Scalecube's LEAVING event as 'node left' in the physical topology (#2168)
---
.../scalecube/ScaleCubeTopologyService.java | 10 +++-
.../scalecube/ScaleCubeTopologyServiceTest.java | 69 ++++++++++++++++++++++
.../java/org/apache/ignite/internal/Cluster.java | 26 ++++++--
.../internal/ClusterPerTestIntegrationTest.java | 19 +++++-
.../internal/compute/ItLogicalTopologyTest.java | 40 ++++++-------
.../org/apache/ignite/internal/ssl/ItSslTest.java | 4 +-
.../apache/ignite/internal/start/ItStartTest.java | 2 +-
7 files changed, 137 insertions(+), 33 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 5b7c796fde..48a82b715f 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -86,12 +86,18 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
} else if (event.isUpdated()) {
members.put(member.address(), member);
consistentIdToMemberMap.put(member.name(), member);
- } else if (event.isRemoved()) {
+ } else if (event.isRemoved() || event.isLeaving()) {
+ // We treat LEAVING as 'node left' because the node will not be back and we don't want to wait for the suspicion timeout.
+
members.compute(member.address(), (addr, node) -> {
// Ignore stale remove event.
if (node == null || node.id().equals(member.id())) {
+ LOG.info("Node left [member={}, eventType={}]", member, event.type());
+
return null;
} else {
+ LOG.info("Node left (noop as it has already reappeared) [member={}, eventType={}]", member, event.type());
+
return node;
}
});
@@ -105,8 +111,6 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
}
});
- LOG.info("Node left [member={}]", member);
-
fireDisappearedEvent(member);
}
diff --git a/modules/network/src/test/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyServiceTest.java
new file mode 100644
index 0000000000..2b0c78e347
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyServiceTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import io.scalecube.cluster.Member;
+import io.scalecube.cluster.membership.MembershipEvent;
+import io.scalecube.net.Address;
+import org.junit.jupiter.api.Test;
+
+class ScaleCubeTopologyServiceTest {
+ private final ScaleCubeTopologyService topologyService = new ScaleCubeTopologyService();
+
+ private final Member member1 = new Member("id1", "first", Address.create("host", 1001), "default");
+ private final Member member2 = new Member("id2", "second", Address.create("host", 1002), "default");
+
+ @Test
+ void addedEventAddsNodeToTopology() {
+ addTwoMembers();
+
+ assertThat(topologyService.allMembers(), hasSize(2));
+ assertThat(topologyService.getByConsistentId("first").name(), is("first"));
+ assertThat(topologyService.getByConsistentId("second").name(), is("second"));
+ }
+
+ @Test
+ void removedEventRemovesNodeFromTopology() {
+ addTwoMembers();
+
+ topologyService.onMembershipEvent(MembershipEvent.createRemoved(member2, null, 100));
+
+ assertThat(topologyService.allMembers(), hasSize(1));
+ assertThat(topologyService.getByConsistentId("second"), is(nullValue()));
+ }
+
+ private void addTwoMembers() {
+ topologyService.onMembershipEvent(MembershipEvent.createAdded(member1, null, 1));
+ topologyService.onMembershipEvent(MembershipEvent.createAdded(member2, null, 2));
+ }
+
+ @Test
+ void leavingEventRemovesNodeFromTopology() {
+ addTwoMembers();
+
+ topologyService.onMembershipEvent(MembershipEvent.createLeaving(member2, null, 100));
+
+ assertThat(topologyService.allMembers(), hasSize(1));
+ assertThat(topologyService.getByConsistentId("second"), is(nullValue()));
+ }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
index 593213b584..7ae97e511a 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -135,12 +135,28 @@ public class Cluster {
* @param initParametersConfigurator Configure {@link InitParameters} before initializing the cluster.
*/
public void startAndInit(int nodeCount, Consumer<InitParametersBuilder> initParametersConfigurator) {
+ startAndInit(nodeCount, defaultNodeBootstrapConfigTemplate, initParametersConfigurator);
+ }
+
+ /**
+ * Starts the cluster with the given number of nodes and initializes it.
+ *
+ * @param nodeCount Number of nodes in the cluster.
+ * @param nodeBootstrapConfigTemplate Node bootstrap config template to be used for each node started
+ * with this call.
+ * @param initParametersConfigurator Configure {@link InitParameters} before initializing the cluster.
+ */
+ public void startAndInit(
+ int nodeCount,
+ String nodeBootstrapConfigTemplate,
+ Consumer<InitParametersBuilder> initParametersConfigurator
+ ) {
if (started) {
throw new IllegalStateException("The cluster is already started");
}
List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
- .mapToObj(this::startClusterNode)
+ .mapToObj(nodeIndex -> startNodeAsync(nodeIndex, nodeBootstrapConfigTemplate))
.collect(toList());
String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
@@ -167,8 +183,8 @@ public class Cluster {
* @param nodeIndex Index of the node to start.
* @return Future that will be completed when the node starts.
*/
- public CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
- return startClusterNode(nodeIndex, defaultNodeBootstrapConfigTemplate);
+ public CompletableFuture<IgniteImpl> startNodeAsync(int nodeIndex) {
+ return startNodeAsync(nodeIndex, defaultNodeBootstrapConfigTemplate);
}
/**
@@ -178,7 +194,7 @@ public class Cluster {
* @param nodeBootstrapConfigTemplate Bootstrap config template to use for this node.
* @return Future that will be completed when the node starts.
*/
- public CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex, String nodeBootstrapConfigTemplate) {
+ public CompletableFuture<IgniteImpl> startNodeAsync(int nodeIndex, String nodeBootstrapConfigTemplate) {
String nodeName = testNodeName(testInfo, nodeIndex);
String config = IgniteStringFormatter.format(nodeBootstrapConfigTemplate, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
@@ -250,7 +266,7 @@ public class Cluster {
IgniteImpl newIgniteNode;
try {
- newIgniteNode = startClusterNode(index, nodeBootstrapConfigTemplate).get(20, TimeUnit.SECONDS);
+ newIgniteNode = startNodeAsync(index, nodeBootstrapConfigTemplate).get(20, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 059c26d582..1f0132e21c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -57,7 +57,7 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes
+ " }\n"
+ "}";
- /** Template for node bootstrap config with Scalecube and Logical Topology settings for fast failure detection. */
+ /** Template for node bootstrap config with Scalecube settings for fast failure detection. */
protected static final String FAST_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+ " network: {\n"
+ " port: {},\n"
@@ -76,6 +76,19 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes
+ " }\n"
+ "}";
+ /** Template for node bootstrap config with Scalecube settings for a disabled failure detection. */
+ protected static final String DISABLED_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+ + " network: {\n"
+ + " port: {},\n"
+ + " nodeFinder: {\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " },\n"
+ + " membership: {\n"
+ + " failurePingInterval: 1000000000\n"
+ + " }\n"
+ + " }\n"
+ + "}";
+
protected Cluster cluster;
/** Work directory. */
@@ -94,7 +107,9 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes
cluster = new Cluster(testInfo, workDir, getNodeBootstrapConfigTemplate());
- cluster.startAndInit(initialNodes());
+ if (initialNodes() > 0) {
+ cluster.startAndInit(initialNodes());
+ }
}
/**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
index e053ae4cf4..c430c8e587 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
@@ -43,7 +43,6 @@ import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.internal.tostring.S;
import org.intellij.lang.annotations.Language;
@@ -98,7 +97,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
@Override
protected int initialNodes() {
- return 1;
+ // We don't need any nodes to be started automatically.
+ return 0;
}
@Override
@@ -108,6 +108,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
@Test
void receivesLogicalTopologyEvents() throws Exception {
+ cluster.startAndInit(1);
+
IgniteImpl entryNode = node(0);
entryNode.logicalTopologyService().addEventListener(listener);
@@ -145,6 +147,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
@Test
void receivesLogicalTopologyEventsWithNodeAttributes() throws Exception {
+ cluster.startAndInit(1);
+
IgniteImpl entryNode = node(0);
entryNode.logicalTopologyService().addEventListener(listener);
@@ -185,6 +189,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
@Test
void receiveLogicalTopologyFromLeaderWithAttributes() throws Exception {
+ cluster.startAndInit(1);
+
IgniteImpl entryNode = node(0);
IgniteImpl secondIgnite = startNode(1, NODE_BOOTSTRAP_CFG_TEMPLATE_WITH_NODE_ATTRIBUTES);
@@ -204,6 +210,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
@Test
void receivesLogicalTopologyEventsCausedByNodeRestart() throws Exception {
+ cluster.startAndInit(1);
+
IgniteImpl entryNode = node(0);
Ignite secondIgnite = startNode(1);
@@ -237,6 +245,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
@Test
void nodeReturnedToPhysicalTopologyDoesNotReturnToLogicalTopology() throws Exception {
+ cluster.startAndInit(1);
+
IgniteImpl entryNode = node(0);
IgniteImpl secondIgnite = startNode(1);
@@ -279,6 +289,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
@Test
void nodeLeavesLogicalTopologyImmediatelyAfterBeingLostBySwim() throws Exception {
+ cluster.startAndInit(1);
+
IgniteImpl entryNode = node(0);
IgniteImpl secondNode = startNode(1);
@@ -297,6 +309,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
@Test
void nodeThatCouldNotJoinShouldBeInvalidated(TestInfo testInfo) throws Exception {
+ cluster.startAndInit(1);
+
IgniteImpl entryNode = node(0);
entryNode.logicalTopologyService().addEventListener(listener);
@@ -310,7 +324,7 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
}
});
- cluster.startClusterNode(1);
+ cluster.startNodeAsync(1);
try {
Event event = events.poll(10, TimeUnit.SECONDS);
@@ -332,9 +346,9 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
@Test
void nodeLeavesLogicalTopologyImmediatelyOnGracefulStop() throws Exception {
- IgniteImpl entryNode = node(0);
+ cluster.startAndInit(1, DISABLED_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE, ignored -> {});
- disableNetworkFailureDetection(entryNode);
+ IgniteImpl entryNode = node(0);
IgniteImpl secondIgnite = startNode(1);
@@ -346,26 +360,12 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
assertThat(events, is(empty()));
- assertThat(leaveEvent, is(notNullValue()));
+ assertThat("Leave event not received in time", leaveEvent, is(notNullValue()));
assertThat(leaveEvent.eventType, is(EventType.LEFT));
assertThat(leaveEvent.node.name(), is(secondIgnite.name()));
}
- /**
- * Configures the given Ignite instance in a way that makes it impossible to notice (by itself) that another
- * node has gone and is not reachable on the network anymore.
- *
- * @param node Ignite node to disable network failure detection at.
- * @throws Exception If something goes wrong.
- */
- private static void disableNetworkFailureDetection(IgniteImpl node) throws Exception {
- node.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY)
- .membership()
- .change(membershipChange -> membershipChange.changeFailurePingInterval(Integer.MAX_VALUE))
- .get(10, TimeUnit.SECONDS);
- }
-
private static class Event {
private final EventType eventType;
private final LogicalNode node;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
index 2410d4771a..0160271973 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
@@ -506,7 +506,7 @@ public class ItSslTest extends IgniteIntegrationTest {
String sslEnabledWithCipher1BoostrapConfig = createBoostrapConfig("TLS_AES_256_GCM_SHA384");
String sslEnabledWithCipher2BoostrapConfig = createBoostrapConfig("TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384");
- CompletableFuture<IgniteImpl> node1 = cluster.startClusterNode(0, sslEnabledWithCipher1BoostrapConfig);
+ CompletableFuture<IgniteImpl> node1 = cluster.startNodeAsync(0, sslEnabledWithCipher1BoostrapConfig);
String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
@@ -521,7 +521,7 @@ public class ItSslTest extends IgniteIntegrationTest {
// First node will initialize the cluster with single node successfully since the second node can't connect to it.
assertThat(node1, willCompleteSuccessfully());
- CompletableFuture<IgniteImpl> node2 = cluster.startClusterNode(1, sslEnabledWithCipher2BoostrapConfig);
+ CompletableFuture<IgniteImpl> node2 = cluster.startNodeAsync(1, sslEnabledWithCipher2BoostrapConfig);
assertThat(node2, willTimeoutIn(1, TimeUnit.SECONDS));
cluster.shutdown();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java
index 8a9a9935f0..b186e1a478 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java
@@ -128,7 +128,7 @@ class ItStartTest extends IgniteIntegrationTest {
AtomicReference<String> threadNameRef = new AtomicReference<>();
- CompletableFuture<IgniteImpl> future = cluster.startClusterNode(1).whenComplete((res, ex) -> {
+ CompletableFuture<IgniteImpl> future = cluster.startNodeAsync(1).whenComplete((res, ex) -> {
threadNameRef.set(Thread.currentThread().getName());
});