You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/11/22 14:46:55 UTC
[ignite-3] branch main updated: IGNITE-15965 Fix a race between cluster start and setting the local member (#463)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8a384c0 IGNITE-15965 Fix a race between cluster start and setting the local member (#463)
8a384c0 is described below
commit 8a384c021659b658f693e2f63a4b4c6d6ad2101d
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Mon Nov 22 17:46:47 2021 +0300
IGNITE-15965 Fix a race between cluster start and setting the local member (#463)
---
modules/network/pom.xml | 10 ++++
.../scalecube/ScaleCubeClusterServiceFactory.java | 64 +++++++---------------
.../scalecube/ScaleCubeMessagingService.java | 2 +-
.../scalecube/ScaleCubeTopologyService.java | 22 ++++----
parent/pom.xml | 32 ++---------
5 files changed, 48 insertions(+), 82 deletions(-)
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index 53cbe08..c9ce704 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -59,6 +59,16 @@
<artifactId>scalecube-cluster</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.ignite</groupId>
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 8b5b4a6..7e420c6 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -23,15 +23,9 @@ import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
-import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
import org.apache.ignite.configuration.schemas.network.ClusterMembershipView;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkView;
@@ -55,22 +49,11 @@ import org.apache.ignite.network.serialization.MessageSerializationRegistry;
public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
/** {@inheritDoc} */
@Override
- public ClusterService createClusterService(
- ClusterLocalConfiguration context,
- NetworkConfiguration networkConfiguration
- ) {
- String consistentId = context.getName();
-
+ public ClusterService createClusterService(ClusterLocalConfiguration context, NetworkConfiguration networkConfiguration) {
var topologyService = new ScaleCubeTopologyService();
var messagingService = new ScaleCubeMessagingService();
- var messageFactory = new NetworkMessagesFactory();
-
- MessageSerializationRegistry registry = context.getSerializationRegistry();
-
- UUID launchId = UUID.randomUUID();
-
return new AbstractClusterService(context, topologyService, messagingService) {
private volatile ClusterImpl cluster;
@@ -79,10 +62,18 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
/** {@inheritDoc} */
@Override
public void start() {
- NetworkView networkConfigurationView = networkConfiguration.value();
+ String consistentId = context.getName();
+
+ MessageSerializationRegistry registry = context.getSerializationRegistry();
+
+ UUID launchId = UUID.randomUUID();
+
+ var messageFactory = new NetworkMessagesFactory();
- this.connectionMgr = new ConnectionManager(
- networkConfigurationView,
+ NetworkView configView = networkConfiguration.value();
+
+ connectionMgr = new ConnectionManager(
+ configView,
registry,
consistentId,
() -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
@@ -91,9 +82,9 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr, topologyService, messageFactory);
- NodeFinder finder = NodeFinderFactory.createNodeFinder(networkConfigurationView.nodeFinder());
+ NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
- this.cluster = new ClusterImpl(clusterConfig(networkConfigurationView.membership()))
+ cluster = new ClusterImpl(clusterConfig(configView.membership()))
.handler(cl -> new ClusterMessageHandler() {
/** {@inheritDoc} */
@Override
@@ -113,12 +104,16 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
// resolve cyclic dependencies
messagingService.setCluster(cluster);
+ topologyService.setCluster(cluster);
connectionMgr.start();
cluster.startAwait();
- topologyService.setLocalMember(cluster.member());
+ // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
+ var localMembershipEvent = MembershipEvent.createAdded(cluster.member(), null, System.currentTimeMillis());
+
+ topologyService.onMembershipEvent(localMembershipEvent);
}
/** {@inheritDoc} */
@@ -129,10 +124,9 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
return;
}
- stopJmxMonitor();
-
cluster.shutdown();
cluster.onShutdown().block();
+
connectionMgr.stop();
}
@@ -147,24 +141,6 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
public boolean isStopped() {
return cluster.isShutdown();
}
-
- /**
- * Removes the JMX MBean registered by the "io.scalecube.cluster.ClusterImpl#startJmxMonitor()" method.
- * Current ScaleCube implementation does not do that which leads to memory leaks.
- */
- private void stopJmxMonitor() {
- MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-
- try {
- var pattern = new ObjectName("io.scalecube.cluster", "name", cluster.member().id() + "@*");
-
- for (ObjectName name : server.queryNames(pattern, null)) {
- server.unregisterMBean(name);
- }
- } catch (MalformedObjectNameException | InstanceNotFoundException | MBeanRegistrationException ignore) {
- // No-op.
- }
- }
};
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 15d7479..12e776e 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -41,7 +41,7 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
/**
* Inner representation of a ScaleCube cluster.
*/
- private Cluster cluster;
+ private volatile Cluster cluster;
/**
* Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index c10f00b..7b6e107 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -17,6 +17,7 @@
package org.apache.ignite.network.scalecube;
+import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MembershipEvent;
import java.util.Collection;
@@ -37,22 +38,21 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
/** Logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(ScaleCubeTopologyService.class);
- /** Local member node. */
- private ClusterNode localMember;
+ /**
+ * Inner representation of a ScaleCube cluster.
+ */
+ private volatile Cluster cluster;
/** Topology members. */
private final ConcurrentMap<NetworkAddress, ClusterNode> members = new ConcurrentHashMap<>();
/**
- * Sets the ScaleCube's local {@link Member}.
+ * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
*
- * @param member Local member.
+ * @param cluster Cluster.
*/
- void setLocalMember(Member member) {
- localMember = fromMember(member);
-
- // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
- onMembershipEvent(MembershipEvent.createAdded(member, null, System.currentTimeMillis()));
+ void setCluster(Cluster cluster) {
+ this.cluster = cluster;
}
/**
@@ -122,9 +122,11 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
/** {@inheritDoc} */
@Override
public ClusterNode localMember() {
+ Member localMember = cluster.member();
+
assert localMember != null : "Cluster has not been started";
- return localMember;
+ return fromMember(localMember);
}
/** {@inheritDoc} */
diff --git a/parent/pom.xml b/parent/pom.xml
index 0949a9f..658d435 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -53,7 +53,7 @@
<gson.version>2.8.6</gson.version>
<jackson.version>2.12.3</jackson.version>
<jansi.version>1.18</jansi.version>
- <netty.version>4.1.60.Final</netty.version>
+ <netty.version>4.1.70.Final</netty.version>
<javapoet.version>1.13.0</javapoet.version>
<javax.annotation.api.version>1.3.2</javax.annotation.api.version>
<jetbrains.annotations.version>20.1.0</jetbrains.annotations.version>
@@ -68,7 +68,7 @@
<spoon.framework.version>8.4.0-beta-18</spoon.framework.version>
<typesafe.version>1.4.1</typesafe.version>
<hamcrest.version>2.2</hamcrest.version>
- <scalecube.version>2.6.6</scalecube.version>
+ <scalecube.version>2.6.12</scalecube.version>
<calcite.version>1.27.0</calcite.version>
<janino.version>3.1.4</janino.version>
<avatica.version>1.18.0</avatica.version>
@@ -587,32 +587,10 @@
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- <version>${netty.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- <version>${netty.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec</artifactId>
- <version>${netty.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
- <version>${netty.version}</version>
- </dependency>
-
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-http</artifactId>
+ <artifactId>netty-bom</artifactId>
<version>${netty.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
</dependency>
<dependency>