You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/06/08 14:43:31 UTC

[ignite-3] branch main updated: IGNITE-19685 Treat Scalecube's LEAVING event as 'node left' in the physical topology (#2168)

This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d6735c262d IGNITE-19685 Treat Scalecube's LEAVING event as 'node left' in the physical topology (#2168)
d6735c262d is described below

commit d6735c262d364615897f2579747f36d170523cab
Author: Roman Puchkovskiy <ro...@gmail.com>
AuthorDate: Thu Jun 8 18:43:26 2023 +0400

    IGNITE-19685 Treat Scalecube's LEAVING event as 'node left' in the physical topology (#2168)
---
 .../scalecube/ScaleCubeTopologyService.java        | 10 +++-
 .../scalecube/ScaleCubeTopologyServiceTest.java    | 69 ++++++++++++++++++++++
 .../java/org/apache/ignite/internal/Cluster.java   | 26 ++++++--
 .../internal/ClusterPerTestIntegrationTest.java    | 19 +++++-
 .../internal/compute/ItLogicalTopologyTest.java    | 40 ++++++-------
 .../org/apache/ignite/internal/ssl/ItSslTest.java  |  4 +-
 .../apache/ignite/internal/start/ItStartTest.java  |  2 +-
 7 files changed, 137 insertions(+), 33 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 5b7c796fde..48a82b715f 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -86,12 +86,18 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
         } else if (event.isUpdated()) {
             members.put(member.address(), member);
             consistentIdToMemberMap.put(member.name(), member);
-        } else if (event.isRemoved()) {
+        } else if (event.isRemoved() || event.isLeaving()) {
+            // We treat LEAVING as 'node left' because the node will not be back and we don't want to wait for the suspicion timeout.
+
             members.compute(member.address(), (addr, node) -> {
                 // Ignore stale remove event.
                 if (node == null || node.id().equals(member.id())) {
+                    LOG.info("Node left [member={}, eventType={}]", member, event.type());
+
                     return null;
                 } else {
+                    LOG.info("Node left (noop as it has already reappeared) [member={}, eventType={}]", member, event.type());
+
                     return node;
                 }
             });
@@ -105,8 +111,6 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
                 }
             });
 
-            LOG.info("Node left [member={}]", member);
-
             fireDisappearedEvent(member);
         }
 
diff --git a/modules/network/src/test/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyServiceTest.java b/modules/network/src/test/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyServiceTest.java
new file mode 100644
index 0000000000..2b0c78e347
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyServiceTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import io.scalecube.cluster.Member;
+import io.scalecube.cluster.membership.MembershipEvent;
+import io.scalecube.net.Address;
+import org.junit.jupiter.api.Test;
+
+class ScaleCubeTopologyServiceTest {
+    private final ScaleCubeTopologyService topologyService = new ScaleCubeTopologyService();
+
+    private final Member member1 = new Member("id1", "first", Address.create("host", 1001), "default");
+    private final Member member2 = new Member("id2", "second", Address.create("host", 1002), "default");
+
+    @Test
+    void addedEventAddsNodeToTopology() {
+        addTwoMembers();
+
+        assertThat(topologyService.allMembers(), hasSize(2));
+        assertThat(topologyService.getByConsistentId("first").name(), is("first"));
+        assertThat(topologyService.getByConsistentId("second").name(), is("second"));
+    }
+
+    @Test
+    void removedEventRemovesNodeFromTopology() {
+        addTwoMembers();
+
+        topologyService.onMembershipEvent(MembershipEvent.createRemoved(member2, null, 100));
+
+        assertThat(topologyService.allMembers(), hasSize(1));
+        assertThat(topologyService.getByConsistentId("second"), is(nullValue()));
+    }
+
+    private void addTwoMembers() {
+        topologyService.onMembershipEvent(MembershipEvent.createAdded(member1, null, 1));
+        topologyService.onMembershipEvent(MembershipEvent.createAdded(member2, null, 2));
+    }
+
+    @Test
+    void leavingEventRemovesNodeFromTopology() {
+        addTwoMembers();
+
+        topologyService.onMembershipEvent(MembershipEvent.createLeaving(member2, null, 100));
+
+        assertThat(topologyService.allMembers(), hasSize(1));
+        assertThat(topologyService.getByConsistentId("second"), is(nullValue()));
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
index 593213b584..7ae97e511a 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -135,12 +135,28 @@ public class Cluster {
      * @param initParametersConfigurator Configure {@link InitParameters} before initializing the cluster.
      */
     public void startAndInit(int nodeCount, Consumer<InitParametersBuilder> initParametersConfigurator) {
+        startAndInit(nodeCount, defaultNodeBootstrapConfigTemplate, initParametersConfigurator);
+    }
+
+    /**
+     * Starts the cluster with the given number of nodes and initializes it.
+     *
+     * @param nodeCount Number of nodes in the cluster.
+     * @param nodeBootstrapConfigTemplate Node bootstrap config template to be used for each node started
+     *     with this call.
+     * @param initParametersConfigurator Configure {@link InitParameters} before initializing the cluster.
+     */
+    public void startAndInit(
+            int nodeCount,
+            String nodeBootstrapConfigTemplate,
+            Consumer<InitParametersBuilder> initParametersConfigurator
+    ) {
         if (started) {
             throw new IllegalStateException("The cluster is already started");
         }
 
         List<CompletableFuture<IgniteImpl>> futures = IntStream.range(0, nodeCount)
-                .mapToObj(this::startClusterNode)
+                .mapToObj(nodeIndex -> startNodeAsync(nodeIndex, nodeBootstrapConfigTemplate))
                 .collect(toList());
 
         String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
@@ -167,8 +183,8 @@ public class Cluster {
      * @param nodeIndex Index of the node to start.
      * @return Future that will be completed when the node starts.
      */
-    public CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex) {
-        return startClusterNode(nodeIndex, defaultNodeBootstrapConfigTemplate);
+    public CompletableFuture<IgniteImpl> startNodeAsync(int nodeIndex) {
+        return startNodeAsync(nodeIndex, defaultNodeBootstrapConfigTemplate);
     }
 
     /**
@@ -178,7 +194,7 @@ public class Cluster {
      * @param nodeBootstrapConfigTemplate Bootstrap config template to use for this node.
      * @return Future that will be completed when the node starts.
      */
-    public CompletableFuture<IgniteImpl> startClusterNode(int nodeIndex, String nodeBootstrapConfigTemplate) {
+    public CompletableFuture<IgniteImpl> startNodeAsync(int nodeIndex, String nodeBootstrapConfigTemplate) {
         String nodeName = testNodeName(testInfo, nodeIndex);
 
         String config = IgniteStringFormatter.format(nodeBootstrapConfigTemplate, BASE_PORT + nodeIndex, CONNECT_NODE_ADDR);
@@ -250,7 +266,7 @@ public class Cluster {
         IgniteImpl newIgniteNode;
 
         try {
-            newIgniteNode = startClusterNode(index, nodeBootstrapConfigTemplate).get(20, TimeUnit.SECONDS);
+            newIgniteNode = startNodeAsync(index, nodeBootstrapConfigTemplate).get(20, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 059c26d582..1f0132e21c 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -57,7 +57,7 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes
             + "  }\n"
             + "}";
 
-    /** Template for node bootstrap config with Scalecube and Logical Topology settings for fast failure detection. */
+    /** Template for node bootstrap config with Scalecube settings for fast failure detection. */
     protected static final String FAST_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
             + "  network: {\n"
             + "    port: {},\n"
@@ -76,6 +76,19 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes
             + "  }\n"
             + "}";
 
+    /** Template for node bootstrap config with Scalecube settings that disable failure detection. */
+    protected static final String DISABLED_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+            + "  network: {\n"
+            + "    port: {},\n"
+            + "    nodeFinder: {\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    },\n"
+            + "    membership: {\n"
+            + "      failurePingInterval: 1000000000\n"
+            + "    }\n"
+            + "  }\n"
+            + "}";
+
     protected Cluster cluster;
 
     /** Work directory. */
@@ -94,7 +107,9 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes
 
         cluster = new Cluster(testInfo, workDir, getNodeBootstrapConfigTemplate());
 
-        cluster.startAndInit(initialNodes());
+        if (initialNodes() > 0) {
+            cluster.startAndInit(initialNodes());
+        }
     }
 
     /**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
index e053ae4cf4..c430c8e587 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItLogicalTopologyTest.java
@@ -43,7 +43,6 @@ import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
-import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.tostring.S;
 import org.intellij.lang.annotations.Language;
@@ -98,7 +97,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
 
     @Override
     protected int initialNodes() {
-        return 1;
+        // We don't need any nodes to be started automatically.
+        return 0;
     }
 
     @Override
@@ -108,6 +108,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
 
     @Test
     void receivesLogicalTopologyEvents() throws Exception {
+        cluster.startAndInit(1);
+
         IgniteImpl entryNode = node(0);
 
         entryNode.logicalTopologyService().addEventListener(listener);
@@ -145,6 +147,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
 
     @Test
     void receivesLogicalTopologyEventsWithNodeAttributes() throws Exception {
+        cluster.startAndInit(1);
+
         IgniteImpl entryNode = node(0);
 
         entryNode.logicalTopologyService().addEventListener(listener);
@@ -185,6 +189,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
 
     @Test
     void receiveLogicalTopologyFromLeaderWithAttributes() throws Exception {
+        cluster.startAndInit(1);
+
         IgniteImpl entryNode = node(0);
 
         IgniteImpl secondIgnite = startNode(1, NODE_BOOTSTRAP_CFG_TEMPLATE_WITH_NODE_ATTRIBUTES);
@@ -204,6 +210,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
 
     @Test
     void receivesLogicalTopologyEventsCausedByNodeRestart() throws Exception {
+        cluster.startAndInit(1);
+
         IgniteImpl entryNode = node(0);
 
         Ignite secondIgnite = startNode(1);
@@ -237,6 +245,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
 
     @Test
     void nodeReturnedToPhysicalTopologyDoesNotReturnToLogicalTopology() throws Exception {
+        cluster.startAndInit(1);
+
         IgniteImpl entryNode = node(0);
 
         IgniteImpl secondIgnite = startNode(1);
@@ -279,6 +289,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
 
     @Test
     void nodeLeavesLogicalTopologyImmediatelyAfterBeingLostBySwim() throws Exception {
+        cluster.startAndInit(1);
+
         IgniteImpl entryNode = node(0);
 
         IgniteImpl secondNode = startNode(1);
@@ -297,6 +309,8 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
 
     @Test
     void nodeThatCouldNotJoinShouldBeInvalidated(TestInfo testInfo) throws Exception {
+        cluster.startAndInit(1);
+
         IgniteImpl entryNode = node(0);
 
         entryNode.logicalTopologyService().addEventListener(listener);
@@ -310,7 +324,7 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
             }
         });
 
-        cluster.startClusterNode(1);
+        cluster.startNodeAsync(1);
 
         try {
             Event event = events.poll(10, TimeUnit.SECONDS);
@@ -332,9 +346,9 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
 
     @Test
     void nodeLeavesLogicalTopologyImmediatelyOnGracefulStop() throws Exception {
-        IgniteImpl entryNode = node(0);
+        cluster.startAndInit(1, DISABLED_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE, ignored -> {});
 
-        disableNetworkFailureDetection(entryNode);
+        IgniteImpl entryNode = node(0);
 
         IgniteImpl secondIgnite = startNode(1);
 
@@ -346,26 +360,12 @@ class ItLogicalTopologyTest extends ClusterPerTestIntegrationTest {
 
         assertThat(events, is(empty()));
 
-        assertThat(leaveEvent, is(notNullValue()));
+        assertThat("Leave event not received in time", leaveEvent, is(notNullValue()));
 
         assertThat(leaveEvent.eventType, is(EventType.LEFT));
         assertThat(leaveEvent.node.name(), is(secondIgnite.name()));
     }
 
-    /**
-     * Configures the given Ignite instance in a way that makes it impossible to notice (by itself) that another
-     * node has gone and is not reachable on the network anymore.
-     *
-     * @param node Ignite node to disable network failure detection at.
-     * @throws Exception If something goes wrong.
-     */
-    private static void disableNetworkFailureDetection(IgniteImpl node) throws Exception {
-        node.nodeConfiguration().getConfiguration(NetworkConfiguration.KEY)
-                .membership()
-                .change(membershipChange -> membershipChange.changeFailurePingInterval(Integer.MAX_VALUE))
-                .get(10, TimeUnit.SECONDS);
-    }
-
     private static class Event {
         private final EventType eventType;
         private final LogicalNode node;
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
index 2410d4771a..0160271973 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
@@ -506,7 +506,7 @@ public class ItSslTest extends IgniteIntegrationTest {
         String sslEnabledWithCipher1BoostrapConfig = createBoostrapConfig("TLS_AES_256_GCM_SHA384");
         String sslEnabledWithCipher2BoostrapConfig = createBoostrapConfig("TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384");
 
-        CompletableFuture<IgniteImpl> node1 = cluster.startClusterNode(0, sslEnabledWithCipher1BoostrapConfig);
+        CompletableFuture<IgniteImpl> node1 = cluster.startNodeAsync(0, sslEnabledWithCipher1BoostrapConfig);
 
         String metaStorageAndCmgNodeName = testNodeName(testInfo, 0);
 
@@ -521,7 +521,7 @@ public class ItSslTest extends IgniteIntegrationTest {
         // First node will initialize the cluster with single node successfully since the second node can't connect to it.
         assertThat(node1, willCompleteSuccessfully());
 
-        CompletableFuture<IgniteImpl> node2 = cluster.startClusterNode(1, sslEnabledWithCipher2BoostrapConfig);
+        CompletableFuture<IgniteImpl> node2 = cluster.startNodeAsync(1, sslEnabledWithCipher2BoostrapConfig);
         assertThat(node2, willTimeoutIn(1, TimeUnit.SECONDS));
 
         cluster.shutdown();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java
index 8a9a9935f0..b186e1a478 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/start/ItStartTest.java
@@ -128,7 +128,7 @@ class ItStartTest extends IgniteIntegrationTest {
 
         AtomicReference<String> threadNameRef = new AtomicReference<>();
 
-        CompletableFuture<IgniteImpl> future = cluster.startClusterNode(1).whenComplete((res, ex) -> {
+        CompletableFuture<IgniteImpl> future = cluster.startNodeAsync(1).whenComplete((res, ex) -> {
             threadNameRef.set(Thread.currentThread().getName());
         });