You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/04/14 09:22:23 UTC
[ignite-3] branch main updated: IGNITE-14517 Fixed topology change handling on graceful shutdown. - Fixes #92.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6b20869 IGNITE-14517 Fixed topology change handling on graceful shutdown. - Fixes #92.
6b20869 is described below
commit 6b2086977c730296caebcd3264bd75ac6a9dfcee
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Apr 14 12:21:20 2021 +0300
IGNITE-14517 Fixed topology change handling on graceful shutdown. - Fixes #92.
Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
.../scalecube/ITScaleCubeNetworkMessagingTest.java | 53 ++++++++++++++--------
.../scalecube/ScaleCubeClusterServiceFactory.java | 28 ++++++++----
.../scalecube/ScaleCubeTopologyService.java | 47 ++++++++++++-------
3 files changed, 83 insertions(+), 45 deletions(-)
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index c6fc15f..367e78b 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.network.scalecube;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** */
@@ -67,13 +69,15 @@ class ITScaleCubeNetworkMessagingTest {
//Given: Three started member which are gathered to cluster.
List<String> addresses = List.of("localhost:3344", "localhost:3345", "localhost:3346");
- CountDownLatch latch = new CountDownLatch(3);
+ CountDownLatch messageReceivedLatch = new CountDownLatch(3);
- ClusterService alice = startNetwork("Alice", 3344, addresses);
+ String aliceName = "Alice";
+
+ ClusterService alice = startNetwork(aliceName, 3344, addresses);
ClusterService bob = startNetwork("Bob", 3345, addresses);
ClusterService carol = startNetwork("Carol", 3346, addresses);
- NetworkMessageHandler messageWaiter = (message, sender, correlationId) -> latch.countDown();
+ NetworkMessageHandler messageWaiter = (message, sender, correlationId) -> messageReceivedLatch.countDown();
alice.messagingService().addMessageHandler(messageWaiter);
bob.messagingService().addMessageHandler(messageWaiter);
@@ -83,18 +87,40 @@ class ITScaleCubeNetworkMessagingTest {
//When: Send one message to all members in cluster.
for (ClusterNode member : alice.topologyService().allMembers()) {
- System.out.println("SEND : " + member);
-
alice.messagingService().weakSend(member, testMessage);
}
- boolean done = latch.await(3, TimeUnit.SECONDS);
- assertTrue(done);
+ boolean messagesReceived = messageReceivedLatch.await(3, TimeUnit.SECONDS);
+ assertTrue(messagesReceived);
//Then: All members successfully received message.
assertThat(getLastMessage(alice), is(testMessage));
assertThat(getLastMessage(bob), is(testMessage));
assertThat(getLastMessage(carol), is(testMessage));
+
+ CountDownLatch aliceShutdownLatch = new CountDownLatch(1);
+
+ bob.topologyService().addEventHandler(new TopologyEventHandler() {
+ /** {@inheritDoc} */
+ @Override public void onAppeared(ClusterNode member) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisappeared(ClusterNode member) {
+ if (aliceName.equals(member.name()))
+ aliceShutdownLatch.countDown();
+ }
+ });
+
+ alice.shutdown();
+
+ boolean aliceShutdownReceived = aliceShutdownLatch.await(3, TimeUnit.SECONDS);
+ assertTrue(aliceShutdownReceived);
+
+ Collection<ClusterNode> networkMembers = bob.topologyService().allMembers();
+
+ assertEquals(2, networkMembers.size());
}
/** */
@@ -107,22 +133,9 @@ class ITScaleCubeNetworkMessagingTest {
var context = new ClusterLocalConfiguration(name, port, addresses, SERIALIZATION_REGISTRY);
ClusterService clusterService = NETWORK_FACTORY.createClusterService(context);
- System.out.println("-----" + name + " started");
clusterService.messagingService().addMessageHandler((message, sender, correlationId) -> {
messageStorage.put(name, message);
-
- System.out.println(name + " handled messages : " + message);
- });
-
- clusterService.topologyService().addEventHandler(new TopologyEventHandler() {
- @Override public void onAppeared(ClusterNode member) {
- System.out.println(name + " found member : " + member);
- }
-
- @Override public void onDisappeared(ClusterNode member) {
- System.out.println(name + " lost member : " + member);
- }
});
clusterService.start();
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 6b473a0..4dd7d5f 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -17,18 +17,18 @@
package org.apache.ignite.network.scalecube;
-import java.util.List;
-import java.util.stream.Collectors;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkConfigurationException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.AbstractClusterService;
import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.AbstractClusterService;
+import org.apache.ignite.network.NetworkConfigurationException;
/**
* {@link ClusterServiceFactory} implementation that uses ScaleCube for messaging and topology services.
@@ -41,12 +41,14 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
var cluster = new ClusterImpl()
.handler(cl -> new ClusterMessageHandler() {
+ /** {@inheritDoc} */
@Override public void onMessage(Message message) {
messagingService.fireEvent(message);
}
+ /** {@inheritDoc} */
@Override public void onMembershipEvent(MembershipEvent event) {
- topologyService.fireEvent(event);
+ topologyService.onMembershipEvent(event);
}
})
.config(opts -> opts.memberAlias(context.getName()))
@@ -54,14 +56,17 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
.membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
// resolve cyclic dependencies
- topologyService.setCluster(cluster);
messagingService.setCluster(cluster);
return new AbstractClusterService(context, topologyService, messagingService) {
+ /** {@inheritDoc} */
@Override public void start() {
cluster.startAwait();
+
+ topologyService.setLocalMember(cluster.member());
}
+ /** {@inheritDoc} */
@Override public void shutdown() {
cluster.shutdown();
cluster.onShutdown().block();
@@ -69,13 +74,18 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
};
}
- /** */
+ /**
+ * Convert string addresses to ScaleCube's {@link Address}es.
+ * @param addresses "host:port" formatted strings.
+ * @return List of addresses.
+ */
private List<Address> parseAddresses(List<String> addresses) {
try {
return addresses.stream()
.map(Address::from)
.collect(Collectors.toList());
- } catch (IllegalArgumentException e) {
+ }
+ catch (IllegalArgumentException e) {
throw new NetworkConfigurationException("Failed to parse address", e);
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index da69281..73654b2 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -16,11 +16,13 @@
*/
package org.apache.ignite.network.scalecube;
-import java.util.Collection;
-import java.util.stream.Collectors;
-import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.AbstractTopologyService;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.TopologyEventHandler;
@@ -30,49 +32,62 @@ import org.apache.ignite.network.TopologyService;
* Implementation of {@link TopologyService} based on ScaleCube.
*/
final class ScaleCubeTopologyService extends AbstractTopologyService {
- /** Inner representation a ScaleCube cluster. */
- private Cluster cluster;
+ /** Local member node. */
+ private ClusterNode localMember;
+
+ /** Topology members. */
+ private final Map<String, ClusterNode> members = new ConcurrentHashMap<>();
/**
- * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
+ * Sets the ScaleCube's local {@link Member}.
*/
- void setCluster(Cluster cluster) {
- this.cluster = cluster;
+ void setLocalMember(Member member) {
+ this.localMember = fromMember(member);
+
+ this.members.put(localMember.name(), localMember);
}
/**
* Delegates the received topology event to the registered event handlers.
*/
- void fireEvent(MembershipEvent event) {
+ void onMembershipEvent(MembershipEvent event) {
ClusterNode member = fromMember(event.member());
for (TopologyEventHandler handler : getEventHandlers()) {
switch (event.type()) {
case ADDED:
+ members.put(member.name(), member);
+
handler.onAppeared(member);
+
break;
+
case LEAVING:
- case REMOVED:
+ members.remove(member.name());
+
handler.onDisappeared(member);
+
break;
+
+ case REMOVED:
case UPDATED:
- // do nothing
+ // No-op.
break;
+
default:
- throw new RuntimeException("This event is not supported: event = " + event);
+ throw new IgniteInternalException("This event is not supported: event = " + event);
+
}
}
}
/** {@inheritDoc} */
@Override public ClusterNode localMember() {
- return fromMember(cluster.member());
+ return localMember;
}
/** {@inheritDoc} */
@Override public Collection<ClusterNode> allMembers() {
- return cluster.members().stream()
- .map(ScaleCubeTopologyService::fromMember)
- .collect(Collectors.toList());
+ return Collections.unmodifiableCollection(members.values());
}
/**