You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/11/23 12:58:03 UTC

[GitHub] [ignite-3] sashapolo commented on a change in pull request #466: IGNITE-15307 Thin 3.0: Reuse Netty infrastructure from network module

sashapolo commented on a change in pull request #466:
URL: https://github.com/apache/ignite-3/pull/466#discussion_r754988097



##########
File path: modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.configuration.schemas.network.InboundView;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.configuration.schemas.network.OutboundView;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup;
+
+/**
+ * Netty bootstrap factory.
+ */
+public class NettyBootstrapFactory implements IgniteComponent {
+    /** Network configuration. */
+    private final NetworkConfiguration networkConfiguration;
+    
+    /** Prefix for event loop group names. */
+    private final String eventLoopGroupNamePrefix;
+    
+    /** Server boss socket channel handler event loop group. */
+    private EventLoopGroup bossGroup;
+    
+    /** Server work socket channel handler event loop group. */
+    private EventLoopGroup workerGroup;
+    
+    /** Client socket channel handler event loop group. */
+    private EventLoopGroup clientWorkerGroup;
+    
+    /**
+     * Constructor.
+     *
+     * @param networkConfiguration     Network configuration.
+     * @param eventLoopGroupNamePrefix Prefix for event loop group names.
+     */
+    public NettyBootstrapFactory(
+            NetworkConfiguration networkConfiguration,

Review comment:
       I would suggest using `NetworkView` instead of `NetworkConfiguration`

##########
File path: modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
##########
@@ -123,36 +129,29 @@ private ChannelFuture startEndpoint() throws InterruptedException {
 
         int port = 0;
         Channel ch = null;
-
-        // TODO: Reuse Netty infrastructure from network module IGNITE-15307.
-        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        ServerBootstrap b = new ServerBootstrap();
-
-        b.group(eventLoopGroup)
-                .channel(NioServerSocketChannel.class)
-                .childHandler(new ChannelInitializer<>() {
+    
+        var bootstrap = bootstrapFactory.createServerBootstrap();
+    
+        bootstrap.childHandler(new ChannelInitializer<>() {
                     @Override
                     protected void initChannel(Channel ch) {
                         ch.pipeline().addLast(
                                 new ClientMessageDecoder(),
                                 new ClientInboundMessageHandler(igniteTables, processor));
                     }
                 })
-                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectTimeout())
-                .childOption(ChannelOption.SO_KEEPALIVE, true)
-                .childOption(ChannelOption.TCP_NODELAY, true);
-
+                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectTimeout());
+    
         for (int portCandidate = desiredPort; portCandidate <= desiredPort + portRange; portCandidate++) {
-            ChannelFuture bindRes = b.bind(portCandidate).await();
+            ChannelFuture bindRes = bootstrap.bind(portCandidate).await();
 
             if (bindRes.isSuccess()) {
                 ch = bindRes.channel();
-                ch.closeFuture().addListener((ChannelFutureListener) fut -> eventLoopGroup.shutdownGracefully());
+                ch.closeFuture();

Review comment:
       looks like this statement does nothing

##########
File path: modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
##########
@@ -98,12 +95,11 @@ public void testOperationFailsWhenAllServersFail() throws Exception {
         }
     }
 
-    private void stop(IgniteBiTuple<ClientHandlerModule, ConfigurationRegistry> srv) throws Exception {
+    private void stop(AutoCloseable srv) throws Exception {

Review comment:
       and this method can be replaced with `IgniteUtils.closeAll`

##########
File path: modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
##########
@@ -162,13 +166,20 @@
                 List.of(),
                 List.of()
         );
+    
+        NetworkConfiguration networkConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);
+        
+        ClusterLocalConfiguration clusterLocalConfiguration = new ClusterLocalConfiguration(

Review comment:
       can be written shorter:
   ```
   var clusterLocalConfiguration = new ClusterLocalConfiguration(name, new MessageSerializationRegistryImpl());
   ```

##########
File path: modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
##########
@@ -123,36 +129,29 @@ private ChannelFuture startEndpoint() throws InterruptedException {
 
         int port = 0;
         Channel ch = null;
-
-        // TODO: Reuse Netty infrastructure from network module IGNITE-15307.
-        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-        ServerBootstrap b = new ServerBootstrap();
-
-        b.group(eventLoopGroup)
-                .channel(NioServerSocketChannel.class)
-                .childHandler(new ChannelInitializer<>() {
+    
+        var bootstrap = bootstrapFactory.createServerBootstrap();

Review comment:
       we have an agreement that using `var` in such cases is illegal

##########
File path: modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.configuration.schemas.network.InboundView;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.configuration.schemas.network.OutboundView;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.network.netty.NamedNioEventLoopGroup;
+
+/**
+ * Netty bootstrap factory.

Review comment:
       I would also add a comment on what is the purpose of this class

##########
File path: modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
##########
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.network;
-
-import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
-
-/**
- * Cluster service factory.
- */
-public interface ClusterServiceFactory {

Review comment:
       Sorry, but I don't agree with this change for multiple reasons:
   1. It's not related to the original problem.
   2. I don't think that it does not encapsulate anything: it encapsulates a particular ServiceFactory implementation (which is the main point of the Factory pattern).
   3. The code looks cleaner with it.

##########
File path: modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
##########
@@ -331,47 +239,18 @@ public SocketAddress address() {
                     channel.close();
                 }
                 
-                return serverCloseFuture;
+                return serverCloseFuture == null ? CompletableFuture.<Void>completedFuture(null) : serverCloseFuture;
             }).thenCompose(Function.identity());
         }
     }
     
-    /**
-     * Shutdown event loops.
-     */
-    private void shutdownEventLoopGroups() {
-        // TODO: IGNITE-14538 quietPeriod and timeout should be configurable.
-        bossGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS);
-        workerGroup.shutdownGracefully(0L, 15, TimeUnit.SECONDS);
-    }
-    
     /**
      * Returns {@code true} if the server is running, {@code false} otherwise.
      *
      * @return {@code true} if the server is running, {@code false} otherwise.
      */
     @TestOnly
     public boolean isRunning() {
-        return channel != null && channel.isOpen() && !bossGroup.isShuttingDown() && !workerGroup.isShuttingDown();
-    }
-    
-    /**
-     * Returns acceptor event loop group.
-     *
-     * @return Acceptor event loop group.
-     */
-    @TestOnly
-    public NioEventLoopGroup getBossGroup() {
-        return bossGroup;
-    }
-    
-    /**
-     * Returns worker event loop group.
-     *
-     * @return Worker event loop group.
-     */
-    @TestOnly
-    public NioEventLoopGroup getWorkerGroup() {
-        return workerGroup;
+        return channel != null && channel.isOpen();

Review comment:
       `channel` is `volatile` field, I would suggest reading into a local variable first. Otherwise this method is not thread-safe

##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
##########
@@ -117,14 +122,21 @@ public void start() {
                                         )
                                 )
                 ).join();
+    
+                bootstrapFactory.start();
                 
                 clusterSvc.start();
             }
             
             @Override
             public void stop() {
-                clusterSvc.stop();
-                nodeConfigurationMgr.stop();
+                try {
+                    clusterSvc.stop();
+                    bootstrapFactory.stop();
+                    nodeConfigurationMgr.stop();
+                } catch (Exception e) {
+                    throw new Error(e);

Review comment:
       throwing `Error` is a bit too harsh, let's change it to `RuntimeException`

##########
File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
##########
@@ -37,23 +37,33 @@
 import org.apache.ignite.network.AbstractClusterService;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NodeFinder;
 import org.apache.ignite.network.NodeFinderFactory;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 
 /**
- * {@link ClusterServiceFactory} implementation that uses ScaleCube for messaging and topology services.
+ * Cluster service factory that uses ScaleCube for messaging and topology services.
  */
-public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
-    /** {@inheritDoc} */
-    @Override
-    public ClusterService createClusterService(ClusterLocalConfiguration context, NetworkConfiguration networkConfiguration) {
+public class ScaleCubeClusterServiceFactory {
+    /**
+     * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
+     *
+     * @param context               Cluster context.
+     * @param networkConfiguration  Network configuration.
+     * @param nettyBootstrapFactory Bootstrap factory.
+     * @return New cluster service.
+     */
+    public ClusterService createClusterService(
+            ClusterLocalConfiguration context,
+            NetworkConfiguration networkConfiguration,
+            NettyBootstrapFactory nettyBootstrapFactory
+    ) {
         var topologyService = new ScaleCubeTopologyService();
 
         var messagingService = new ScaleCubeMessagingService();
-
+    

Review comment:
       please check your IDEA whitespace settings. See this Slack thread: https://gridgain.slack.com/archives/C01AC70AY9M/p1636731017188900

##########
File path: modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
##########
@@ -56,12 +57,12 @@
     /** Latest version of the direct marshalling protocol. */
     public static final byte DIRECT_PROTOCOL_VERSION = 1;
     
+    /** Bootstrap factory. */
+    private final NettyBootstrapFactory bootstrapFactory;

Review comment:
       I can see that this field is only needed for a method only used in tests. And it is only needed in tests to stop the factory. I think we can remove this field by rewriting the test code a little bit.

##########
File path: modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
##########
@@ -98,12 +95,11 @@ public void testOperationFailsWhenAllServersFail() throws Exception {
         }
     }
 
-    private void stop(IgniteBiTuple<ClientHandlerModule, ConfigurationRegistry> srv) throws Exception {
+    private void stop(AutoCloseable srv) throws Exception {

Review comment:
       I would also suggest to use `AfterEach` to stop the started servers

##########
File path: modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
##########
@@ -331,47 +239,18 @@ public SocketAddress address() {
                     channel.close();
                 }
                 
-                return serverCloseFuture;
+                return serverCloseFuture == null ? CompletableFuture.<Void>completedFuture(null) : serverCloseFuture;

Review comment:
       please mark `serverCloseFuture` as `@Nullable`

##########
File path: modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
##########
@@ -331,47 +239,18 @@ public SocketAddress address() {
                     channel.close();
                 }
                 
-                return serverCloseFuture;
+                return serverCloseFuture == null ? CompletableFuture.<Void>completedFuture(null) : serverCloseFuture;

Review comment:
       also, looks like there's a possible race condition here, since `serverCloseFuture` may not be read under the `startStopLock`. Consider extracting the read into a local variable:
   ```
   var local = serverCloseFuture;
   
   return serverStartFuture.handle((unused, throwable) -> {
       if (channel != null) {
           channel.close();
       }
       
       return local == null ? CompletableFuture.<Void>completedFuture(null) : local;
   }).thenCompose(Function.identity());
   ```            




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org