You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/11/22 14:46:55 UTC

[ignite-3] branch main updated: IGNITE-15965 Fix a race between cluster start and setting the local member (#463)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a384c0  IGNITE-15965 Fix a race between cluster start and setting the local member (#463)
8a384c0 is described below

commit 8a384c021659b658f693e2f63a4b4c6d6ad2101d
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Mon Nov 22 17:46:47 2021 +0300

    IGNITE-15965 Fix a race between cluster start and setting the local member (#463)
---
 modules/network/pom.xml                            | 10 ++++
 .../scalecube/ScaleCubeClusterServiceFactory.java  | 64 +++++++---------------
 .../scalecube/ScaleCubeMessagingService.java       |  2 +-
 .../scalecube/ScaleCubeTopologyService.java        | 22 ++++----
 parent/pom.xml                                     | 32 ++---------
 5 files changed, 48 insertions(+), 82 deletions(-)

diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index 53cbe08..c9ce704 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -59,6 +59,16 @@
             <artifactId>scalecube-cluster</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-handler</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport</artifactId>
+        </dependency>
+
         <!-- Test dependencies -->
         <dependency>
             <groupId>org.apache.ignite</groupId>
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 8b5b4a6..7e420c6 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -23,15 +23,9 @@ import io.scalecube.cluster.ClusterMessageHandler;
 import io.scalecube.cluster.membership.MembershipEvent;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.net.Address;
-import java.lang.management.ManagementFactory;
 import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
 import org.apache.ignite.configuration.schemas.network.ClusterMembershipView;
 import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
@@ -55,22 +49,11 @@ import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
     /** {@inheritDoc} */
     @Override
-    public ClusterService createClusterService(
-            ClusterLocalConfiguration context,
-            NetworkConfiguration networkConfiguration
-    ) {
-        String consistentId = context.getName();
-
+    public ClusterService createClusterService(ClusterLocalConfiguration context, NetworkConfiguration networkConfiguration) {
         var topologyService = new ScaleCubeTopologyService();
 
         var messagingService = new ScaleCubeMessagingService();
 
-        var messageFactory = new NetworkMessagesFactory();
-
-        MessageSerializationRegistry registry = context.getSerializationRegistry();
-
-        UUID launchId = UUID.randomUUID();
-
         return new AbstractClusterService(context, topologyService, messagingService) {
             private volatile ClusterImpl cluster;
 
@@ -79,10 +62,18 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
             /** {@inheritDoc} */
             @Override
             public void start() {
-                NetworkView networkConfigurationView = networkConfiguration.value();
+                String consistentId = context.getName();
+
+                MessageSerializationRegistry registry = context.getSerializationRegistry();
+
+                UUID launchId = UUID.randomUUID();
+
+                var messageFactory = new NetworkMessagesFactory();
 
-                this.connectionMgr = new ConnectionManager(
-                        networkConfigurationView,
+                NetworkView configView = networkConfiguration.value();
+
+                connectionMgr = new ConnectionManager(
+                        configView,
                         registry,
                         consistentId,
                         () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
@@ -91,9 +82,9 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
 
                 var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr, topologyService, messageFactory);
 
-                NodeFinder finder = NodeFinderFactory.createNodeFinder(networkConfigurationView.nodeFinder());
+                NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
 
-                this.cluster = new ClusterImpl(clusterConfig(networkConfigurationView.membership()))
+                cluster = new ClusterImpl(clusterConfig(configView.membership()))
                         .handler(cl -> new ClusterMessageHandler() {
                             /** {@inheritDoc} */
                             @Override
@@ -113,12 +104,16 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
 
                 // resolve cyclic dependencies
                 messagingService.setCluster(cluster);
+                topologyService.setCluster(cluster);
 
                 connectionMgr.start();
 
                 cluster.startAwait();
 
-                topologyService.setLocalMember(cluster.member());
+                // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
+                var localMembershipEvent = MembershipEvent.createAdded(cluster.member(), null, System.currentTimeMillis());
+
+                topologyService.onMembershipEvent(localMembershipEvent);
             }
 
             /** {@inheritDoc} */
@@ -129,10 +124,9 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
                     return;
                 }
 
-                stopJmxMonitor();
-
                 cluster.shutdown();
                 cluster.onShutdown().block();
+
                 connectionMgr.stop();
             }
 
@@ -147,24 +141,6 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
             public boolean isStopped() {
                 return cluster.isShutdown();
             }
-
-            /**
-             * Removes the JMX MBean registered by the "io.scalecube.cluster.ClusterImpl#startJmxMonitor()" method.
-             * Current ScaleCube implementation does not do that which leads to memory leaks.
-             */
-            private void stopJmxMonitor() {
-                MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
-                try {
-                    var pattern = new ObjectName("io.scalecube.cluster", "name", cluster.member().id() + "@*");
-
-                    for (ObjectName name : server.queryNames(pattern, null)) {
-                        server.unregisterMBean(name);
-                    }
-                } catch (MalformedObjectNameException | InstanceNotFoundException | MBeanRegistrationException ignore) {
-                    // No-op.
-                }
-            }
         };
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 15d7479..12e776e 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -41,7 +41,7 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
     /**
      * Inner representation of a ScaleCube cluster.
      */
-    private Cluster cluster;
+    private volatile Cluster cluster;
     
     /**
      * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index c10f00b..7b6e107 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.network.scalecube;
 
+import io.scalecube.cluster.Cluster;
 import io.scalecube.cluster.Member;
 import io.scalecube.cluster.membership.MembershipEvent;
 import java.util.Collection;
@@ -37,22 +38,21 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
     /** Logger. */
     private static final IgniteLogger LOG = IgniteLogger.forClass(ScaleCubeTopologyService.class);
 
-    /** Local member node. */
-    private ClusterNode localMember;
+    /**
+     * Inner representation of a ScaleCube cluster.
+     */
+    private volatile Cluster cluster;
 
     /** Topology members. */
     private final ConcurrentMap<NetworkAddress, ClusterNode> members = new ConcurrentHashMap<>();
 
     /**
-     * Sets the ScaleCube's local {@link Member}.
+     * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
      *
-     * @param member Local member.
+     * @param cluster Cluster.
      */
-    void setLocalMember(Member member) {
-        localMember = fromMember(member);
-
-        // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
-        onMembershipEvent(MembershipEvent.createAdded(member, null, System.currentTimeMillis()));
+    void setCluster(Cluster cluster) {
+        this.cluster = cluster;
     }
 
     /**
@@ -122,9 +122,11 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
     /** {@inheritDoc} */
     @Override
     public ClusterNode localMember() {
+        Member localMember = cluster.member();
+
         assert localMember != null : "Cluster has not been started";
 
-        return localMember;
+        return fromMember(localMember);
     }
 
     /** {@inheritDoc} */
diff --git a/parent/pom.xml b/parent/pom.xml
index 0949a9f..658d435 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -53,7 +53,7 @@
         <gson.version>2.8.6</gson.version>
         <jackson.version>2.12.3</jackson.version>
         <jansi.version>1.18</jansi.version>
-        <netty.version>4.1.60.Final</netty.version>
+        <netty.version>4.1.70.Final</netty.version>
         <javapoet.version>1.13.0</javapoet.version>
         <javax.annotation.api.version>1.3.2</javax.annotation.api.version>
         <jetbrains.annotations.version>20.1.0</jetbrains.annotations.version>
@@ -68,7 +68,7 @@
         <spoon.framework.version>8.4.0-beta-18</spoon.framework.version>
         <typesafe.version>1.4.1</typesafe.version>
         <hamcrest.version>2.2</hamcrest.version>
-        <scalecube.version>2.6.6</scalecube.version>
+        <scalecube.version>2.6.12</scalecube.version>
         <calcite.version>1.27.0</calcite.version>
         <janino.version>3.1.4</janino.version>
         <avatica.version>1.18.0</avatica.version>
@@ -587,32 +587,10 @@
 
             <dependency>
                 <groupId>io.netty</groupId>
-                <artifactId>netty-common</artifactId>
-                <version>${netty.version}</version>
-            </dependency>
-
-            <dependency>
-                <groupId>io.netty</groupId>
-                <artifactId>netty-buffer</artifactId>
-                <version>${netty.version}</version>
-            </dependency>
-
-            <dependency>
-                <groupId>io.netty</groupId>
-                <artifactId>netty-codec</artifactId>
-                <version>${netty.version}</version>
-            </dependency>
-
-            <dependency>
-                <groupId>io.netty</groupId>
-                <artifactId>netty-handler</artifactId>
-                <version>${netty.version}</version>
-            </dependency>
-
-            <dependency>
-                <groupId>io.netty</groupId>
-                <artifactId>netty-codec-http</artifactId>
+                <artifactId>netty-bom</artifactId>
                 <version>${netty.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
             </dependency>
 
             <dependency>