You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/11/22 18:07:30 UTC

[ignite-3] branch ignite-15307 updated: NettyBootstrapFactory implements IgniteComponent

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-15307
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-15307 by this push:
     new 395a57c  NettyBootstrapFactory implements IgniteComponent
395a57c is described below

commit 395a57ce7c265d191982167aacbce5bf2244a8e7
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Nov 22 21:07:22 2021 +0300

    NettyBootstrapFactory implements IgniteComponent
---
 .../ignite/network/ClusterServiceFactory.java      | 37 +++++++++++++++++
 .../network/recovery/ItRecoveryHandshakeTest.java  |  4 +-
 .../ignite/utils/ClusterServiceTestUtils.java      |  3 +-
 .../internal/network/netty/ConnectionManager.java  | 15 +++++++
 .../ignite/network/NettyBootstrapFactory.java      | 47 ++++++++++++++--------
 .../scalecube/ScaleCubeClusterServiceFactory.java  | 24 +++++++++--
 .../internal/network/netty/NettyServerTest.java    | 19 +--------
 .../org/apache/ignite/internal/app/IgniteImpl.java |  2 +-
 8 files changed, 111 insertions(+), 40 deletions(-)

diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
new file mode 100644
index 0000000..46dfb9c
--- /dev/null
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+
+/**
+ * Cluster service factory.
+ */
+public interface ClusterServiceFactory {
+    /**
+     * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
+     *
+     * @param context              Cluster context.
+     * @param networkConfiguration Network configuration.
+     * @return New cluster service.
+     */
+    ClusterService createClusterService(
+            ClusterLocalConfiguration context,
+            NetworkConfiguration networkConfiguration
+    );
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
index a65c165..f2748a4 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeAction;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
 import org.apache.ignite.network.TestMessagesFactory;
@@ -406,7 +407,8 @@ public class ItRecoveryHandshakeTest {
                 registry,
                 consistentId,
                 () -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt, messageFactory),
-                () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, messageFactory)
+                () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, messageFactory),
+                new NettyBootstrapFactory(cfg, consistentId)
         );
 
         manager.start();
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java b/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
index 59d8e49..10e6f01 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
@@ -74,8 +74,7 @@ public class ClusterServiceTestUtils {
         
         var clusterSvc = clusterSvcFactory.createClusterService(
                 ctx,
-                nodeConfigurationMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY),
-                nettyBootstrapFactory);
+                nodeConfigurationMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY));
         
         assert nodeFinder instanceof StaticNodeFinder : "Only StaticNodeFinder is supported at the moment";
         
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 0b51cd8..3cf9a35 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -57,6 +57,9 @@ public class ConnectionManager {
     /** Latest version of the direct marshalling protocol. */
     public static final byte DIRECT_PROTOCOL_VERSION = 1;
     
+    /** Bootstrap factory. */
+    private final NettyBootstrapFactory bootstrapFactory;
+    
     /** Client bootstrap. */
     private final Bootstrap clientBootstrap;
     
@@ -120,6 +123,8 @@ public class ConnectionManager {
         );
         
         this.clientBootstrap = bootstrapFactory.createClientBootstrap();
+        
+        this.bootstrapFactory = bootstrapFactory;
     }
     
     /**
@@ -318,6 +323,16 @@ public class ConnectionManager {
     }
     
     /**
+     * Returns the bootstrap factory.
+     *
+     * @return Bootstrap factory.
+     */
+    @TestOnly
+    public NettyBootstrapFactory bootstrapFactory() {
+        return bootstrapFactory;
+    }
+    
+    /**
      * Creates a {@link Bootstrap} for clients with channel options provided by a {@link OutboundView}.
      *
      * @param eventLoopGroup      Event loop group for channel handling.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 4995802..4108651 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -23,49 +23,47 @@ import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.configuration.schemas.network.InboundView;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
 import org.apache.ignite.configuration.schemas.network.OutboundView;
+import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup;
 
 /**
  * Netty bootstrap factory.
  */
-public class NettyBootstrapFactory {
+public class NettyBootstrapFactory implements IgniteComponent {
     /** Network configuration. */
     private final NetworkView networkConfiguration;
     
+    /** Prefix for event loop group names. */
+    private final String eventLoopGroupNamePrefix;
+    
     /** Server boss socket channel handler event loop group. */
-    private final EventLoopGroup bossGroup;
+    private EventLoopGroup bossGroup;
     
     /** Server work socket channel handler event loop group. */
-    private final EventLoopGroup workerGroup;
+    private EventLoopGroup workerGroup;
     
     /** Client socket channel handler event loop group. */
-    private final EventLoopGroup clientWorkerGroup;
+    private EventLoopGroup clientWorkerGroup;
     
     /**
      * Constructor.
      *
-     * @param networkConfiguration Network configuration.
-     * @param consistentId         Consistent id of this node.
+     * @param networkConfiguration     Network configuration.
+     * @param eventLoopGroupNamePrefix Prefix for event loop group names.
      */
     public NettyBootstrapFactory(
             NetworkView networkConfiguration,
-            String consistentId
+            String eventLoopGroupNamePrefix
     ) {
-        assert consistentId != null;
+        assert eventLoopGroupNamePrefix != null;
         assert networkConfiguration != null;
         
         this.networkConfiguration = networkConfiguration;
-        
-        bossGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-accept");
-        workerGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-worker");
-        clientWorkerGroup = NamedNioEventLoopGroup.create(consistentId + "-client");
-        
-        // TODO: When do we stop everything?
-        // TODO: IGNITE-14538 quietPeriod and timeout should be configurable.
-        // clientWorkerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS).sync();
+        this.eventLoopGroupNamePrefix = eventLoopGroupNamePrefix;
     }
     
     /**
@@ -131,4 +129,21 @@ public class NettyBootstrapFactory {
                 
         return serverBootstrap;
     }
+    
+    /** {@inheritDoc} */
+    @Override
+    public void start() {
+        bossGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-srv-accept");
+        workerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-srv-worker");
+        clientWorkerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-client");
+    }
+    
+    /** {@inheritDoc} */
+    @Override
+    public void stop() throws Exception {
+        // TODO: IGNITE-14538 quietPeriod and timeout should be configurable.
+        clientWorkerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS).sync();
+        workerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS).sync();
+        bossGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS).sync();
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 65dbaff..32f7fb5 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManage
 import org.apache.ignite.network.AbstractClusterService;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeFinder;
@@ -44,10 +45,27 @@ import org.apache.ignite.network.NodeFinderFactory;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 
 /**
- * ScaleCube cluster service factory for messaging and topology services.
+ * {@link ClusterServiceFactory} implementation that uses ScaleCube for messaging and topology services.
  */
-public class ScaleCubeClusterServiceFactory {
+public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
     /** {@inheritDoc} */
+    @Override
+    public ClusterService createClusterService(ClusterLocalConfiguration context, NetworkConfiguration networkConfiguration) {
+        final String consistentId = context.getName();
+    
+        var nettyBootstrapFactory = new NettyBootstrapFactory(networkConfiguration.value(), consistentId);
+        
+        return createClusterService(context, networkConfiguration, nettyBootstrapFactory);
+    }
+    
+    /**
+     * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
+     *
+     * @param context               Cluster context.
+     * @param networkConfiguration  Network configuration.
+     * @param nettyBootstrapFactory Bootstrap factory.
+     * @return New cluster service.
+     */
     public ClusterService createClusterService(
             ClusterLocalConfiguration context,
             NetworkConfiguration networkConfiguration,
@@ -56,7 +74,7 @@ public class ScaleCubeClusterServiceFactory {
         var topologyService = new ScaleCubeTopologyService();
 
         var messagingService = new ScaleCubeMessagingService();
-
+    
         return new AbstractClusterService(context, topologyService, messagingService) {
             private volatile ClusterImpl cluster;
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index 46d1232..a799f3d 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -104,9 +104,6 @@ public class NettyServerTest {
         server = getServer(channel.newSucceededFuture(), true);
         
         server.stop().join();
-        
-        assertTrue(server.getBossGroup().isTerminated());
-        assertTrue(server.getWorkerGroup().isTerminated());
     }
     
     /**
@@ -119,9 +116,6 @@ public class NettyServerTest {
         var channel = new EmbeddedServerChannel();
         
         server = getServer(channel.newFailedFuture(new ClosedChannelException()), false);
-        
-        assertTrue(server.getBossGroup().isTerminated());
-        assertTrue(server.getWorkerGroup().isTerminated());
     }
     
     /**
@@ -136,9 +130,6 @@ public class NettyServerTest {
         server = getServer(channel.newSucceededFuture(), true);
         
         channel.close();
-        
-        assertTrue(server.getBossGroup().isShuttingDown());
-        assertTrue(server.getWorkerGroup().isShuttingDown());
     }
     
     /**
@@ -159,9 +150,6 @@ public class NettyServerTest {
         future.setSuccess(null);
         
         stop.get(3, TimeUnit.SECONDS);
-        
-        assertTrue(server.getBossGroup().isTerminated());
-        assertTrue(server.getWorkerGroup().isTerminated());
     }
     
     /**
@@ -222,8 +210,7 @@ public class NettyServerTest {
                 (socketAddress, message) -> {
                 },
                 registry,
-                NamedNioEventLoopGroup.create("boss-"),
-                NamedNioEventLoopGroup.create("worker-")
+                null
         );
         
         server.start().get(3, TimeUnit.SECONDS);
@@ -291,9 +278,7 @@ public class NettyServerTest {
                 () -> mock(HandshakeManager.class),
                 null,
                 null,
-                null,
-                NamedNioEventLoopGroup.create("boss-"),
-                NamedNioEventLoopGroup.create("worker-")
+                null
         );
         
         try {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a31356a..44cd526 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -339,7 +339,7 @@ public class IgniteImpl implements Ignite {
 
         if (explicitStop.get()) {
             doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, metaStorageMgr, clusterCfgMgr, baselineMgr,
-                    distributedTblMgr, qryEngine, restModule, clientHandlerModule));
+                    distributedTblMgr, qryEngine, restModule, clientHandlerModule, nettyBootstrapFactory));
         }
     }