You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/04/19 14:46:15 UTC

[ignite-3] branch main updated: IGNITE-14536 Node disappeared event propagation in case of node forceful shutdown (#94)

This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d81277f  IGNITE-14536 Node disappeared event propagation in case of node forceful shutdown (#94)
d81277f is described below

commit d81277f18f2ad4fa5260a4c21b497397a4ff5368
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Mon Apr 19 17:43:12 2021 +0300

    IGNITE-14536 Node disappeared event propagation in case of node forceful shutdown (#94)
    
    Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
 .../scalecube/ITScaleCubeNetworkMessagingTest.java | 200 ++++++++++++++++-----
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   3 +-
 .../scalecube/ScaleCubeTopologyService.java        |  61 +++++--
 3 files changed, 200 insertions(+), 64 deletions(-)

diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 367e78b..c241dcf 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.ignite.network.scalecube;
 
-import java.util.ArrayList;
+import io.scalecube.cluster.ClusterImpl;
+import io.scalecube.cluster.transport.api.Transport;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -24,17 +27,19 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.NetworkMessageHandler;
 import org.apache.ignite.network.TestMessage;
 import org.apache.ignite.network.TopologyEventHandler;
 import org.apache.ignite.network.message.MessageSerializationRegistry;
 import org.apache.ignite.network.message.NetworkMessage;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -43,22 +48,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** */
 class ITScaleCubeNetworkMessagingTest {
-    /** */
-    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
-
-    /** */
-    private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
-
-    /** */
-    private final Map<String, NetworkMessage> messageStorage = new ConcurrentHashMap<>();
-
-    /** */
-    private final List<ClusterService> startedMembers = new ArrayList<>();
+    private Cluster testCluster;
 
     /** */
     @AfterEach
     public void afterEach() {
-        startedMembers.forEach(ClusterService::shutdown);
+        testCluster.shutdown();
     }
 
     /**
@@ -66,26 +61,27 @@ class ITScaleCubeNetworkMessagingTest {
      */
     @Test
     public void messageWasSentToAllMembersSuccessfully() throws Exception {
-        //Given: Three started member which are gathered to cluster.
-        List<String> addresses = List.of("localhost:3344", "localhost:3345", "localhost:3346");
+        Map<String, NetworkMessage> messageStorage = new ConcurrentHashMap<>();
 
         CountDownLatch messageReceivedLatch = new CountDownLatch(3);
 
-        String aliceName = "Alice";
-
-        ClusterService alice = startNetwork(aliceName, 3344, addresses);
-        ClusterService bob = startNetwork("Bob", 3345, addresses);
-        ClusterService carol = startNetwork("Carol", 3346, addresses);
+        testCluster = new Cluster(3); // Nodes are created here but not started yet.
 
-        NetworkMessageHandler messageWaiter = (message, sender, correlationId) -> messageReceivedLatch.countDown();
+        for (ClusterService member : testCluster.members) { // Record the last message each node receives.
+            member.messagingService().addMessageHandler(
+                (message, sender, correlationId) -> {
+                    messageStorage.put(member.localConfiguration().getName(), message);
+                    messageReceivedLatch.countDown();
+                }
+            );
+        }
 
-        alice.messagingService().addMessageHandler(messageWaiter);
-        bob.messagingService().addMessageHandler(messageWaiter);
-        carol.messagingService().addMessageHandler(messageWaiter);
+        testCluster.startAwait(); // Start all nodes and wait until the cluster has formed.
 
         TestMessage testMessage = new TestMessage("Message from Alice", Collections.emptyMap());
 
-        //When: Send one message to all members in cluster.
+        ClusterService alice = testCluster.members.get(0); // Sender; it also sends the message to itself.
+
         for (ClusterNode member : alice.topologyService().allMembers()) {
             alice.messagingService().weakSend(member, testMessage);
         }
@@ -93,10 +89,42 @@ class ITScaleCubeNetworkMessagingTest {
         boolean messagesReceived = messageReceivedLatch.await(3, TimeUnit.SECONDS);
         assertTrue(messagesReceived);
 
-        //Then: All members successfully received message.
-        assertThat(getLastMessage(alice), is(testMessage));
-        assertThat(getLastMessage(bob), is(testMessage));
-        assertThat(getLastMessage(carol), is(testMessage));
+        testCluster.members.stream() // Every node, the sender included, must have stored the message.
+            .map(member -> member.localConfiguration().getName())
+            .map(messageStorage::get)
+            .forEach(msg -> assertThat(msg, is(testMessage)));
+    }
+
+    /**
+     * Tests that after a graceful shutdown the remaining node observes the departure and drops the node.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testShutdown() throws Exception {
+        testShutdown0(false);
+    }
+
+    /**
+     * Tests that a node whose transport is stopped abruptly (no graceful leave) is dropped by the remaining node.
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testForcefulShutdown() throws Exception {
+        testShutdown0(true);
+    }
+
+    /**
+     * Starts a two-node cluster, stops the first node and checks that the second node sees it leave.
+     * @param forceful Whether to stop the first node's transport abruptly instead of calling {@code shutdown()}.
+     * @throws Exception If failed.
+     */
+    private void testShutdown0(boolean forceful) throws Exception {
+        testCluster = new Cluster(2);
+        testCluster.startAwait();
+
+        ClusterService alice = testCluster.members.get(0);
+        ClusterService bob = testCluster.members.get(1);
+        String aliceName = alice.localConfiguration().getName();
 
         CountDownLatch aliceShutdownLatch = new CountDownLatch(1);
 
@@ -113,35 +141,115 @@ class ITScaleCubeNetworkMessagingTest {
             }
         });
 
-        alice.shutdown();
+        if (forceful) // No graceful leave: the peer must detect the failure, hence the longer wait below.
+            stopForcefully(alice);
+        else
+            alice.shutdown();
 
-        boolean aliceShutdownReceived = aliceShutdownLatch.await(3, TimeUnit.SECONDS);
+        boolean aliceShutdownReceived = aliceShutdownLatch.await(forceful ? 10 : 3, TimeUnit.SECONDS);
         assertTrue(aliceShutdownReceived);
 
         Collection<ClusterNode> networkMembers = bob.topologyService().allMembers();
 
-        assertEquals(2, networkMembers.size());
+        assertEquals(1, networkMembers.size()); // Only bob itself remains.
     }
 
-    /** */
-    private NetworkMessage getLastMessage(ClusterService clusterService) {
-        return messageStorage.get(clusterService.localConfiguration().getName());
+    /**
+     * Stops the node's ScaleCube transport via reflection (relies on ScaleCube and javac internals).
+     * @param cluster Cluster service of the node to stop; must be created by {@link ScaleCubeClusterServiceFactory}.
+     * @throws Exception If reflection fails or the transport does not stop within 10 seconds.
+     */
+    private static void stopForcefully(ClusterService cluster) throws Exception {
+        Field clusterImplField = cluster.getClass().getDeclaredField("val$cluster"); // Captured-local field.
+        clusterImplField.setAccessible(true);
+
+        ClusterImpl clusterImpl = (ClusterImpl) clusterImplField.get(cluster);
+        Field transportField = clusterImpl.getClass().getDeclaredField("transport");
+        transportField.setAccessible(true);
+
+        Transport transport = (Transport) transportField.get(clusterImpl);
+        Method stop = transport.getClass().getDeclaredMethod("stop");
+        stop.setAccessible(true);
+
+        Mono<?> stopResult = (Mono<?>) stop.invoke(transport);
+        stopResult.block(java.time.Duration.ofSeconds(10)); // Bounded wait so a stuck stop cannot hang the test.
     }
 
-    /** */
-    private ClusterService startNetwork(String name, int port, List<String> addresses) {
-        var context = new ClusterLocalConfiguration(name, port, addresses, SERIALIZATION_REGISTRY);
+    /**
+     * Test cluster of ScaleCube nodes listening on consecutive localhost ports starting at 3344.
+     */
+    private static final class Cluster {
+        /** Serialization registry shared by all nodes. */
+        private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
+        /** Factory used to create every node's cluster service. */
+        private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
 
-        ClusterService clusterService = NETWORK_FACTORY.createClusterService(context);
+        /** Cluster nodes in creation order; the first one tracks cluster startup. */
+        final List<ClusterService> members;
 
-        clusterService.messagingService().addMessageHandler((message, sender, correlationId) -> {
-            messageStorage.put(name, message);
-        });
+        /** Counted down each time the first node sees another node appear. */
+        private final CountDownLatch startupLatch;
+
+        /** Creates (but does not start) {@code numOfNodes} nodes on ports 3344, 3345, ... */
+        Cluster(int numOfNodes) {
+            startupLatch = new CountDownLatch(numOfNodes - 1);
 
-        clusterService.start();
+            int initialPort = 3344;
 
-        startedMembers.add(clusterService);
+            List<String> addresses = IntStream.range(0, numOfNodes)
+                .mapToObj(i -> String.format("localhost:%d", initialPort + i))
+                .collect(Collectors.toUnmodifiableList());
 
-        return clusterService;
+            members = IntStream.range(0, numOfNodes)
+                .mapToObj(i -> startNode("Node #" + i, initialPort + i, addresses, i == 0))
+                .collect(Collectors.toUnmodifiableList());
+        }
+
+        /**
+         * Create a cluster node without starting it.
+         *
+         * @param name Node name.
+         * @param port Node port.
+         * @param addresses Addresses of all nodes (including this one), used as seed members.
+         * @param initial Whether this is the first node, which counts down the startup latch.
+         * @return Created (not yet started) cluster node.
+         */
+        private ClusterService startNode(String name, int port, List<String> addresses, boolean initial) {
+            var context = new ClusterLocalConfiguration(name, port, addresses, SERIALIZATION_REGISTRY);
+
+            ClusterService clusterService = NETWORK_FACTORY.createClusterService(context);
+
+            if (initial)
+                clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
+                    /** {@inheritDoc} */
+                    @Override public void onAppeared(ClusterNode member) {
+                        startupLatch.countDown();
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public void onDisappeared(ClusterNode member) {
+                    }
+                });
+
+            return clusterService;
+        }
+
+        /**
+         * Start all nodes and wait until the first node has seen every other node appear.
+         * @throws InterruptedException If interrupted while waiting.
+         */
+        void startAwait() throws InterruptedException {
+            members.forEach(ClusterService::start);
+
+            if (!startupLatch.await(3, TimeUnit.SECONDS))
+                throw new AssertionError("Cluster failed to start in 3 seconds, nodes not seen: " + startupLatch.getCount());
+        }
+
+        /**
+         * Shut down all nodes.
+         */
+        void shutdown() {
+            members.forEach(ClusterService::shutdown);
+        }
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 4dd7d5f..74af277 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -32,6 +32,7 @@ import org.apache.ignite.network.NetworkConfigurationException;
 
 /**
  * {@link ClusterServiceFactory} implementation that uses ScaleCube for messaging and topology services.
+ * TODO: IGNITE-14538: This factory should use ScaleCube configuration instead of default parameters.
  */
 public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
     /** {@inheritDoc} */
@@ -53,7 +54,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
             })
             .config(opts -> opts.memberAlias(context.getName()))
             .transport(opts -> opts.port(context.getPort()))
-            .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
+            .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())).suspicionMult(1));
 
         // resolve cyclic dependencies
         messagingService.setCluster(cluster);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 73654b2..31eaaf7 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -52,34 +52,61 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
      */
     void onMembershipEvent(MembershipEvent event) {
         ClusterNode member = fromMember(event.member());
-        for (TopologyEventHandler handler : getEventHandlers()) {
-            switch (event.type()) {
-                case ADDED:
-                    members.put(member.name(), member);
 
-                    handler.onAppeared(member);
+        String memberName = member.name();
 
-                    break;
+        switch (event.type()) {
+            case ADDED:
+                members.put(memberName, member);
 
-                case LEAVING:
-                    members.remove(member.name());
+                fireAppearedEvent(member);
 
-                    handler.onDisappeared(member);
+                break;
 
-                    break;
+            case LEAVING:
+                members.remove(memberName); // Graceful leave; a later REMOVED for this member is then ignored.
 
-                case REMOVED:
-                case UPDATED:
-                    // No-op.
-                    break;
+                fireDisappearedEvent(member);
 
-                default:
-                    throw new IgniteInternalException("This event is not supported: event = " + event);
+                break;
+
+            case REMOVED:
+                // Fires only if no LEAVING was seen (the member left non-gracefully), avoiding duplicate events.
+                if (members.remove(memberName) != null)
+                    fireDisappearedEvent(member);
+
+                break;
+
+            case UPDATED:
+                // No-op: presumably does not change the set of members — confirm against ScaleCube semantics.
+                break;
+
+            default:
+                throw new IgniteInternalException("This event is not supported: event = " + event);
 
-            }
         }
     }
 
+    /**
+     * Notifies every registered topology event handler that a cluster member has appeared.
+     *
+     * @param member Appeared cluster member.
+     */
+    private void fireAppearedEvent(ClusterNode member) {
+        for (TopologyEventHandler handler : getEventHandlers())
+            handler.onAppeared(member);
+    }
+
+    /**
+     * Notifies every registered topology event handler that a cluster member has disappeared.
+     *
+     * @param member Disappeared cluster member.
+     */
+    private void fireDisappearedEvent(ClusterNode member) {
+        for (TopologyEventHandler handler : getEventHandlers())
+            handler.onDisappeared(member);
+    }
+
     /** {@inheritDoc} */
     @Override public ClusterNode localMember() {
         return localMember;