You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/11/22 18:07:30 UTC
[ignite-3] branch ignite-15307 updated: NettyBootstrapFactory implements IgniteComponent
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch ignite-15307
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-15307 by this push:
new 395a57c NettyBootstrapFactory implements IgniteComponent
395a57c is described below
commit 395a57ce7c265d191982167aacbce5bf2244a8e7
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Nov 22 21:07:22 2021 +0300
NettyBootstrapFactory implements IgniteComponent
---
.../ignite/network/ClusterServiceFactory.java | 37 +++++++++++++++++
.../network/recovery/ItRecoveryHandshakeTest.java | 4 +-
.../ignite/utils/ClusterServiceTestUtils.java | 3 +-
.../internal/network/netty/ConnectionManager.java | 15 +++++++
.../ignite/network/NettyBootstrapFactory.java | 47 ++++++++++++++--------
.../scalecube/ScaleCubeClusterServiceFactory.java | 24 +++++++++--
.../internal/network/netty/NettyServerTest.java | 19 +--------
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
8 files changed, 111 insertions(+), 40 deletions(-)
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
new file mode 100644
index 0000000..46dfb9c
--- /dev/null
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+
+/**
+ * Cluster service factory.
+ */
+public interface ClusterServiceFactory {
+ /**
+ * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
+ *
+ * @param context Cluster context.
+ * @param networkConfiguration Network configuration.
+ * @return New cluster service.
+ */
+ ClusterService createClusterService(
+ ClusterLocalConfiguration context,
+ NetworkConfiguration networkConfiguration
+ );
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
index a65c165..f2748a4 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.handshake.HandshakeAction;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessagesFactory;
@@ -406,7 +407,8 @@ public class ItRecoveryHandshakeTest {
registry,
consistentId,
() -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt, messageFactory),
- () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, messageFactory)
+ () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, messageFactory),
+ new NettyBootstrapFactory(cfg, consistentId)
);
manager.start();
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java b/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
index 59d8e49..10e6f01 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
@@ -74,8 +74,7 @@ public class ClusterServiceTestUtils {
var clusterSvc = clusterSvcFactory.createClusterService(
ctx,
- nodeConfigurationMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY),
- nettyBootstrapFactory);
+ nodeConfigurationMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY));
assert nodeFinder instanceof StaticNodeFinder : "Only StaticNodeFinder is supported at the moment";
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 0b51cd8..3cf9a35 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -57,6 +57,9 @@ public class ConnectionManager {
/** Latest version of the direct marshalling protocol. */
public static final byte DIRECT_PROTOCOL_VERSION = 1;
+ /** Bootstrap factory. */
+ private final NettyBootstrapFactory bootstrapFactory;
+
/** Client bootstrap. */
private final Bootstrap clientBootstrap;
@@ -120,6 +123,8 @@ public class ConnectionManager {
);
this.clientBootstrap = bootstrapFactory.createClientBootstrap();
+
+ this.bootstrapFactory = bootstrapFactory;
}
/**
@@ -318,6 +323,16 @@ public class ConnectionManager {
}
/**
+ * Returns the bootstrap factory.
+ *
+ * @return Bootstrap factory.
+ */
+ @TestOnly
+ public NettyBootstrapFactory bootstrapFactory() {
+ return bootstrapFactory;
+ }
+
+ /**
* Creates a {@link Bootstrap} for clients with channel options provided by a {@link OutboundView}.
*
* @param eventLoopGroup Event loop group for channel handling.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 4995802..4108651 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -23,49 +23,47 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.schemas.network.InboundView;
import org.apache.ignite.configuration.schemas.network.NetworkView;
import org.apache.ignite.configuration.schemas.network.OutboundView;
+import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup;
/**
* Netty bootstrap factory.
*/
-public class NettyBootstrapFactory {
+public class NettyBootstrapFactory implements IgniteComponent {
/** Network configuration. */
private final NetworkView networkConfiguration;
+ /** Prefix for event loop group names. */
+ private final String eventLoopGroupNamePrefix;
+
/** Server boss socket channel handler event loop group. */
- private final EventLoopGroup bossGroup;
+ private EventLoopGroup bossGroup;
/** Server work socket channel handler event loop group. */
- private final EventLoopGroup workerGroup;
+ private EventLoopGroup workerGroup;
/** Client socket channel handler event loop group. */
- private final EventLoopGroup clientWorkerGroup;
+ private EventLoopGroup clientWorkerGroup;
/**
* Constructor.
*
- * @param networkConfiguration Network configuration.
- * @param consistentId Consistent id of this node.
+ * @param networkConfiguration Network configuration.
+ * @param eventLoopGroupNamePrefix Prefix for event loop group names.
*/
public NettyBootstrapFactory(
NetworkView networkConfiguration,
- String consistentId
+ String eventLoopGroupNamePrefix
) {
- assert consistentId != null;
+ assert eventLoopGroupNamePrefix != null;
assert networkConfiguration != null;
this.networkConfiguration = networkConfiguration;
-
- bossGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-accept");
- workerGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-worker");
- clientWorkerGroup = NamedNioEventLoopGroup.create(consistentId + "-client");
-
- // TODO: When do we stop everything?
- // TODO: IGNITE-14538 quietPeriod and timeout should be configurable.
- // clientWorkerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS).sync();
+ this.eventLoopGroupNamePrefix = eventLoopGroupNamePrefix;
}
/**
@@ -131,4 +129,21 @@ public class NettyBootstrapFactory {
return serverBootstrap;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public void start() {
+ bossGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-srv-accept");
+ workerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-srv-worker");
+ clientWorkerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-client");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void stop() throws Exception {
+ // TODO: IGNITE-14538 quietPeriod and timeout should be configurable.
+ clientWorkerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS).sync();
+ workerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS).sync();
+ bossGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS).sync();
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 65dbaff..32f7fb5 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManage
import org.apache.ignite.network.AbstractClusterService;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
@@ -44,10 +45,27 @@ import org.apache.ignite.network.NodeFinderFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
/**
- * ScaleCube cluster service factory for messaging and topology services.
+ * {@link ClusterServiceFactory} implementation that uses ScaleCube for messaging and topology services.
*/
-public class ScaleCubeClusterServiceFactory {
+public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
/** {@inheritDoc} */
+ @Override
+ public ClusterService createClusterService(ClusterLocalConfiguration context, NetworkConfiguration networkConfiguration) {
+ final String consistentId = context.getName();
+
+ var nettyBootstrapFactory = new NettyBootstrapFactory(networkConfiguration.value(), consistentId);
+
+ return createClusterService(context, networkConfiguration, nettyBootstrapFactory);
+ }
+
+ /**
+ * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
+ *
+ * @param context Cluster context.
+ * @param networkConfiguration Network configuration.
+ * @param nettyBootstrapFactory Bootstrap factory.
+ * @return New cluster service.
+ */
public ClusterService createClusterService(
ClusterLocalConfiguration context,
NetworkConfiguration networkConfiguration,
@@ -56,7 +74,7 @@ public class ScaleCubeClusterServiceFactory {
var topologyService = new ScaleCubeTopologyService();
var messagingService = new ScaleCubeMessagingService();
-
+
return new AbstractClusterService(context, topologyService, messagingService) {
private volatile ClusterImpl cluster;
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index 46d1232..a799f3d 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -104,9 +104,6 @@ public class NettyServerTest {
server = getServer(channel.newSucceededFuture(), true);
server.stop().join();
-
- assertTrue(server.getBossGroup().isTerminated());
- assertTrue(server.getWorkerGroup().isTerminated());
}
/**
@@ -119,9 +116,6 @@ public class NettyServerTest {
var channel = new EmbeddedServerChannel();
server = getServer(channel.newFailedFuture(new ClosedChannelException()), false);
-
- assertTrue(server.getBossGroup().isTerminated());
- assertTrue(server.getWorkerGroup().isTerminated());
}
/**
@@ -136,9 +130,6 @@ public class NettyServerTest {
server = getServer(channel.newSucceededFuture(), true);
channel.close();
-
- assertTrue(server.getBossGroup().isShuttingDown());
- assertTrue(server.getWorkerGroup().isShuttingDown());
}
/**
@@ -159,9 +150,6 @@ public class NettyServerTest {
future.setSuccess(null);
stop.get(3, TimeUnit.SECONDS);
-
- assertTrue(server.getBossGroup().isTerminated());
- assertTrue(server.getWorkerGroup().isTerminated());
}
/**
@@ -222,8 +210,7 @@ public class NettyServerTest {
(socketAddress, message) -> {
},
registry,
- NamedNioEventLoopGroup.create("boss-"),
- NamedNioEventLoopGroup.create("worker-")
+ null
);
server.start().get(3, TimeUnit.SECONDS);
@@ -291,9 +278,7 @@ public class NettyServerTest {
() -> mock(HandshakeManager.class),
null,
null,
- null,
- NamedNioEventLoopGroup.create("boss-"),
- NamedNioEventLoopGroup.create("worker-")
+ null
);
try {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a31356a..44cd526 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -339,7 +339,7 @@ public class IgniteImpl implements Ignite {
if (explicitStop.get()) {
doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, metaStorageMgr, clusterCfgMgr, baselineMgr,
- distributedTblMgr, qryEngine, restModule, clientHandlerModule));
+ distributedTblMgr, qryEngine, restModule, clientHandlerModule, nettyBootstrapFactory));
}
}