Posted to commits@ignite.apache.org by ib...@apache.org on 2021/11/01 10:04:59 UTC

[ignite-3] branch main updated: IGNITE-15477 Network module readme. Netty thread naming improvement. (#406)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7050546  IGNITE-15477 Network module readme. Netty thread naming improvement. (#406)
7050546 is described below

commit 70505463dcd35d3fe212573257fa108b4699ee41
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Mon Nov 1 13:04:54 2021 +0300

    IGNITE-15477 Network module readme. Netty thread naming improvement. (#406)
---
 modules/network/README.md                          |  48 ++++++++++++++
 modules/network/docs/network-flow.png              | Bin 0 -> 67254 bytes
 modules/network/docs/network-flow.puml             |  72 +++++++++++++++++++++
 modules/network/docs/threading-2.png               | Bin 0 -> 45474 bytes
 modules/network/docs/threading-2.puml              |  43 ++++++++++++
 modules/network/docs/threading.png                 | Bin 0 -> 32334 bytes
 modules/network/docs/threading.puml                |  39 +++++++++++
 .../internal/network/netty/ConnectionManager.java  |   5 +-
 .../network/netty/NamedNioEventLoopGroup.java      |  57 ++++++++++++++++
 .../ignite/internal/network/netty/NettyServer.java |  29 +++++++--
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   1 -
 .../scalecube/ScaleCubeMessagingService.java       |   2 +-
 .../internal/network/netty/NettyServerTest.java    |   2 +
 13 files changed, 287 insertions(+), 11 deletions(-)

diff --git a/modules/network/README.md b/modules/network/README.md
index f2e6483..4aac91b 100644
--- a/modules/network/README.md
+++ b/modules/network/README.md
@@ -1 +1,49 @@
 This module provides implementations for the Network API module.
+
+## Brief overview
+Ignite uses ScaleCube to form a network cluster and exchange messages.
+
+## Message serialization
+Ignite uses direct marshalling to serialize and deserialize messages.
+For every `@Transferable` message interface `ignite-network-annotation-processor` generates
+an implementation for the message interface, a serializer and a deserializer.
+Supported types:
+ + All primitives
+ + Other `@Transferable` objects
+ + `java.lang.String`
+ + `java.util.UUID`
+ + `org.apache.ignite.lang.IgniteUuid`
+ + `java.util.BitSet`
+ + `java.util.Collection<V>` where `V` can be any supported type
+ + `java.util.Map<K, V>` where `K` and `V` can be any supported type
+ + Arrays of all supported types
+
+## Threading
+Every Ignite node has three network thread pool executors, each with its own thread naming format:
++ Client worker - handles channel events on a client (`{consistentId}-client-X`)
++ Server boss - accepts incoming connections (`{consistentId}-srv-accept-X`)
++ Server worker - handles channel events on a server (`{consistentId}-srv-worker-X`),
+where `X` is the index of the thread in a pool.
+
+Messages are then passed on to message listeners of the ConnectionManager.   
+In case of ClusterService over ScaleCube (see `ScaleCubeClusterServiceFactory`),
+messages are passed down to the ClusterService via Project Reactor's Sink, which enforces a strict order of message handling:
+a new message can't be received by ClusterService until a previous message is **handled** (see [message handling](#message-handling)).
+![Threading](docs/threading.png)
+Message handling can also be offloaded to another thread:
+![Threading](docs/threading-2.png)
+Note that in this case the network message would be considered **handled** before it is processed  
+by another thread.
+
+ScaleCube uses the `sc-cluster-X` scheduler (where `X` is the address of the node, e.g. `localhost-3344`) for the failure 
+detector, gossip protocol, metadata store and the membership protocol.
+
+
+## Message handling
+Message handlers are called in the order they were added.  
+A message is considered **handled** after all the message handlers have been invoked.
+
+## Message flow example
+There are two nodes, Alice and Bob.
+A user sends a message from Alice to Bob from any thread.
+![Network flow between two nodes](docs/network-flow.png)
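
Editorial illustration (not part of the commit): the "Message handling" rules in the README above can be sketched with plain JDK types. `HandlerOrderSketch`, `addHandler` and `onMessage` are hypothetical names chosen for this sketch and are not the Ignite API; the only assumptions taken from the README are that handlers run in the order they were added and that a message counts as handled once every handler has been invoked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical dispatcher, for illustration only (not an Ignite class).
final class HandlerOrderSketch {
    /** Handlers, kept in registration order. */
    private final List<Consumer<String>> handlers = new CopyOnWriteArrayList<>();

    /** Registers a handler; it will run after all previously added handlers. */
    void addHandler(Consumer<String> handler) {
        handlers.add(handler);
    }

    /** Invokes every handler in order; the message is "handled" when this returns. */
    void onMessage(String msg) {
        for (Consumer<String> handler : handlers) {
            handler.accept(msg);
        }
    }

    public static void main(String[] args) {
        HandlerOrderSketch dispatcher = new HandlerOrderSketch();
        List<String> log = new ArrayList<>();

        dispatcher.addHandler(m -> log.add("first:" + m));
        dispatcher.addHandler(m -> log.add("second:" + m));
        dispatcher.onMessage("ping");

        System.out.println(log); // prints [first:ping, second:ping]
    }
}
```

A handler that only submits work to another executor returns immediately, which is why the README notes that an offloaded message is considered handled before that other thread has processed it.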
diff --git a/modules/network/docs/network-flow.png b/modules/network/docs/network-flow.png
new file mode 100644
index 0000000..f937b3a
Binary files /dev/null and b/modules/network/docs/network-flow.png differ
diff --git a/modules/network/docs/network-flow.puml b/modules/network/docs/network-flow.puml
new file mode 100644
index 0000000..277ea02
--- /dev/null
+++ b/modules/network/docs/network-flow.puml
@@ -0,0 +1,72 @@
+/'
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+'/
+
+@startuml
+'https://plantuml.com/sequence-diagram
+
+autonumber
+actor Alice as alice
+participant MessagingService as aliceMessageService
+participant "ScaleCube inbound \n DirectProcessor (Alice)" as aliceInbound
+participant "Executor \n alice-client-worker" as aliceWorker
+entity Socket as sock
+participant "Executor \n bob-srv-worker" as bobWorker
+participant "ScaleCube inbound \n DirectProcessor (Bob)" as bobInbound
+actor Bob as bob
+
+alice -> aliceMessageService : Message
+
+aliceMessageService -> aliceWorker : Submit to netty \n worker thread
+activate aliceWorker #Green
+
+aliceWorker -> sock : Write and flush \n message data to socket
+deactivate aliceWorker
+
+sock -> bobWorker : Read from socket
+activate bobWorker #Green
+
+bobWorker --> bobInbound : Previous messages
+bobWorker -> bobInbound
+deactivate bobWorker
+activate bobInbound #Green
+
+bobInbound --> bob : Previous messages
+
+bobInbound -> bob : Message from Alice
+deactivate bobInbound
+
+bob -> bobWorker : Response from Bob
+activate bobWorker #Green
+
+bobWorker -> sock : Write and flush \n message data to socket
+deactivate bobWorker
+
+sock -> aliceWorker : Read from socket
+activate aliceWorker #Green
+
+aliceWorker --> aliceInbound : Previous messages
+aliceWorker -> aliceInbound : Response from Bob
+deactivate aliceWorker
+activate aliceInbound #Green
+
+aliceInbound --> aliceMessageService : Previous messages
+aliceInbound -> aliceMessageService : Response from Bob
+deactivate aliceInbound
+
+aliceMessageService -> alice : Response from Bob
+
+@enduml
diff --git a/modules/network/docs/threading-2.png b/modules/network/docs/threading-2.png
new file mode 100644
index 0000000..c4acce0
Binary files /dev/null and b/modules/network/docs/threading-2.png differ
diff --git a/modules/network/docs/threading-2.puml b/modules/network/docs/threading-2.puml
new file mode 100644
index 0000000..b68046f
--- /dev/null
+++ b/modules/network/docs/threading-2.puml
@@ -0,0 +1,43 @@
+/'
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+'/
+
+@startuml
+'https://plantuml.com/sequence-diagram
+
+autonumber
+
+actor "Alice" as alice
+participant "alice-client-worker-3" as aliceWorker1
+participant "alice-client-worker-4" as aliceWorker2
+entity "Socket" as socket
+participant "bob-srv-worker-2" as bobWorker1
+participant "bob-srv-worker-5" as bobWorker2
+participant "Bob's request handler \n(e.g. disruptor-stripe)" as bobHandler
+actor Bob as bob
+
+alice -> aliceWorker1 : Request from Alice \nwithin a user thread
+aliceWorker1 -> socket : Request from Alice
+socket -> bobWorker1 : Request from Alice
+bobWorker1 -> bobHandler : Request from Alice
+bobHandler -> bob : Request from Alice
+
+bob -> bobWorker2 : Response from Bob
+bobWorker2 -> socket : Response from Bob
+socket -> aliceWorker2 : Response from Bob
+aliceWorker2 -> alice : Response from Bob
+
+@enduml
diff --git a/modules/network/docs/threading.png b/modules/network/docs/threading.png
new file mode 100644
index 0000000..666567d
Binary files /dev/null and b/modules/network/docs/threading.png differ
diff --git a/modules/network/docs/threading.puml b/modules/network/docs/threading.puml
new file mode 100644
index 0000000..873d868
--- /dev/null
+++ b/modules/network/docs/threading.puml
@@ -0,0 +1,39 @@
+/'
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+'/
+
+@startuml
+'https://plantuml.com/sequence-diagram
+
+autonumber
+
+actor "Alice" as alice
+participant "alice-client-worker-3" as aliceWorker1
+participant "alice-client-worker-4" as aliceWorker2
+entity "Socket" as socket
+participant "bob-srv-worker-2" as bobWorker1
+actor Bob as bob
+
+alice -> aliceWorker1 : Request from Alice \nwithin a user thread
+aliceWorker1 -> socket : Request from Alice
+socket -> bobWorker1 : Request from Alice
+bobWorker1 -> bob : Request from Alice
+
+bob -> socket : Response from Bob
+socket -> aliceWorker2 : Response from Bob
+aliceWorker2 -> alice : Response from Bob
+
+@enduml
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 8d84b08..8125895 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -35,7 +35,6 @@ import java.util.stream.Stream;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
 import org.apache.ignite.configuration.schemas.network.OutboundView;
@@ -61,7 +60,7 @@ public class ConnectionManager {
     private final Bootstrap clientBootstrap;
 
     /** Client socket channel handler event loop group. */
-    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();
+    private final EventLoopGroup clientWorkerGroup;
 
     /** Server. */
     private final NettyServer server;
@@ -110,12 +109,14 @@ public class ConnectionManager {
         this.consistentId = consistentId;
         this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
         this.server = new NettyServer(
+            consistentId,
             networkConfiguration,
             serverHandshakeManagerFactory,
             this::onNewIncomingChannel,
             this::onMessage,
             serializationRegistry
         );
+        this.clientWorkerGroup = NamedNioEventLoopGroup.create(consistentId + "-client");
         this.clientBootstrap = createClientBootstrap(clientWorkerGroup, networkConfiguration.outbound());
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java
new file mode 100644
index 0000000..8b13f2b
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NamedNioEventLoopGroup.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.concurrent.FastThreadLocalThread;
+
+/**
+ * Named Netty event loop group.
+ */
+public class NamedNioEventLoopGroup extends NioEventLoopGroup {
+    /**
+     * Constructor.
+     *
+     * @param threadFactory Thread factory.
+     */
+    private NamedNioEventLoopGroup(ThreadFactory threadFactory) {
+        super(threadFactory);
+    }
+
+    /**
+     * Creates event loop.
+     *
+     * @param namePrefix Thread name prefix.
+     * @return Event loop.
+     */
+    public static NioEventLoopGroup create(String namePrefix) {
+        var factory = new DefaultThreadFactory(namePrefix, Thread.MAX_PRIORITY) {
+            /** Thread index. */
+            private final AtomicInteger nextId = new AtomicInteger();
+
+            /** {@inheritDoc} */
+            @Override protected Thread newThread(Runnable r, String unused) {
+                return new FastThreadLocalThread(threadGroup, r, namePrefix + '-' + nextId.incrementAndGet());
+            }
+        };
+        return new NamedNioEventLoopGroup(factory);
+    }
+}
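
Editorial illustration (not part of the commit): the `{prefix}-X` naming scheme used by `NamedNioEventLoopGroup.create` can be reproduced without Netty. The sketch below is a JDK-only stand-in, not the committed implementation: `NamedThreadFactorySketch` and `named` are hypothetical names, and a plain `Thread` replaces Netty's `FastThreadLocalThread`. As in the committed code, the counter is incremented before use, so the first thread gets index 1.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only analogue of the naming logic, for illustration only.
final class NamedThreadFactorySketch {
    /** Returns a factory whose threads are named "{namePrefix}-{index}", index starting at 1. */
    static ThreadFactory named(String namePrefix) {
        AtomicInteger nextId = new AtomicInteger();

        return r -> new Thread(r, namePrefix + '-' + nextId.incrementAndGet());
    }

    public static void main(String[] args) throws Exception {
        // "node1" stands in for a consistent id; "-client" mirrors the suffix used in ConnectionManager.
        ExecutorService pool = Executors.newSingleThreadExecutor(named("node1" + "-client"));

        try {
            String name = pool.submit(() -> Thread.currentThread().getName()).get();

            System.out.println(name); // prints node1-client-1
        }
        finally {
            pool.shutdown();
        }
    }
}
```

Thread names built this way make it possible to tell the nodes' pools apart in thread dumps when several nodes run in one JVM, for example in tests.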
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index eaccfb5..f633dea 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -55,10 +55,10 @@ public class NettyServer {
     private final ServerBootstrap bootstrap;
 
     /** Socket accepter event loop group. */
-    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+    private final NioEventLoopGroup bossGroup;
 
     /** Socket handler event loop group. */
-    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+    private final NioEventLoopGroup workerGroup;
 
     /** Server socket configuration. */
     private final NetworkView configuration;
@@ -80,10 +80,7 @@ public class NettyServer {
     private volatile ServerChannel channel;
 
     /** Server close future. */
-    private CompletableFuture<Void> serverCloseFuture = CompletableFuture.allOf(
-        NettyUtils.toCompletableFuture(bossGroup.terminationFuture()),
-        NettyUtils.toCompletableFuture(workerGroup.terminationFuture())
-    );
+    private CompletableFuture<Void> serverCloseFuture;
 
     /** New connections listener. */
     private final Consumer<NettySender> newConnectionListener;
@@ -94,6 +91,7 @@ public class NettyServer {
     /**
      * Constructor.
      *
+     * @param consistentId Consistent id.
      * @param configuration Server configuration.
      * @param handshakeManager Handshake manager supplier.
      * @param newConnectionListener New connections listener.
@@ -101,18 +99,28 @@ public class NettyServer {
      * @param serializationRegistry Serialization registry.
      */
     public NettyServer(
+        String consistentId,
         NetworkView configuration,
         Supplier<HandshakeManager> handshakeManager,
         Consumer<NettySender> newConnectionListener,
         BiConsumer<SocketAddress, NetworkMessage> messageListener,
         MessageSerializationRegistry serializationRegistry
     ) {
-        this(new ServerBootstrap(), configuration, handshakeManager, newConnectionListener, messageListener, serializationRegistry);
+        this(
+            consistentId,
+            new ServerBootstrap(),
+            configuration,
+            handshakeManager,
+            newConnectionListener,
+            messageListener,
+            serializationRegistry
+        );
     }
 
     /**
      * Constructor.
      *
+     * @param consistentId Consistent id.
      * @param bootstrap Server bootstrap.
      * @param configuration Server configuration.
      * @param handshakeManager Handshake manager supplier.
@@ -121,6 +129,7 @@ public class NettyServer {
      * @param serializationRegistry Serialization registry.
      */
     public NettyServer(
+        String consistentId,
         ServerBootstrap bootstrap,
         NetworkView configuration,
         Supplier<HandshakeManager> handshakeManager,
@@ -134,6 +143,12 @@ public class NettyServer {
         this.newConnectionListener = newConnectionListener;
         this.messageListener = messageListener;
         this.serializationRegistry = serializationRegistry;
+        this.bossGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-accept");
+        this.workerGroup = NamedNioEventLoopGroup.create(consistentId + "-srv-worker");
+        serverCloseFuture = CompletableFuture.allOf(
+            NettyUtils.toCompletableFuture(bossGroup.terminationFuture()),
+            NettyUtils.toCompletableFuture(workerGroup.terminationFuture())
+        );
     }
 
     /**
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 59b2e72..0b10ba4 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -51,7 +51,6 @@ import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 
 /**
  * {@link ClusterServiceFactory} implementation that uses ScaleCube for messaging and topology services.
- * TODO: IGNITE-14538: This factory should use ScaleCube configuration instead of default parameters.
  */
 public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
     /** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 0f02620..40ddc2d 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -94,7 +94,7 @@ class ScaleCubeMessagingService extends AbstractMessagingService {
 
     /** {@inheritDoc} */
     @Override public CompletableFuture<Void> send(NetworkAddress addr, NetworkMessage msg, String correlationId) {
-        // TODO: IGNITE-15161 Temporarly, probably should be removed after the implementation
+        // TODO: IGNITE-15161 Temporarily, probably should be removed after the implementation
         // TODO of stopping the clusterService cause some sort of stop thread-safety logic will be implemented.
         if (cluster.isShutdown())
             return failedFuture(new NodeStoppingException());
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index d03a31d..1d76627 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -211,6 +211,7 @@ public class NettyServerTest {
             });
 
         server = new NettyServer(
+            "test",
             serverCfg.value(),
             () -> handshakeManager,
             sender -> {},
@@ -273,6 +274,7 @@ public class NettyServerTest {
         Mockito.doReturn(future).when(bootstrap).bind(Mockito.anyInt());
 
         var server = new NettyServer(
+            "test",
             bootstrap,
             serverCfg.value(),
             () -> mock(HandshakeManager.class),