Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/04/12 20:30:27 UTC

[GitHub] [ignite-3] SammyVimes opened a new pull request #92: IGNITE-14517 Fix ScaleCube's topology change handling

SammyVimes opened a new pull request #92:
URL: https://github.com/apache/ignite-3/pull/92


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite-3] sashapolo commented on a change in pull request #92: IGNITE-14517 Fix ScaleCube's topology change handling

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #92:
URL: https://github.com/apache/ignite-3/pull/92#discussion_r612207117



##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
##########
@@ -88,13 +92,37 @@ public void messageWasSentToAllMembersSuccessfully() throws Exception {
             alice.messagingService().weakSend(member, testMessage);
         }
 
-        boolean done = latch.await(3, TimeUnit.SECONDS);
-        assertTrue(done);
+        boolean messagesReceived = messageReceivedLatch.await(3, TimeUnit.SECONDS);
+        assertTrue(messagesReceived);
 
         //Then: All members successfully received message.
         assertThat(getLastMessage(alice), is(testMessage));
         assertThat(getLastMessage(bob), is(testMessage));
         assertThat(getLastMessage(carol), is(testMessage));
+
+        CountDownLatch aliceShutdownLatch = new CountDownLatch(1);
+
+        bob.topologyService().addEventHandler(new TopologyEventHandler() {
+            /** {@inheritDoc} */
+            @Override public void onAppeared(ClusterNode member) {
+                // No-op.
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onDisappeared(ClusterNode member) {
+                if (aliceName.equals(member.name()))
+                    aliceShutdownLatch.countDown();
+            }
+        });
+
+        alice.shutdown();
+
+        boolean aliceShutdownReceived = aliceShutdownLatch.await(3, TimeUnit.SECONDS);
+        assertTrue(aliceShutdownReceived);
+
+        Collection<ClusterNode> networkMembers = bob.topologyService().allMembers();

Review comment:
       I think this code should be extracted into a separate test
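
One possible shape for such a separate test, as a minimal self-contained sketch. `FakeTopology` and `TopologyShutdownSketch` are hypothetical stand-ins invented for illustration and are not part of the Ignite or ScaleCube API; only the latch-based waiting pattern is taken from the diff above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for a topology service; NOT the Ignite API. */
final class FakeTopology {
    /** Handlers notified with the name of a node that has left. */
    private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();

    void addDisappearedHandler(Consumer<String> handler) {
        handlers.add(handler);
    }

    /** Simulates a node leaving; handlers are notified on a separate thread. */
    void nodeLeft(String nodeName) {
        new Thread(() -> handlers.forEach(h -> h.accept(nodeName))).start();
    }
}

/** Helpers a dedicated "node shutdown is detected" test could be built from. */
final class TopologyShutdownSketch {
    /** Returns a latch that is counted down once the given node disappears. */
    static CountDownLatch watchDisappearance(FakeTopology topology, String nodeName) {
        CountDownLatch latch = new CountDownLatch(1);

        topology.addDisappearedHandler(name -> {
            if (nodeName.equals(name))
                latch.countDown();
        });

        return latch;
    }

    /** Waits for the latch; returns false on timeout or interruption. */
    static boolean awaitQuietly(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            return false;
        }
    }
}
```

With helpers like these, the messaging test would keep only the message assertions, and the shutdown test would register the watcher, stop the node, and assert on the latch.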

##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
##########
@@ -30,49 +32,62 @@
  * Implementation of {@link TopologyService} based on ScaleCube.
  */
 final class ScaleCubeTopologyService extends AbstractTopologyService {
-    /** Inner representation a ScaleCube cluster. */
-    private Cluster cluster;
+    /** Local member node. */
+    private ClusterNode localMember;
+
+    /** Topology members. */
+    private Map<String, ClusterNode> members = new HashMap<>();

Review comment:
       ```suggestion
       private final Map<String, ClusterNode> members = new HashMap<>();
       ```

##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
##########
@@ -17,18 +17,18 @@
 
 package org.apache.ignite.network.scalecube;
 
-import java.util.List;
-import java.util.stream.Collectors;
 import io.scalecube.cluster.ClusterImpl;
 import io.scalecube.cluster.ClusterMessageHandler;
 import io.scalecube.cluster.membership.MembershipEvent;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.net.Address;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkConfigurationException;
+import java.util.List;

Review comment:
       Looks like either my IDEA settings are incorrect or yours. Mine always puts `java.*` imports on top. Is it wrong?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
##########
@@ -30,49 +32,62 @@
  * Implementation of {@link TopologyService} based on ScaleCube.
  */
 final class ScaleCubeTopologyService extends AbstractTopologyService {
-    /** Inner representation a ScaleCube cluster. */
-    private Cluster cluster;
+    /** Local member node. */
+    private ClusterNode localMember;
+
+    /** Topology members. */
+    private Map<String, ClusterNode> members = new HashMap<>();

Review comment:
       Are you sure it shouldn't be a `ConcurrentHashMap`?
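
A minimal sketch of what that could look like, assuming membership callbacks may run on a different thread than the callers reading the member list. `MemberRegistry` and its method names are hypothetical and only illustrate the idea; they are not the PR's code.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical membership registry; names are illustrative, not the Ignite API. */
final class MemberRegistry {
    /** Node name -> node address. Safe for concurrent reads and writes. */
    private final Map<String, String> members = new ConcurrentHashMap<>();

    /** Called when a node joins (possibly from a network thread). */
    void onAdded(String name, String address) {
        members.put(name, address);
    }

    /** Called when a node leaves (possibly from a network thread). */
    void onRemoved(String name) {
        members.remove(name);
    }

    /** Returns an immutable snapshot of the current member names. */
    Collection<String> allMembers() {
        return List.copyOf(members.keySet());
    }
}
```

Returning a snapshot rather than a live view means callers never iterate over a collection that is being modified underneath them.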

##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
##########
@@ -41,41 +41,51 @@
 
         var cluster = new ClusterImpl()
             .handler(cl -> new ClusterMessageHandler() {
+                /** {@inheritDoc} */
                 @Override public void onMessage(Message message) {
                     messagingService.fireEvent(message);
                 }
 
+                /** {@inheritDoc} */
                 @Override public void onMembershipEvent(MembershipEvent event) {
-                    topologyService.fireEvent(event);
+                    topologyService.onMembershipEvent(event);
                 }
             })
             .config(opts -> opts.memberAlias(context.getName()))
             .transport(opts -> opts.port(context.getPort()))
             .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
 
         // resolve cyclic dependencies
-        topologyService.setCluster(cluster);
         messagingService.setCluster(cluster);
 
         return new AbstractClusterService(context, topologyService, messagingService) {
+            /** {@inheritDoc} */
             @Override public void start() {
                 cluster.startAwait();
+
+                topologyService.setLocalMember(cluster.member());

Review comment:
       Why do you call this method here and not in place of `setCluster`?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
##########
@@ -30,49 +32,62 @@
  * Implementation of {@link TopologyService} based on ScaleCube.
  */
 final class ScaleCubeTopologyService extends AbstractTopologyService {
-    /** Inner representation a ScaleCube cluster. */
-    private Cluster cluster;
+    /** Local member node. */
+    private ClusterNode localMember;
+
+    /** Topology members. */
+    private Map<String, ClusterNode> members = new HashMap<>();
 
     /**
-     * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
+     * Sets the ScaleCube's local {@link Member}.
      */
-    void setCluster(Cluster cluster) {
-        this.cluster = cluster;
+    void setLocalMember(Member member) {
+        this.localMember = fromMember(member);
+
+        this.members.put(localMember.name(), localMember);
     }
 
     /**
      * Delegates the received topology event to the registered event handlers.
      */
-    void fireEvent(MembershipEvent event) {
+    void onMembershipEvent(MembershipEvent event) {
         ClusterNode member = fromMember(event.member());
         for (TopologyEventHandler handler : getEventHandlers()) {
             switch (event.type()) {
                 case ADDED:
+                    members.put(member.name(), member);
+
                     handler.onAppeared(member);
+
                     break;
+
                 case LEAVING:
                 case REMOVED:

Review comment:
       Maybe we should start treating these events separately? Currently `onDisappeared` is fired twice (once for `LEAVING` and once for `REMOVED`), which I don't think is entirely correct.







[GitHub] [ignite-3] SammyVimes commented on a change in pull request #92: IGNITE-14517 Fix ScaleCube's topology change handling

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #92:
URL: https://github.com/apache/ignite-3/pull/92#discussion_r612406296



##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
##########
@@ -17,18 +17,18 @@
 
 package org.apache.ignite.network.scalecube;
 
-import java.util.List;
-import java.util.stream.Collectors;
 import io.scalecube.cluster.ClusterImpl;
 import io.scalecube.cluster.ClusterMessageHandler;
 import io.scalecube.cluster.membership.MembershipEvent;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.net.Address;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkConfigurationException;
+import java.util.List;

Review comment:
       Not sure, I can see both import orders in the project.







[GitHub] [ignite-3] SammyVimes commented on a change in pull request #92: IGNITE-14517 Fix ScaleCube's topology change handling

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #92:
URL: https://github.com/apache/ignite-3/pull/92#discussion_r612414051



##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
##########
@@ -30,49 +32,62 @@
  * Implementation of {@link TopologyService} based on ScaleCube.
  */
 final class ScaleCubeTopologyService extends AbstractTopologyService {
-    /** Inner representation a ScaleCube cluster. */
-    private Cluster cluster;
+    /** Local member node. */
+    private ClusterNode localMember;
+
+    /** Topology members. */
+    private Map<String, ClusterNode> members = new HashMap<>();
 
     /**
-     * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
+     * Sets the ScaleCube's local {@link Member}.
      */
-    void setCluster(Cluster cluster) {
-        this.cluster = cluster;
+    void setLocalMember(Member member) {
+        this.localMember = fromMember(member);
+
+        this.members.put(localMember.name(), localMember);
     }
 
     /**
      * Delegates the received topology event to the registered event handlers.
      */
-    void fireEvent(MembershipEvent event) {
+    void onMembershipEvent(MembershipEvent event) {
         ClusterNode member = fromMember(event.member());
         for (TopologyEventHandler handler : getEventHandlers()) {
             switch (event.type()) {
                 case ADDED:
+                    members.put(member.name(), member);
+
                     handler.onAppeared(member);
+
                     break;
+
                 case LEAVING:
                 case REMOVED:

Review comment:
       I suppose we can check whether the map still contains the disappearing member (in case the node was not stopped gracefully).
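
A rough sketch of that map check, under the assumption discussed in this thread that a graceful stop produces both `LEAVING` and `REMOVED` while an abrupt stop produces only `REMOVED`. The class, the local `EventType` enum, and the string notifications are hypothetical simplifications, not the ScaleCube or Ignite types.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical sketch: reports each disappearance at most once. */
final class DisappearanceSketch {
    /** Simplified local stand-in for the membership event types. */
    enum EventType { ADDED, LEAVING, REMOVED }

    /** Names of the members currently considered part of the topology. */
    private final Set<String> members = ConcurrentHashMap.newKeySet();

    /** Recorded notifications; a real service would call its handlers instead. */
    private final List<String> notifications = new CopyOnWriteArrayList<>();

    void onMembershipEvent(EventType type, String name) {
        switch (type) {
            case ADDED:
                // add() returns false if the member is already known.
                if (members.add(name))
                    notifications.add("appeared:" + name);

                break;

            case LEAVING:
            case REMOVED:
                // remove() returns true only for the first of the two events,
                // so a LEAVING followed by a REMOVED notifies once.
                if (members.remove(name))
                    notifications.add("disappeared:" + name);

                break;
        }
    }

    /** Returns an immutable copy of the recorded notifications. */
    List<String> notifications() {
        return List.copyOf(notifications);
    }
}
```

The same check also covers the abrupt case: if only `REMOVED` arrives, the member is still in the set and is reported exactly once.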







[GitHub] [ignite-3] SammyVimes commented on a change in pull request #92: IGNITE-14517 Fix ScaleCube's topology change handling

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #92:
URL: https://github.com/apache/ignite-3/pull/92#discussion_r612413185



##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
##########
@@ -41,41 +41,51 @@
 
         var cluster = new ClusterImpl()
             .handler(cl -> new ClusterMessageHandler() {
+                /** {@inheritDoc} */
                 @Override public void onMessage(Message message) {
                     messagingService.fireEvent(message);
                 }
 
+                /** {@inheritDoc} */
                 @Override public void onMembershipEvent(MembershipEvent event) {
-                    topologyService.fireEvent(event);
+                    topologyService.onMembershipEvent(event);
                 }
             })
             .config(opts -> opts.memberAlias(context.getName()))
             .transport(opts -> opts.port(context.getPort()))
             .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
 
         // resolve cyclic dependencies
-        topologyService.setCluster(cluster);
         messagingService.setCluster(cluster);
 
         return new AbstractClusterService(context, topologyService, messagingService) {
+            /** {@inheritDoc} */
             @Override public void start() {
                 cluster.startAwait();
+
+                topologyService.setLocalMember(cluster.member());

Review comment:
       The cluster is not started yet at that point, so there is no local member at the moment of creation.







[GitHub] [ignite-3] sashapolo commented on a change in pull request #92: IGNITE-14517 Fix ScaleCube's topology change handling

Posted by GitBox <gi...@apache.org>.
sashapolo commented on a change in pull request #92:
URL: https://github.com/apache/ignite-3/pull/92#discussion_r612463398



##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
##########
@@ -88,13 +92,37 @@ public void messageWasSentToAllMembersSuccessfully() throws Exception {
             alice.messagingService().weakSend(member, testMessage);
         }
 
-        boolean done = latch.await(3, TimeUnit.SECONDS);
-        assertTrue(done);
+        boolean messagesReceived = messageReceivedLatch.await(3, TimeUnit.SECONDS);
+        assertTrue(messagesReceived);
 
         //Then: All members successfully received message.
         assertThat(getLastMessage(alice), is(testMessage));
         assertThat(getLastMessage(bob), is(testMessage));
         assertThat(getLastMessage(carol), is(testMessage));
+
+        CountDownLatch aliceShutdownLatch = new CountDownLatch(1);
+
+        bob.topologyService().addEventHandler(new TopologyEventHandler() {
+            /** {@inheritDoc} */
+            @Override public void onAppeared(ClusterNode member) {
+                // No-op.
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onDisappeared(ClusterNode member) {
+                if (aliceName.equals(member.name()))
+                    aliceShutdownLatch.countDown();
+            }
+        });
+
+        alice.shutdown();
+
+        boolean aliceShutdownReceived = aliceShutdownLatch.await(3, TimeUnit.SECONDS);
+        assertTrue(aliceShutdownReceived);
+
+        Collection<ClusterNode> networkMembers = bob.topologyService().allMembers();

Review comment:
       I would suggest splitting it anyway







[GitHub] [ignite-3] asfgit closed pull request #92: IGNITE-14517 Fix ScaleCube's topology change handling

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #92:
URL: https://github.com/apache/ignite-3/pull/92


   





[GitHub] [ignite-3] SammyVimes commented on a change in pull request #92: IGNITE-14517 Fix ScaleCube's topology change handling

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #92:
URL: https://github.com/apache/ignite-3/pull/92#discussion_r612401184



##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
##########
@@ -30,49 +32,62 @@
  * Implementation of {@link TopologyService} based on ScaleCube.
  */
 final class ScaleCubeTopologyService extends AbstractTopologyService {
-    /** Inner representation a ScaleCube cluster. */
-    private Cluster cluster;
+    /** Local member node. */
+    private ClusterNode localMember;
+
+    /** Topology members. */
+    private Map<String, ClusterNode> members = new HashMap<>();

Review comment:
       True, it should be a `ConcurrentHashMap`.







[GitHub] [ignite-3] SammyVimes commented on a change in pull request #92: IGNITE-14517 Fix ScaleCube's topology change handling

Posted by GitBox <gi...@apache.org>.
SammyVimes commented on a change in pull request #92:
URL: https://github.com/apache/ignite-3/pull/92#discussion_r612415052



##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
##########
@@ -88,13 +92,37 @@ public void messageWasSentToAllMembersSuccessfully() throws Exception {
             alice.messagingService().weakSend(member, testMessage);
         }
 
-        boolean done = latch.await(3, TimeUnit.SECONDS);
-        assertTrue(done);
+        boolean messagesReceived = messageReceivedLatch.await(3, TimeUnit.SECONDS);
+        assertTrue(messagesReceived);
 
         //Then: All members successfully received message.
         assertThat(getLastMessage(alice), is(testMessage));
         assertThat(getLastMessage(bob), is(testMessage));
         assertThat(getLastMessage(carol), is(testMessage));
+
+        CountDownLatch aliceShutdownLatch = new CountDownLatch(1);
+
+        bob.topologyService().addEventHandler(new TopologyEventHandler() {
+            /** {@inheritDoc} */
+            @Override public void onAppeared(ClusterNode member) {
+                // No-op.
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onDisappeared(ClusterNode member) {
+                if (aliceName.equals(member.name()))
+                    aliceShutdownLatch.countDown();
+            }
+        });
+
+        alice.shutdown();
+
+        boolean aliceShutdownReceived = aliceShutdownLatch.await(3, TimeUnit.SECONDS);
+        assertTrue(aliceShutdownReceived);
+
+        Collection<ClusterNode> networkMembers = bob.topologyService().allMembers();

Review comment:
       I think this test is useless anyway and will be removed once we have proper networking.



