You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/05/13 14:21:27 UTC
[ignite-3] branch main updated: IGNITE-14707 Fixed topology events
processing when nodes are restarted in quick succession (#126).
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 21ebca1 IGNITE-14707 Fixed topology events processing when nodes are restarted in quick succession (#126).
21ebca1 is described below
commit 21ebca1d0273caf70e675a8e21f18247fcd707a6
Author: Alexey Scherbakov <al...@gmail.com>
AuthorDate: Thu May 13 17:21:11 2021 +0300
IGNITE-14707 Fixed topology events processing when nodes are restarted in quick succession (#126).
---
.../affinity/RendezvousAffinityFunction.java | 11 +-
.../affinity/RendezvousAffinityFunctionTest.java | 3 +-
modules/network/pom.xml | 7 ++
.../network/scalecube/ITNodeRestartsTest.java | 139 +++++++++++++++++++++
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 2 +-
.../TestScaleCubeClusterServiceFactory.java | 38 ++++++
.../org/apache/ignite/network/ClusterNode.java | 41 +++---
.../org/apache/ignite/network/TopologyService.java | 9 ++
.../scalecube/ScaleCubeClusterServiceFactory.java | 16 ++-
.../scalecube/ScaleCubeMessagingService.java | 26 ++--
.../scalecube/ScaleCubeTopologyService.java | 53 +++-----
.../src/test/resources/simplelogger.properties | 52 ++++++++
.../raft/client/service/RaftGroupServiceTest.java | 6 +-
13 files changed, 321 insertions(+), 82 deletions(-)
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
index da5e6b1..1468665 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.UUID;
import java.util.function.BiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteLogger;
@@ -92,7 +91,7 @@ public class RendezvousAffinityFunction {
int part,
List<ClusterNode> nodes,
int replicas,
- Map<UUID, Collection<ClusterNode>> neighborhoodCache,
+ Map<String, Collection<ClusterNode>> neighborhoodCache,
boolean exclNeighbors,
BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter
) {
@@ -251,7 +250,7 @@ public class RendezvousAffinityFunction {
List<List<ClusterNode>> assignments = new ArrayList<>(partitions);
- Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
+ Map<String, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
neighbors(currentTopologySnapshot) : null;
List<ClusterNode> nodes = new ArrayList<>(currentTopologySnapshot);
@@ -271,7 +270,7 @@ public class RendezvousAffinityFunction {
* @param topSnapshot Topology snapshot.
* @return Neighbors map.
*/
- public static Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
+ public static Map<String, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f);
// Group by mac addresses.
@@ -287,7 +286,7 @@ public class RendezvousAffinityFunction {
nodes.add(node);
}
- Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f);
+ Map<String, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f);
for (Collection<ClusterNode> group : macMap.values())
for (ClusterNode node : group)
@@ -308,7 +307,7 @@ public class RendezvousAffinityFunction {
/** {@inheritDoc} */
@Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 :
- o1.get2().id().compareTo(o2.get2().id());
+ o1.get2().name().compareTo(o2.get2().name());
}
}
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
index 529c592..1881da0 100644
--- a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.UUID;
import java.util.function.Function;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.lang.IgniteLogger;
@@ -101,7 +102,7 @@ public class RendezvousAffinityFunctionTest {
ArrayList<ClusterNode> clusterNodes = new ArrayList<>(nodes);
for (int i = 0; i < nodes; i++)
- clusterNodes.add(new ClusterNode("Node " + i, "127.0.0.1", 121212));
+ clusterNodes.add(new ClusterNode(UUID.randomUUID().toString(), "Node " + i, "127.0.0.1", 121212));
return clusterNodes;
}
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index b696324..6c8d7d5 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -66,6 +66,13 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
+
+ <!-- Logging in tests -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
new file mode 100644
index 0000000..4905a2e
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests if a topology size is correct after some nodes are restarted in quick succession.
+ */
+class ITNodeRestartsTest {
+ /** */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(ITNodeRestartsTest.class);
+
+ /** */
+ private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
+
+ /** */
+ private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+
+ /** */
+ private List<ClusterService> services;
+
+ /** */
+ @AfterEach
+ void after() {
+ for (ClusterService service : services)
+ service.shutdown();
+ }
+
+ /** */
+ @Test
+ public void testRestarts() throws InterruptedException {
+ final int initPort = 3344;
+
+ String addr = "localhost";
+ List<String> addresses = IntStream.range(0, 5).mapToObj(i -> addr + ":" + (initPort + i)).collect(toList());
+
+ services = new ArrayList<>(addresses.size());
+
+ for (int i = 0; i < addresses.size(); i++) {
+ String address = addresses.get(i);
+
+ ClusterService svc = startNetwork(address, initPort + i, addresses);
+
+ services.add(svc);
+ }
+
+ for (ClusterService service : services) {
+ assertTrue(waitForTopology(service, 5, 5_000), service.topologyService().localMember().toString()
+ + ", topSize=" + service.topologyService().allMembers().size());
+ }
+
+ int idx0 = 0;
+ int idx1 = 2;
+
+ LOG.info("Shutdown " + addresses.get(idx0));
+ services.get(idx0).shutdown();
+
+ LOG.info("Shutdown " + addresses.get(idx1));
+ services.get(idx1).shutdown();
+
+ LOG.info("Starting " + addresses.get(idx0));
+ ClusterService svc0 = startNetwork(addresses.get(idx0), initPort + idx0, addresses);
+ services.set(idx0, svc0);
+
+ LOG.info("Starting " + addresses.get(idx1));
+ ClusterService svc2 = startNetwork(addresses.get(idx1), initPort + idx1, addresses);
+ services.set(idx1, svc2);
+
+ for (ClusterService service : services) {
+ assertTrue(waitForTopology(service, 5, 10_000), service.topologyService().localMember().toString()
+ + ", topSize=" + service.topologyService().allMembers().size());
+ }
+
+ LOG.info("Reached stable state");
+ }
+
+ /** */
+ private ClusterService startNetwork(String name, int port, List<String> addresses) {
+ var context = new ClusterLocalConfiguration(name, port, addresses, SERIALIZATION_REGISTRY);
+
+ ClusterService clusterService = NETWORK_FACTORY.createClusterService(context);
+
+ clusterService.start();
+
+ return clusterService;
+ }
+
+ /**
+ * @param service The service.
+ * @param expected Expected count.
+ * @param timeout The timeout.
+ * @return Wait status.
+ */
+ @SuppressWarnings("BusyWait")
+ protected boolean waitForTopology(ClusterService service, int expected, long timeout) {
+ long stop = System.currentTimeMillis() + timeout;
+
+ while(System.currentTimeMillis() < stop) {
+ if (service.topologyService().allMembers().size() == expected)
+ return true;
+
+ try {
+ Thread.sleep(50);
+ }
+ catch (InterruptedException e) {
+ return false;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index c5117a0..b134b9b 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -257,7 +257,7 @@ class ITScaleCubeNetworkMessagingTest {
/** */
private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
/** */
- private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
+ private static final ClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
/** */
final List<ClusterService> members;
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
new file mode 100644
index 0000000..46fc42e
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/TestScaleCubeClusterServiceFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.ClusterConfig;
+
+/**
+ * Scalecube test factory. Provides fast detection time.
+ */
+public class TestScaleCubeClusterServiceFactory extends ScaleCubeClusterServiceFactory {
+ /** {@inheritDoc} */
+ @Override protected ClusterConfig defaultConfig() {
+ ClusterConfig cfg = ClusterConfig.defaultLocalConfig();
+
+ // Theoretical suspicious timeout for 5 node cluster: 500 * 1 * log(5) = 349ms
+ // Short sync interval is required for faster convergence on node restarts.
+ cfg = cfg.membership(opts -> opts.syncInterval(1000).suspicionMult(1));
+
+ // Theoretical upper bound for detection of faulty node by some other node: 500 * (e / (e - 1)) = 790ms
+ cfg = cfg.failureDetector(opts -> opts.pingInterval(500).pingReqMembers(1));
+
+ return cfg;
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java b/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
index 62caff1..2573a08 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -19,13 +19,15 @@ package org.apache.ignite.network;
import java.io.Serializable;
import java.util.Objects;
import org.apache.ignite.internal.tostring.S;
-import java.util.UUID;
/**
* Representation of a node in a cluster.
*/
public class ClusterNode implements Serializable {
- /** Unique name of member in cluster. */
+ /** Local id assigned to this node instance. Changes between restarts. */
+ private final String id;
+
+ /** Unique name of member in the cluster. Consistent between restarts. */
private final String name;
/** Node host. */
@@ -34,31 +36,49 @@ public class ClusterNode implements Serializable {
/** Node port. */
private final int port;
+ /** Node address in host:port format (lazily evaluated) */
+ private String address;
+
/**
* @param name Unique name of member in cluster.
*/
- public ClusterNode(String name, String host, int port) {
+ public ClusterNode(String id, String name, String host, int port) {
+ this.id = id;
this.name = name;
this.host = host;
this.port = port;
}
+ public String id() {
+ return id;
+ }
+
/**
- * @return Unique name of member in cluster.
+ * @return Unique name of member in cluster. Doesn't change between restarts.
*/
public String name() {
return name;
}
/**
- * @return node host name.
+ * @return Node host name.
*/
public String host() {
return host;
}
/**
- * @return node port.
+ * @return The address.
+ */
+ public String address() {
+ if (address == null)
+ address = host + ":" + port;
+
+ return address;
+ }
+
+ /**
+ * @return Node port.
*/
public int port() {
return port;
@@ -74,15 +94,6 @@ public class ClusterNode implements Serializable {
return port == that.port && name.equals(that.name) && host.equals(that.host);
}
- /**
- * Creates node UUID.
- *
- * @return Node UUID identifier.
- */
- public UUID id() {
- return new UUID(name.hashCode(), name.substring(name.length() / 2).hashCode());
- }
-
/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(name, host, port);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/TopologyService.java
index 03c52a2..e343bc0 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -17,6 +17,7 @@
package org.apache.ignite.network;
import java.util.Collection;
+import org.jetbrains.annotations.Nullable;
/**
* Entry point for obtaining information about a cluster's topology.
@@ -39,4 +40,12 @@ public interface TopologyService {
* Registers a handler for topology change events.
*/
void addEventHandler(TopologyEventHandler handler);
+
+ /**
+ * Returns a cluster node by it's network address in host:port format.
+ *
+ * @param addr The address.
+ * @return The node or {@code null} if the node is not yet discovered or dead.
+ */
+ @Nullable ClusterNode getByAddress(String addr);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 4b0fd00..e1bbeb5 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -17,13 +17,14 @@
package org.apache.ignite.network.scalecube;
+import io.scalecube.cluster.ClusterConfig;
+import java.util.List;
+import java.util.stream.Collectors;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.ignite.network.AbstractClusterService;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
@@ -41,7 +42,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
var messagingService = new ScaleCubeMessagingService(topologyService);
var transportFactory = new DelegatingTransportFactory(messagingService);
- var cluster = new ClusterImpl()
+ var cluster = new ClusterImpl(defaultConfig())
.handler(cl -> new ClusterMessageHandler() {
/** {@inheritDoc} */
@Override public void onMessage(Message message) {
@@ -55,7 +56,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
})
.config(opts -> opts.memberAlias(context.getName()))
.transport(opts -> opts.port(context.getPort()).transportFactory(transportFactory))
- .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())).suspicionMult(1));
+ .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
// resolve cyclic dependencies
messagingService.setCluster(cluster);
@@ -77,6 +78,13 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
}
/**
+ * @return The default configuration.
+ */
+ protected ClusterConfig defaultConfig() {
+ return ClusterConfig.defaultConfig();
+ }
+
+ /**
* Convert string addresses to ScaleCube's {@link Address}es.
* @param addresses "host:port" formatted strings.
* @return List of addresses.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 658de70..c502c2b 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -18,10 +18,8 @@
package org.apache.ignite.network.scalecube;
import java.time.Duration;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
@@ -29,8 +27,6 @@ import org.apache.ignite.network.AbstractMessagingService;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkMessageHandler;
-import org.apache.ignite.network.TopologyEventHandler;
-import org.apache.ignite.network.TopologyService;
import org.apache.ignite.network.message.NetworkMessage;
/**
@@ -43,21 +39,13 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
private Cluster cluster;
/**
- * Utility map for recognizing cluster members by their addresses.
+ * Topology service.
*/
- private final Map<Address, ClusterNode> addressMemberMap = new ConcurrentHashMap<>();
+ private ScaleCubeTopologyService topologyService;
/** */
- ScaleCubeMessagingService(TopologyService topologyService) {
- topologyService.addEventHandler(new TopologyEventHandler() {
- @Override public void onAppeared(ClusterNode member) {
- addressMemberMap.put(clusterNodeAddress(member), member);
- }
-
- @Override public void onDisappeared(ClusterNode member) {
- addressMemberMap.remove(clusterNodeAddress(member));
- }
- });
+ ScaleCubeMessagingService(ScaleCubeTopologyService topologyService) {
+ this.topologyService = topologyService;
}
/**
@@ -72,7 +60,11 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
*/
void fireEvent(Message message) {
NetworkMessage msg = message.data();
- ClusterNode sender = addressMemberMap.get(message.sender());
+ ClusterNode sender = topologyService.getByAddress(message.header(Message.HEADER_SENDER));
+
+ if (sender == null) // Ignore the message from the unknown node.
+ return;
+
String correlationId = message.correlationId();
for (NetworkMessageHandler handler : getMessageHandlers())
handler.onReceived(msg, sender, correlationId);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 70bbbde..b29587b 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -16,13 +16,12 @@
*/
package org.apache.ignite.network.scalecube;
+import io.scalecube.cluster.Member;
+import io.scalecube.cluster.membership.MembershipEvent;
import java.util.Collection;
import java.util.Collections;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import io.scalecube.cluster.Member;
-import io.scalecube.cluster.membership.MembershipEvent;
-import org.apache.ignite.lang.IgniteInternalException;
+import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.network.AbstractTopologyService;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.TopologyEventHandler;
@@ -36,7 +35,7 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
private ClusterNode localMember;
/** Topology members. */
- private final Map<String, ClusterNode> members = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, ClusterNode> members = new ConcurrentHashMap<>();
/**
* Sets the ScaleCube's local {@link Member}.
@@ -54,37 +53,16 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
void onMembershipEvent(MembershipEvent event) {
ClusterNode member = fromMember(event.member());
- String memberName = member.name();
-
- switch (event.type()) {
- case ADDED:
- members.put(memberName, member);
-
- fireAppearedEvent(member);
-
- break;
-
- case LEAVING:
- members.remove(memberName);
-
- fireDisappearedEvent(member);
+ if (event.isAdded()) {
+ members.put(member.address(), member);
- break;
-
- case REMOVED:
- // In case if member left non-gracefully, without sending LEAVING event.
- if (members.remove(memberName) != null)
- fireDisappearedEvent(member);
-
- break;
-
- case UPDATED:
- // No-op.
- break;
-
- default:
- throw new IgniteInternalException("This event is not supported: event = " + event);
+ fireAppearedEvent(member);
+ }
+ else if (event.isRemoved()) {
+ members.compute(member.address(), // Ignore stale remove event.
+ (k, v) -> v.id().equals(member.id()) ? null : v);
+ fireDisappearedEvent(member);
}
}
@@ -120,10 +98,15 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
return Collections.unmodifiableCollection(members.values());
}
+ /** {@inheritDoc} */
+ @Override public ClusterNode getByAddress(String addr) {
+ return members.get(addr);
+ }
+
/**
* Converts the given {@link Member} to a {@link ClusterNode}.
*/
private static ClusterNode fromMember(Member member) {
- return new ClusterNode(member.alias(), member.address().host(), member.address().port());
+ return new ClusterNode(member.id(), member.alias(), member.address().host(), member.address().port());
}
}
diff --git a/modules/network/src/test/resources/simplelogger.properties b/modules/network/src/test/resources/simplelogger.properties
new file mode 100644
index 0000000..7488d79
--- /dev/null
+++ b/modules/network/src/test/resources/simplelogger.properties
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SLF4J's SimpleLogger configuration file
+# Simple implementation of Logger that sends all enabled log messages, for all defined loggers, to System.err.
+
+# Default logging detail level for all instances of SimpleLogger.
+# Must be one of ("trace", "debug", "info", "warn", or "error").
+# If not specified, defaults to "info".
+org.slf4j.simpleLogger.defaultLogLevel=info
+
+# Logging detail level for a SimpleLogger instance named "xxxxx".
+# Must be one of ("trace", "debug", "info", "warn", or "error").
+# If not specified, the default logging detail level is used.
+#org.slf4j.simpleLogger.log.xxxxx=
+#org.slf4j.simpleLogger.log.io.scalecube.cluster.metadata.MetadataStore=debug
+#org.slf4j.simpleLogger.log.io.scalecube.cluster.membership.MembershipProtocol=debug
+
+# Set to true if you want the current date and time to be included in output messages.
+# Default is false, and will output the number of milliseconds elapsed since startup.
+org.slf4j.simpleLogger.showDateTime=true
+
+# The date and time format to be used in the output messages.
+# The pattern describing the date and time format is the same that is used in java.text.SimpleDateFormat.
+# If the format is not specified or is invalid, the default format is used.
+# The default format is yyyy-MM-dd HH:mm:ss:SSS Z.
+org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z
+
+# Set to true if you want to output the current thread name.
+# Defaults to true.
+#org.slf4j.simpleLogger.showThreadName=true
+
+# Set to true if you want the Logger instance name to be included in output messages.
+# Defaults to true.
+#org.slf4j.simpleLogger.showLogName=true
+
+# Set to true if you want the last component of the name to be included in output messages.
+# Defaults to false.
+org.slf4j.simpleLogger.showShortLogName=true
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
index 344dd55..9aeb992 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
@@ -67,9 +67,9 @@ public class RaftGroupServiceTest {
/** */
private static final List<Peer> NODES = of(
- new Peer(new ClusterNode("node1", "foobar", 123)),
- new Peer(new ClusterNode("node2", "foobar", 123)),
- new Peer(new ClusterNode("node3", "foobar", 123))
+ new Peer(new ClusterNode("id1", "node1", "foobar", 123)),
+ new Peer(new ClusterNode("id2", "node2", "foobar", 124)),
+ new Peer(new ClusterNode("id3", "node3", "foobar", 125))
);
/** */