You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2021/04/19 14:46:15 UTC
[ignite-3] branch main updated: IGNITE-14536 Node disappeared event
propagation in case of node forceful shutdown (#94)
This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d81277f IGNITE-14536 Node disappeared event propagation in case of node forceful shutdown (#94)
d81277f is described below
commit d81277f18f2ad4fa5260a4c21b497397a4ff5368
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Mon Apr 19 17:43:12 2021 +0300
IGNITE-14536 Node disappeared event propagation in case of node forceful shutdown (#94)
Signed-off-by: Sergey Chugunov <se...@gmail.com>
---
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 200 ++++++++++++++++-----
.../scalecube/ScaleCubeClusterServiceFactory.java | 3 +-
.../scalecube/ScaleCubeTopologyService.java | 61 +++++--
3 files changed, 200 insertions(+), 64 deletions(-)
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 367e78b..c241dcf 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -16,7 +16,10 @@
*/
package org.apache.ignite.network.scalecube;
-import java.util.ArrayList;
+import io.scalecube.cluster.ClusterImpl;
+import io.scalecube.cluster.transport.api.Transport;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -24,17 +27,19 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.NetworkMessageHandler;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TopologyEventHandler;
import org.apache.ignite.network.message.MessageSerializationRegistry;
import org.apache.ignite.network.message.NetworkMessage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -43,22 +48,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** */
class ITScaleCubeNetworkMessagingTest {
- /** */
- private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
-
- /** */
- private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
-
- /** */
- private final Map<String, NetworkMessage> messageStorage = new ConcurrentHashMap<>();
-
- /** */
- private final List<ClusterService> startedMembers = new ArrayList<>();
+ private Cluster testCluster;
/** */
@AfterEach
public void afterEach() {
- startedMembers.forEach(ClusterService::shutdown);
+ testCluster.shutdown();
}
/**
@@ -66,26 +61,27 @@ class ITScaleCubeNetworkMessagingTest {
*/
@Test
public void messageWasSentToAllMembersSuccessfully() throws Exception {
- //Given: Three started member which are gathered to cluster.
- List<String> addresses = List.of("localhost:3344", "localhost:3345", "localhost:3346");
+ Map<String, NetworkMessage> messageStorage = new ConcurrentHashMap<>();
CountDownLatch messageReceivedLatch = new CountDownLatch(3);
- String aliceName = "Alice";
-
- ClusterService alice = startNetwork(aliceName, 3344, addresses);
- ClusterService bob = startNetwork("Bob", 3345, addresses);
- ClusterService carol = startNetwork("Carol", 3346, addresses);
+ testCluster = new Cluster(3);
- NetworkMessageHandler messageWaiter = (message, sender, correlationId) -> messageReceivedLatch.countDown();
+ for (ClusterService member : testCluster.members) {
+ member.messagingService().addMessageHandler(
+ (message, sender, correlationId) -> {
+ messageStorage.put(member.localConfiguration().getName(), message);
+ messageReceivedLatch.countDown();
+ }
+ );
+ }
- alice.messagingService().addMessageHandler(messageWaiter);
- bob.messagingService().addMessageHandler(messageWaiter);
- carol.messagingService().addMessageHandler(messageWaiter);
+ testCluster.startAwait();
TestMessage testMessage = new TestMessage("Message from Alice", Collections.emptyMap());
- //When: Send one message to all members in cluster.
+ ClusterService alice = testCluster.members.get(0);
+
for (ClusterNode member : alice.topologyService().allMembers()) {
alice.messagingService().weakSend(member, testMessage);
}
@@ -93,10 +89,42 @@ class ITScaleCubeNetworkMessagingTest {
boolean messagesReceived = messageReceivedLatch.await(3, TimeUnit.SECONDS);
assertTrue(messagesReceived);
- //Then: All members successfully received message.
- assertThat(getLastMessage(alice), is(testMessage));
- assertThat(getLastMessage(bob), is(testMessage));
- assertThat(getLastMessage(carol), is(testMessage));
+ testCluster.members.stream()
+ .map(member -> member.localConfiguration().getName())
+ .map(messageStorage::get)
+ .forEach(msg -> assertThat(msg, is(testMessage)));
+ }
+
+ /**
+ * Test graceful shutdown.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testShutdown() throws Exception {
+ testShutdown0(false);
+ }
+
+ /**
+ * Test forceful shutdown.
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testForcefulShutdown() throws Exception {
+ testShutdown0(true);
+ }
+
+ /**
+ * Test shutdown.
+ * @param forceful Whether shutdown should be forceful.
+ * @throws Exception If failed.
+ */
+ private void testShutdown0(boolean forceful) throws Exception {
+ testCluster = new Cluster(2);
+ testCluster.startAwait();
+
+ ClusterService alice = testCluster.members.get(0);
+ ClusterService bob = testCluster.members.get(1);
+ String aliceName = alice.localConfiguration().getName();
CountDownLatch aliceShutdownLatch = new CountDownLatch(1);
@@ -113,35 +141,115 @@ class ITScaleCubeNetworkMessagingTest {
}
});
- alice.shutdown();
+ if (forceful)
+ stopForcefully(alice);
+ else
+ alice.shutdown();
- boolean aliceShutdownReceived = aliceShutdownLatch.await(3, TimeUnit.SECONDS);
+ boolean aliceShutdownReceived = aliceShutdownLatch.await(forceful ? 10 : 3, TimeUnit.SECONDS);
assertTrue(aliceShutdownReceived);
Collection<ClusterNode> networkMembers = bob.topologyService().allMembers();
- assertEquals(2, networkMembers.size());
+ assertEquals(1, networkMembers.size());
}
- /** */
- private NetworkMessage getLastMessage(ClusterService clusterService) {
- return messageStorage.get(clusterService.localConfiguration().getName());
+ /**
+ * Find cluster's transport and force it to stop.
+ * @param cluster Cluster to be shutdown.
+ * @throws Exception If failed to stop.
+ */
+ private static void stopForcefully(ClusterService cluster) throws Exception {
+ Field clusterImplField = cluster.getClass().getDeclaredField("val$cluster");
+ clusterImplField.setAccessible(true);
+
+ ClusterImpl clusterImpl = (ClusterImpl) clusterImplField.get(cluster);
+ Field transportField = clusterImpl.getClass().getDeclaredField("transport");
+ transportField.setAccessible(true);
+
+ Transport transport = (Transport) transportField.get(clusterImpl);
+ Method stop = transport.getClass().getDeclaredMethod("stop");
+ stop.setAccessible(true);
+
+ Mono<?> invoke = (Mono<?>) stop.invoke(transport);
+ invoke.block();
}
- /** */
- private ClusterService startNetwork(String name, int port, List<String> addresses) {
- var context = new ClusterLocalConfiguration(name, port, addresses, SERIALIZATION_REGISTRY);
+ /**
+ * Wrapper for cluster.
+ */
+ private static final class Cluster {
+ /** */
+ private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
+ /** */
+ private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
- ClusterService clusterService = NETWORK_FACTORY.createClusterService(context);
+ /** */
+ final List<ClusterService> members;
- clusterService.messagingService().addMessageHandler((message, sender, correlationId) -> {
- messageStorage.put(name, message);
- });
+ /** */
+ private final CountDownLatch startupLatch;
+
+ /** Constructor. */
+ Cluster(int numOfNodes) {
+ startupLatch = new CountDownLatch(numOfNodes - 1);
- clusterService.start();
+ int initialPort = 3344;
- startedMembers.add(clusterService);
+ List<String> addresses = IntStream.range(0, numOfNodes)
+ .mapToObj(i -> String.format("localhost:%d", initialPort + i))
+ .collect(Collectors.toUnmodifiableList());
- return clusterService;
+ members = IntStream.range(0, numOfNodes)
+ .mapToObj(i -> startNode("Node #" + i, initialPort + i, addresses, i == 0))
+ .collect(Collectors.toUnmodifiableList());
+ }
+
+ /**
+ * Start cluster node.
+ *
+ * @param name Node name.
+ * @param port Node port.
+ * @param addresses Addresses of other nodes.
+ * @param initial Whether this node is the first one.
+ * @return Started cluster node.
+ */
+ private ClusterService startNode(String name, int port, List<String> addresses, boolean initial) {
+ var context = new ClusterLocalConfiguration(name, port, addresses, SERIALIZATION_REGISTRY);
+
+ ClusterService clusterService = NETWORK_FACTORY.createClusterService(context);
+
+ if (initial)
+ clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
+ /** {@inheritDoc} */
+ @Override public void onAppeared(ClusterNode member) {
+ startupLatch.countDown();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisappeared(ClusterNode member) {
+ }
+ });
+
+ return clusterService;
+ }
+
+ /**
+ * Start and wait for cluster to come up.
+ * @throws InterruptedException If failed.
+ */
+ void startAwait() throws InterruptedException {
+ members.forEach(ClusterService::start);
+
+ if (!startupLatch.await(3, TimeUnit.SECONDS))
+ throw new AssertionError();
+ }
+
+ /**
+ * Shutdown cluster.
+ */
+ void shutdown() {
+ members.forEach(ClusterService::shutdown);
+ }
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 4dd7d5f..74af277 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -32,6 +32,7 @@ import org.apache.ignite.network.NetworkConfigurationException;
/**
* {@link ClusterServiceFactory} implementation that uses ScaleCube for messaging and topology services.
+ * TODO: IGNITE-14538: This factory should use ScaleCube configuration instead of default parameters.
*/
public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
/** {@inheritDoc} */
@@ -53,7 +54,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
})
.config(opts -> opts.memberAlias(context.getName()))
.transport(opts -> opts.port(context.getPort()))
- .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
+ .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())).suspicionMult(1));
// resolve cyclic dependencies
messagingService.setCluster(cluster);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 73654b2..31eaaf7 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -52,34 +52,61 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
*/
void onMembershipEvent(MembershipEvent event) {
ClusterNode member = fromMember(event.member());
- for (TopologyEventHandler handler : getEventHandlers()) {
- switch (event.type()) {
- case ADDED:
- members.put(member.name(), member);
- handler.onAppeared(member);
+ String memberName = member.name();
- break;
+ switch (event.type()) {
+ case ADDED:
+ members.put(memberName, member);
- case LEAVING:
- members.remove(member.name());
+ fireAppearedEvent(member);
- handler.onDisappeared(member);
+ break;
- break;
+ case LEAVING:
+ members.remove(memberName);
- case REMOVED:
- case UPDATED:
- // No-op.
- break;
+ fireDisappearedEvent(member);
- default:
- throw new IgniteInternalException("This event is not supported: event = " + event);
+ break;
+
+ case REMOVED:
+ // In case if member left non-gracefully, without sending LEAVING event.
+ if (members.remove(memberName) != null)
+ fireDisappearedEvent(member);
+
+ break;
+
+ case UPDATED:
+ // No-op.
+ break;
+
+ default:
+ throw new IgniteInternalException("This event is not supported: event = " + event);
- }
}
}
+ /**
+ * Fire a cluster member appearance event.
+ *
+ * @param member Appeared cluster member.
+ */
+ private void fireAppearedEvent(ClusterNode member) {
+ for (TopologyEventHandler handler : getEventHandlers())
+ handler.onAppeared(member);
+ }
+
+ /**
+ * Fire a cluster member disappearance event.
+ *
+ * @param member Disappeared cluster member.
+ */
+ private void fireDisappearedEvent(ClusterNode member) {
+ for (TopologyEventHandler handler : getEventHandlers())
+ handler.onDisappeared(member);
+ }
+
/** {@inheritDoc} */
@Override public ClusterNode localMember() {
return localMember;