You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/04/14 09:22:23 UTC

[ignite-3] branch main updated: IGNITE-14517 Fixed topology change handling on graceful shutdown. - Fixes #92.

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6b20869  IGNITE-14517 Fixed topology change handling on graceful shutdown. - Fixes #92.
6b20869 is described below

commit 6b2086977c730296caebcd3264bd75ac6a9dfcee
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Apr 14 12:21:20 2021 +0300

    IGNITE-14517 Fixed topology change handling on graceful shutdown. - Fixes #92.
    
    Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
 .../scalecube/ITScaleCubeNetworkMessagingTest.java | 53 ++++++++++++++--------
 .../scalecube/ScaleCubeClusterServiceFactory.java  | 28 ++++++++----
 .../scalecube/ScaleCubeTopologyService.java        | 47 ++++++++++++-------
 3 files changed, 83 insertions(+), 45 deletions(-)

diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index c6fc15f..367e78b 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.network.scalecube;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** */
@@ -67,13 +69,15 @@ class ITScaleCubeNetworkMessagingTest {
         //Given: Three started member which are gathered to cluster.
         List<String> addresses = List.of("localhost:3344", "localhost:3345", "localhost:3346");
 
-        CountDownLatch latch = new CountDownLatch(3);
+        CountDownLatch messageReceivedLatch = new CountDownLatch(3);
 
-        ClusterService alice = startNetwork("Alice", 3344, addresses);
+        String aliceName = "Alice";
+
+        ClusterService alice = startNetwork(aliceName, 3344, addresses);
         ClusterService bob = startNetwork("Bob", 3345, addresses);
         ClusterService carol = startNetwork("Carol", 3346, addresses);
 
-        NetworkMessageHandler messageWaiter = (message, sender, correlationId) -> latch.countDown();
+        NetworkMessageHandler messageWaiter = (message, sender, correlationId) -> messageReceivedLatch.countDown();
 
         alice.messagingService().addMessageHandler(messageWaiter);
         bob.messagingService().addMessageHandler(messageWaiter);
@@ -83,18 +87,40 @@ class ITScaleCubeNetworkMessagingTest {
 
         //When: Send one message to all members in cluster.
         for (ClusterNode member : alice.topologyService().allMembers()) {
-            System.out.println("SEND : " + member);
-
             alice.messagingService().weakSend(member, testMessage);
         }
 
-        boolean done = latch.await(3, TimeUnit.SECONDS);
-        assertTrue(done);
+        boolean messagesReceived = messageReceivedLatch.await(3, TimeUnit.SECONDS);
+        assertTrue(messagesReceived);
 
         //Then: All members successfully received message.
         assertThat(getLastMessage(alice), is(testMessage));
         assertThat(getLastMessage(bob), is(testMessage));
         assertThat(getLastMessage(carol), is(testMessage));
+
+        CountDownLatch aliceShutdownLatch = new CountDownLatch(1);
+
+        bob.topologyService().addEventHandler(new TopologyEventHandler() {
+            /** {@inheritDoc} */
+            @Override public void onAppeared(ClusterNode member) {
+                // No-op.
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onDisappeared(ClusterNode member) {
+                if (aliceName.equals(member.name()))
+                    aliceShutdownLatch.countDown();
+            }
+        });
+
+        alice.shutdown();
+
+        boolean aliceShutdownReceived = aliceShutdownLatch.await(3, TimeUnit.SECONDS);
+        assertTrue(aliceShutdownReceived);
+
+        Collection<ClusterNode> networkMembers = bob.topologyService().allMembers();
+
+        assertEquals(2, networkMembers.size());
     }
 
     /** */
@@ -107,22 +133,9 @@ class ITScaleCubeNetworkMessagingTest {
         var context = new ClusterLocalConfiguration(name, port, addresses, SERIALIZATION_REGISTRY);
 
         ClusterService clusterService = NETWORK_FACTORY.createClusterService(context);
-        System.out.println("-----" + name + " started");
 
         clusterService.messagingService().addMessageHandler((message, sender, correlationId) -> {
             messageStorage.put(name, message);
-
-            System.out.println(name + " handled messages : " + message);
-        });
-
-        clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
-            @Override public void onAppeared(ClusterNode member) {
-                System.out.println(name + " found member : " + member);
-            }
-
-            @Override public void onDisappeared(ClusterNode member) {
-                System.out.println(name + " lost member : " + member);
-            }
         });
 
         clusterService.start();
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 6b473a0..4dd7d5f 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -17,18 +17,18 @@
 
 package org.apache.ignite.network.scalecube;
 
-import java.util.List;
-import java.util.stream.Collectors;
 import io.scalecube.cluster.ClusterImpl;
 import io.scalecube.cluster.ClusterMessageHandler;
 import io.scalecube.cluster.membership.MembershipEvent;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.net.Address;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkConfigurationException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.AbstractClusterService;
 import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.AbstractClusterService;
+import org.apache.ignite.network.NetworkConfigurationException;
 
 /**
  * {@link ClusterServiceFactory} implementation that uses ScaleCube for messaging and topology services.
@@ -41,12 +41,14 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
 
         var cluster = new ClusterImpl()
             .handler(cl -> new ClusterMessageHandler() {
+                /** {@inheritDoc} */
                 @Override public void onMessage(Message message) {
                     messagingService.fireEvent(message);
                 }
 
+                /** {@inheritDoc} */
                 @Override public void onMembershipEvent(MembershipEvent event) {
-                    topologyService.fireEvent(event);
+                    topologyService.onMembershipEvent(event);
                 }
             })
             .config(opts -> opts.memberAlias(context.getName()))
@@ -54,14 +56,17 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
             .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
 
         // resolve cyclic dependencies
-        topologyService.setCluster(cluster);
         messagingService.setCluster(cluster);
 
         return new AbstractClusterService(context, topologyService, messagingService) {
+            /** {@inheritDoc} */
             @Override public void start() {
                 cluster.startAwait();
+
+                topologyService.setLocalMember(cluster.member());
             }
 
+            /** {@inheritDoc} */
             @Override public void shutdown() {
                 cluster.shutdown();
                 cluster.onShutdown().block();
@@ -69,13 +74,18 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
         };
     }
 
-    /** */
+    /**
+     * Converts string addresses to ScaleCube's {@link Address}es.
+     * @param addresses "host:port" formatted strings.
+     * @return List of addresses.
+     */
     private List<Address> parseAddresses(List<String> addresses) {
         try {
             return addresses.stream()
                 .map(Address::from)
                 .collect(Collectors.toList());
-        } catch (IllegalArgumentException e) {
+        }
+        catch (IllegalArgumentException e) {
             throw new NetworkConfigurationException("Failed to parse address", e);
         }
     }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index da69281..73654b2 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -16,11 +16,13 @@
  */
 package org.apache.ignite.network.scalecube;
 
-import java.util.Collection;
-import java.util.stream.Collectors;
-import io.scalecube.cluster.Cluster;
 import io.scalecube.cluster.Member;
 import io.scalecube.cluster.membership.MembershipEvent;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.AbstractTopologyService;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.TopologyEventHandler;
@@ -30,49 +32,62 @@ import org.apache.ignite.network.TopologyService;
  * Implementation of {@link TopologyService} based on ScaleCube.
  */
 final class ScaleCubeTopologyService extends AbstractTopologyService {
-    /** Inner representation a ScaleCube cluster. */
-    private Cluster cluster;
+    /** Local member node. */
+    private ClusterNode localMember;
+
+    /** Topology members. */
+    private final Map<String, ClusterNode> members = new ConcurrentHashMap<>();
 
     /**
-     * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
+     * Sets the local node from ScaleCube's local {@link Member} and registers it among the topology members.
      */
-    void setCluster(Cluster cluster) {
-        this.cluster = cluster;
+    void setLocalMember(Member member) {
+        this.localMember = fromMember(member);
+
+        this.members.put(localMember.name(), localMember);
     }
 
     /**
      * Delegates the received topology event to the registered event handlers.
      */
-    void fireEvent(MembershipEvent event) {
+    void onMembershipEvent(MembershipEvent event) {
         ClusterNode member = fromMember(event.member());
         for (TopologyEventHandler handler : getEventHandlers()) {
             switch (event.type()) {
                 case ADDED:
+                    members.put(member.name(), member);
+
                     handler.onAppeared(member);
+
                     break;
+
                 case LEAVING:
-                case REMOVED:
+                    members.remove(member.name());
+
                     handler.onDisappeared(member);
+
                     break;
+
+                case REMOVED:
                 case UPDATED:
-                    // do nothing
+                    // No-op.
                     break;
+
                 default:
-                    throw new RuntimeException("This event is not supported: event = " + event);
+                    throw new IgniteInternalException("This event is not supported: event = " + event);
+
             }
         }
     }
 
     /** {@inheritDoc} */
     @Override public ClusterNode localMember() {
-        return fromMember(cluster.member());
+        return localMember;
     }
 
     /** {@inheritDoc} */
     @Override public Collection<ClusterNode> allMembers() {
-        return cluster.members().stream()
-            .map(ScaleCubeTopologyService::fromMember)
-            .collect(Collectors.toList());
+        return Collections.unmodifiableCollection(members.values());
     }
 
     /**