You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/05/13 14:21:27 UTC

[ignite-3] branch main updated: IGNITE-14707 Fixed topology events processing when nodes are restarted in quick succession (#126).

This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 21ebca1  IGNITE-14707 Fixed topology events processing when nodes are restarted in quick succession (#126).
21ebca1 is described below

commit 21ebca1d0273caf70e675a8e21f18247fcd707a6
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Thu May 13 17:21:11 2021 +0300

    IGNITE-14707 Fixed topology events processing when nodes are restarted in quick succession (#126).
---
 .../affinity/RendezvousAffinityFunction.java       |  11 +-
 .../affinity/RendezvousAffinityFunctionTest.java   |   3 +-
 modules/network/pom.xml                            |   7 ++
 .../network/scalecube/ITNodeRestartsTest.java      | 139 +++++++++++++++++++++
 .../scalecube/ITScaleCubeNetworkMessagingTest.java |   2 +-
 .../TestScaleCubeClusterServiceFactory.java        |  38 ++++++
 .../org/apache/ignite/network/ClusterNode.java     |  41 +++---
 .../org/apache/ignite/network/TopologyService.java |   9 ++
 .../scalecube/ScaleCubeClusterServiceFactory.java  |  16 ++-
 .../scalecube/ScaleCubeMessagingService.java       |  26 ++--
 .../scalecube/ScaleCubeTopologyService.java        |  53 +++-----
 .../src/test/resources/simplelogger.properties     |  52 ++++++++
 .../raft/client/service/RaftGroupServiceTest.java  |   6 +-
 13 files changed, 321 insertions(+), 82 deletions(-)

diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
index da5e6b1..1468665 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.UUID;
 import java.util.function.BiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteLogger;
@@ -92,7 +91,7 @@ public class RendezvousAffinityFunction {
         int part,
         List<ClusterNode> nodes,
         int replicas,
-        Map<UUID, Collection<ClusterNode>> neighborhoodCache,
+        Map<String, Collection<ClusterNode>> neighborhoodCache,
         boolean exclNeighbors,
         BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter
     ) {
@@ -251,7 +250,7 @@ public class RendezvousAffinityFunction {
 
         List<List<ClusterNode>> assignments = new ArrayList<>(partitions);
 
-        Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
+        Map<String, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
             neighbors(currentTopologySnapshot) : null;
 
         List<ClusterNode> nodes = new ArrayList<>(currentTopologySnapshot);
@@ -271,7 +270,7 @@ public class RendezvousAffinityFunction {
      * @param topSnapshot Topology snapshot.
      * @return Neighbors map.
      */
-    public static Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
+    public static Map<String, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
         Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f);
 
         // Group by mac addresses.
@@ -287,7 +286,7 @@ public class RendezvousAffinityFunction {
             nodes.add(node);
         }
 
-        Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f);
+        Map<String, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f);
 
         for (Collection<ClusterNode> group : macMap.values())
             for (ClusterNode node : group)
@@ -308,7 +307,7 @@ public class RendezvousAffinityFunction {
         /** {@inheritDoc} */
         @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
             return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
-                o1.get2().id().compareTo(o2.get2().id());
+                o1.get2().name().compareTo(o2.get2().name());
         }
     }
 
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
index 529c592..1881da0 100644
--- a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.UUID;
 import java.util.function.Function;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.lang.IgniteLogger;
@@ -101,7 +102,7 @@ public class RendezvousAffinityFunctionTest {
         ArrayList<ClusterNode> clusterNodes = new ArrayList<>(nodes);
 
         for (int i = 0; i < nodes; i++)
-            clusterNodes.add(new ClusterNode("Node " + i, "127.0.0.1", 121212));
+            clusterNodes.add(new ClusterNode(UUID.randomUUID().toString(), "Node " + i, "127.0.0.1", 121212));
         return clusterNodes;
     }
 
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index b696324..6c8d7d5 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -66,6 +66,13 @@
             <artifactId>junit-jupiter-engine</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <!-- Logging in tests -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     
     <build>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
new file mode 100644
index 0000000..4905a2e
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests if a topology size is correct after some nodes are restarted in quick succession.
+ */
+class ITNodeRestartsTest {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(ITNodeRestartsTest.class);
+
+    /** Serialization registry shared by all cluster services started by this test. */
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
+
+    /** Factory used to create the cluster services. */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+
+    /** Cluster services started by the current test; shut down in {@link #after()}. */
+    private List<ClusterService> services;
+
+    /** Shuts down every cluster service held in {@link #services}. */
+    @AfterEach
+    void after() {
+        for (ClusterService service : services)
+            service.shutdown();
+    }
+
+    /** Starts five nodes, shuts down and restarts two of them back to back, then expects every node to see five members. */
+    @Test
+    public void testRestarts() throws InterruptedException {
+        final int initPort = 3344;
+
+        String addr = "localhost";
+        List<String> addresses = IntStream.range(0, 5).mapToObj(i -> addr + ":" + (initPort + i)).collect(toList());
+
+        services = new ArrayList<>(addresses.size());
+
+        for (int i = 0; i < addresses.size(); i++) {
+            String address = addresses.get(i);
+
+            ClusterService svc = startNetwork(address, initPort + i, addresses);
+
+            services.add(svc);
+        }
+
+        for (ClusterService service : services) {
+            assertTrue(waitForTopology(service, 5, 5_000), service.topologyService().localMember().toString()
+                + ", topSize=" + service.topologyService().allMembers().size());
+        }
+
+        int idx0 = 0;
+        int idx1 = 2;
+
+        LOG.info("Shutdown " + addresses.get(idx0));
+        services.get(idx0).shutdown();
+
+        LOG.info("Shutdown " + addresses.get(idx1));
+        services.get(idx1).shutdown();
+
+        LOG.info("Starting " + addresses.get(idx0));
+        ClusterService svc0 = startNetwork(addresses.get(idx0), initPort + idx0, addresses);
+        services.set(idx0, svc0);
+
+        LOG.info("Starting " + addresses.get(idx1));
+        ClusterService svc2 = startNetwork(addresses.get(idx1), initPort + idx1, addresses);
+        services.set(idx1, svc2);
+
+        for (ClusterService service : services) {
+            assertTrue(waitForTopology(service, 5, 10_000), service.topologyService().localMember().toString()
+                + ", topSize=" + service.topologyService().allMembers().size());
+        }
+
+        LOG.info("Reached stable state");
+    }
+
+    /** Creates and starts a cluster service with the given node name, port and list of member addresses. */
+    private ClusterService startNetwork(String name, int port, List<String> addresses) {
+        var context = new ClusterLocalConfiguration(name, port, addresses, SERIALIZATION_REGISTRY);
+
+        ClusterService clusterService = NETWORK_FACTORY.createClusterService(context);
+
+        clusterService.start();
+
+        return clusterService;
+    }
+
+    /**
+     * @param service  The service whose topology view is polled.
+     * @param expected Expected number of members in the topology.
+     * @param timeout  Maximum time to wait, in milliseconds.
+     * @return {@code true} if the expected size was observed before the timeout, {@code false} on timeout or interrupt.
+     */
+    @SuppressWarnings("BusyWait")
+    protected boolean waitForTopology(ClusterService service, int expected, long timeout) {
+        long stop = System.currentTimeMillis() + timeout;
+
+        while(System.currentTimeMillis() < stop) {
+            if (service.topologyService().allMembers().size() == expected)
+                return true;
+
+            try {
+                Thread.sleep(50);
+            }
+            catch (InterruptedException e) {
+                return false; // NOTE(review): the thread's interrupt flag is not restored here.
+            }
+        }
+
+        return false;
+    }
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index c5117a0..b134b9b 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -257,7 +257,7 @@ class ITScaleCubeNetworkMessagingTest {
         /** */
         private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
         /** */
-        private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
+        private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
 
         /** */
         final List<ClusterService> members;
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
new file mode 100644
index 0000000..46fc42e
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.ClusterConfig;
+
+/**
+ * ScaleCube cluster service factory for tests. Overrides the default cluster configuration to get fast failure detection.
+ */
+public class TestScaleCubeClusterServiceFactory extends ScaleCubeClusterServiceFactory {
+    /** {@inheritDoc} */
+    @Override protected ClusterConfig defaultConfig() {
+        ClusterConfig cfg = ClusterConfig.defaultLocalConfig();
+
+        // Theoretical suspicion timeout for 5 node cluster: 500 * 1 * log(5) = 349ms (NOTE(review): confirm the log base ScaleCube uses).
+        // Short sync interval is required for faster convergence on node restarts.
+        cfg = cfg.membership(opts -> opts.syncInterval(1000).suspicionMult(1));
+
+        // Theoretical upper bound for detection of faulty node by some other node: 500 * (e / (e - 1)) = 790ms
+        cfg = cfg.failureDetector(opts -> opts.pingInterval(500).pingReqMembers(1));
+
+        return cfg;
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java b/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
index 62caff1..2573a08 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -19,13 +19,15 @@ package org.apache.ignite.network;
 import java.io.Serializable;
 import java.util.Objects;
 import org.apache.ignite.internal.tostring.S;
-import java.util.UUID;
 
 /**
  * Representation of a node in a cluster.
  */
 public class ClusterNode implements Serializable {
-    /** Unique name of member in cluster. */
+    /** Local id assigned to this node instance. Changes between restarts. */
+    private final String id;
+
+    /** Unique name of member in the cluster. Consistent between restarts. */
     private final String name;
 
     /** Node host. */
@@ -34,31 +36,49 @@ public class ClusterNode implements Serializable {
     /** Node port. */
     private final int port;
 
+    /** Node address in host:port format (lazily evaluated) */
+    private String address;
+
     /**
      * @param name Unique name of member in cluster.
      */
-    public ClusterNode(String name, String host, int port) {
+    public ClusterNode(String id, String name, String host, int port) {
+        this.id = id;
         this.name = name;
         this.host = host;
         this.port = port;
     }
 
+    public String id() {
+        return id;
+    }
+
     /**
-     * @return Unique name of member in cluster.
+     * @return Unique name of member in cluster. Doesn't change between restarts.
      */
     public String name() {
         return name;
     }
 
     /**
-     * @return node host name.
+     * @return Node host name.
      */
     public String host() {
         return host;
     }
 
     /**
-     * @return node port.
+     * @return The address.
+     */
+    public String address() {
+        if (address == null)
+            address = host + ":" + port;
+
+        return address;
+    }
+
+    /**
+     * @return Node port.
      */
     public int port() {
         return port;
@@ -74,15 +94,6 @@ public class ClusterNode implements Serializable {
         return port == that.port && name.equals(that.name) && host.equals(that.host);
     }
 
-    /**
-     * Creates node UUID.
-     *
-     * @return Node UUID identifier.
-     */
-    public UUID id() {
-        return new UUID(name.hashCode(), name.substring(name.length() / 2).hashCode());
-    }
-
     /** {@inheritDoc} */
     @Override public int hashCode() {
         return Objects.hash(name, host, port);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/TopologyService.java
index 03c52a2..e343bc0 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.network;
 
 import java.util.Collection;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Entry point for obtaining information about a cluster's topology.
@@ -39,4 +40,12 @@ public interface TopologyService {
      * Registers a handler for topology change events.
      */
     void addEventHandler(TopologyEventHandler handler);
+
+    /**
+     * Returns a cluster node by its network address in host:port format.
+     *
+     * @param addr The address.
+     * @return The node or {@code null} if the node is not yet discovered or dead.
+     */
+    @Nullable ClusterNode getByAddress(String addr);
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 4b0fd00..e1bbeb5 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.network.scalecube;
 
+import io.scalecube.cluster.ClusterConfig;
+import java.util.List;
+import java.util.stream.Collectors;
 import io.scalecube.cluster.ClusterImpl;
 import io.scalecube.cluster.ClusterMessageHandler;
 import io.scalecube.cluster.membership.MembershipEvent;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.net.Address;
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.ignite.network.AbstractClusterService;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
@@ -41,7 +42,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
         var messagingService = new ScaleCubeMessagingService(topologyService);
         var transportFactory = new DelegatingTransportFactory(messagingService);
 
-        var cluster = new ClusterImpl()
+        var cluster = new ClusterImpl(defaultConfig())
             .handler(cl -> new ClusterMessageHandler() {
                 /** {@inheritDoc} */
                 @Override public void onMessage(Message message) {
@@ -55,7 +56,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
             })
             .config(opts -> opts.memberAlias(context.getName()))
             .transport(opts -> opts.port(context.getPort()).transportFactory(transportFactory))
-            .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())).suspicionMult(1));
+            .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
 
         // resolve cyclic dependencies
         messagingService.setCluster(cluster);
@@ -77,6 +78,13 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
     }
 
     /**
+     * @return The default configuration.
+     */
+    protected ClusterConfig defaultConfig() {
+        return ClusterConfig.defaultConfig();
+    }
+
+    /**
      * Convert string addresses to ScaleCube's {@link Address}es.
      * @param addresses "host:port" formatted strings.
      * @return List of addresses.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 658de70..c502c2b 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -18,10 +18,8 @@
 package org.apache.ignite.network.scalecube;
 
 import java.time.Duration;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import io.scalecube.cluster.Cluster;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.net.Address;
@@ -29,8 +27,6 @@ import org.apache.ignite.network.AbstractMessagingService;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkMessageHandler;
-import org.apache.ignite.network.TopologyEventHandler;
-import org.apache.ignite.network.TopologyService;
 import org.apache.ignite.network.message.NetworkMessage;
 
 /**
@@ -43,21 +39,13 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
     private Cluster cluster;
 
     /**
-     * Utility map for recognizing cluster members by their addresses.
+     * Topology service.
      */
-    private final Map<Address, ClusterNode> addressMemberMap = new ConcurrentHashMap<>();
+    private ScaleCubeTopologyService topologyService;
 
     /** */
-    ScaleCubeMessagingService(TopologyService topologyService) {
-        topologyService.addEventHandler(new TopologyEventHandler() {
-            @Override public void onAppeared(ClusterNode member) {
-                addressMemberMap.put(clusterNodeAddress(member), member);
-            }
-
-            @Override public void onDisappeared(ClusterNode member) {
-                addressMemberMap.remove(clusterNodeAddress(member));
-            }
-        });
+    ScaleCubeMessagingService(ScaleCubeTopologyService topologyService) {
+        this.topologyService = topologyService;
     }
 
     /**
@@ -72,7 +60,11 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
      */
     void fireEvent(Message message) {
         NetworkMessage msg = message.data();
-        ClusterNode sender = addressMemberMap.get(message.sender());
+        ClusterNode sender = topologyService.getByAddress(message.header(Message.HEADER_SENDER));
+
+        if (sender == null) // Ignore the message from the unknown node.
+            return;
+
         String correlationId = message.correlationId();
         for (NetworkMessageHandler handler : getMessageHandlers())
             handler.onReceived(msg, sender, correlationId);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 70bbbde..b29587b 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -16,13 +16,12 @@
  */
 package org.apache.ignite.network.scalecube;
 
+import io.scalecube.cluster.Member;
+import io.scalecube.cluster.membership.MembershipEvent;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import io.scalecube.cluster.Member;
-import io.scalecube.cluster.membership.MembershipEvent;
-import org.apache.ignite.lang.IgniteInternalException;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.network.AbstractTopologyService;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.TopologyEventHandler;
@@ -36,7 +35,7 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
     private ClusterNode localMember;
 
     /** Topology members. */
-    private final Map<String, ClusterNode> members = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, ClusterNode> members = new ConcurrentHashMap<>();
 
     /**
      * Sets the ScaleCube's local {@link Member}.
@@ -54,37 +53,16 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
     void onMembershipEvent(MembershipEvent event) {
         ClusterNode member = fromMember(event.member());
 
-        String memberName = member.name();
-
-        switch (event.type()) {
-            case ADDED:
-                members.put(memberName, member);
-
-                fireAppearedEvent(member);
-
-                break;
-
-            case LEAVING:
-                members.remove(memberName);
-
-                fireDisappearedEvent(member);
+        if (event.isAdded()) {
+            members.put(member.address(), member);
 
-                break;
-
-            case REMOVED:
-                // In case if member left non-gracefully, without sending LEAVING event.
-                if (members.remove(memberName) != null)
-                    fireDisappearedEvent(member);
-
-                break;
-
-            case UPDATED:
-                // No-op.
-                break;
-
-            default:
-                throw new IgniteInternalException("This event is not supported: event = " + event);
+            fireAppearedEvent(member);
+        }
+        else if (event.isRemoved()) {
+            members.compute(member.address(), // Ignore stale remove event; v is null when the address has no mapping.
+                (k, v) -> (v == null || v.id().equals(member.id())) ? null : v);
 
+            fireDisappearedEvent(member);
         }
     }
 
@@ -120,10 +98,15 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
         return Collections.unmodifiableCollection(members.values());
     }
 
+    /** {@inheritDoc} */
+    @Override public ClusterNode getByAddress(String addr) {
+        return members.get(addr);
+    }
+
     /**
      * Converts the given {@link Member} to a {@link ClusterNode}.
      */
     private static ClusterNode fromMember(Member member) {
-        return new ClusterNode(member.alias(), member.address().host(), member.address().port());
+        return new ClusterNode(member.id(), member.alias(), member.address().host(), member.address().port());
     }
 }
diff --git a/modules/network/src/test/resources/simplelogger.properties b/modules/network/src/test/resources/simplelogger.properties
new file mode 100644
index 0000000..7488d79
--- /dev/null
+++ b/modules/network/src/test/resources/simplelogger.properties
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SLF4J's SimpleLogger configuration file
+# Simple implementation of Logger that sends all enabled log messages, for all defined loggers, to System.err.
+
+# Default logging detail level for all instances of SimpleLogger.
+# Must be one of ("trace", "debug", "info", "warn", or "error").
+# If not specified, defaults to "info".
+org.slf4j.simpleLogger.defaultLogLevel=info
+
+# Logging detail level for a SimpleLogger instance named "xxxxx".
+# Must be one of ("trace", "debug", "info", "warn", or "error").
+# If not specified, the default logging detail level is used.
+#org.slf4j.simpleLogger.log.xxxxx=
+#org.slf4j.simpleLogger.log.io.scalecube.cluster.metadata.MetadataStore=debug
+#org.slf4j.simpleLogger.log.io.scalecube.cluster.membership.MembershipProtocol=debug
+
+# Set to true if you want the current date and time to be included in output messages.
+# Default is false, and will output the number of milliseconds elapsed since startup.
+org.slf4j.simpleLogger.showDateTime=true
+
+# The date and time format to be used in the output messages.
+# The pattern describing the date and time format is the same that is used in java.text.SimpleDateFormat.
+# If the format is not specified or is invalid, the default format is used.
+# The default format is yyyy-MM-dd HH:mm:ss:SSS Z.
+org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z
+
+# Set to true if you want to output the current thread name.
+# Defaults to true.
+#org.slf4j.simpleLogger.showThreadName=true
+
+# Set to true if you want the Logger instance name to be included in output messages.
+# Defaults to true.
+#org.slf4j.simpleLogger.showLogName=true
+
+# Set to true if you want the last component of the name to be included in output messages.
+# Defaults to false.
+org.slf4j.simpleLogger.showShortLogName=true
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
index 344dd55..9aeb992 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
@@ -67,9 +67,9 @@ public class RaftGroupServiceTest {
 
     /** */
     private static final List<Peer> NODES = of(
-        new Peer(new ClusterNode("node1", "foobar", 123)),
-        new Peer(new ClusterNode("node2", "foobar", 123)),
-        new Peer(new ClusterNode("node3", "foobar", 123))
+        new Peer(new ClusterNode("id1", "node1", "foobar", 123)),
+        new Peer(new ClusterNode("id2", "node2", "foobar", 124)),
+        new Peer(new ClusterNode("id3", "node3", "foobar", 125))
     );
 
     /** */