You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/04/22 16:54:19 UTC

[GitHub] [ignite-3] sashapolo commented on a change in pull request #102: IGNITE-14088 ScaleCube transport API over Netty

sashapolo commented on a change in pull request #102:
URL: https://github.com/apache/ignite-3/pull/102#discussion_r618382459



##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.

Review comment:
       ```suggestion
    * Class that manages both incoming and outgoing connections.
   ```

##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
##########
@@ -179,11 +190,17 @@ private static void stopForcefully(ClusterService cluster) throws Exception {
      * Wrapper for cluster.
      */
     private static final class Cluster {
-        /** */
-        private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
         /** */
         private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
 
+        /** */
+        private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
+
+        static {
+            SERIALIZATION_REGISTRY.registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory());

Review comment:
       `registerFactory` returns `MessageSerializationRegistry`, so you can get rid of the static block

##########
File path: modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
##########
@@ -48,8 +52,15 @@
 
 /** */
 class ITScaleCubeNetworkMessagingTest {
+    /** */
     private Cluster testCluster;
 
+    /** */
+    private final Map<String, NetworkMessage> messageStorage = new ConcurrentHashMap<>();
+
+    /** */
+    private final List<ClusterService> startedMembers = new ArrayList<>();

Review comment:
       Looks like an incorrect git merge

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;
+
+    /** Senders that wrap channels both incoming and outgoing. */
+    private Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+
+    /** Clients. */
+    private Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = Collections.synchronizedList(new ArrayList<>());
+
+    public ConnectionManager(int port, MessageSerializationRegistry provider) {
+        this.serializationRegistry = provider;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry);
+    }
+
+    /**
+     * Start server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().get();

Review comment:
       should we use `join` instead?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;
+
+    /** Senders that wrap channels both incoming and outgoing. */

Review comment:
       sorry, I don't understand what you mean here

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;
+
+    /** Senders that wrap channels both incoming and outgoing. */
+    private Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+
+    /** Clients. */
+    private Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = Collections.synchronizedList(new ArrayList<>());
+
+    public ConnectionManager(int port, MessageSerializationRegistry provider) {
+        this.serializationRegistry = provider;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry);
+    }
+
+    /**
+     * Start server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().get();
+        }
+        catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInternalException(
+                "Got interrupted while waiting for server to start: " + e.getMessage(),
+                e
+            );
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public InetSocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Get a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(InetSocketAddress address) {
+        NettySender channel = channels.get(address);
+
+        if (channel == null) {

Review comment:
       isn't this a race? Should we use a lock?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty client channel wrapper.
+ */
+public class NettyClient {
+    /** Socket channel bootstrapper. */
+    private final Bootstrap bootstrap = new Bootstrap();
+
+    /** Socket channel handler event loop group. */
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Destination host. */
+    private final String host;
+
+    /** Destination port. */
+    private final int port;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> messageListener;
+
+    /** Future that resolves when client channel is opened. */
+    private final CompletableFuture<NettySender> clientFuture = new CompletableFuture<>();
+
+    /** Client socket channel. */
+    private Channel channel;
+
+    public NettyClient(String host, int port, MessageSerializationRegistry serializationRegistry, BiConsumer<InetSocketAddress, NetworkMessage> listener) {
+        this.host = host;
+        this.port = port;
+        this.serializationRegistry = serializationRegistry;
+        this.messageListener = listener;
+    }
+
+    /**
+     * Start client.
+     *
+     * @return Future that resolves when client channel is opened.
+     */
+    public CompletableFuture<NettySender> start() {
+        bootstrap.group(workerGroup)
+            .channel(NioSocketChannel.class)
+            /** See {@link NettyServer#start} for netty configuration details. */
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .handler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch)
+                    throws Exception {

Review comment:
       this exception is not thrown and can be removed

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;
+
+    /** Senders that wrap channels both incoming and outgoing. */
+    private Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+
+    /** Clients. */
+    private Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = Collections.synchronizedList(new ArrayList<>());
+
+    public ConnectionManager(int port, MessageSerializationRegistry provider) {
+        this.serializationRegistry = provider;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry);
+    }
+
+    /**
+     * Start server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().get();
+        }
+        catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInternalException(
+                "Got interrupted while waiting for server to start: " + e.getMessage(),
+                e
+            );
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public InetSocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Get a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(InetSocketAddress address) {
+        NettySender channel = channels.get(address);
+
+        if (channel == null) {
+            NettyClient client = clients.computeIfAbsent(address, this::connect);
+            return client.sender();
+        }
+
+        return CompletableFuture.completedFuture(channel);
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(InetSocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> {
+            consumer.accept(from, message);
+        });
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(SocketChannel channel) {
+        InetSocketAddress remoteAddress = channel.remoteAddress();
+        // TODO: there might be outgoing connection already
+        channels.put(remoteAddress, new NettySender(channel, serializationRegistry));
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyClient connect(InetSocketAddress address) {
+        NettyClient client = new NettyClient(address.getHostName(), address.getPort(), serializationRegistry, (src, message) -> {
+            this.onMessage(src, message);
+        });
+
+        client.start().whenComplete((sender, throwable) -> {
+            if (throwable != null)
+                clients.remove(address);
+            else
+                channels.put(address, sender);
+        });
+
+        return client;
+    }
+
+    /**
+     * Add incoming message listener.
+     *
+     * @param listener Message listener.
+     */
+    public void addListener(BiConsumer<InetSocketAddress, NetworkMessage> listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Stop server and all clients.
+     */
+    public void stop() {
+        // TODO: maybe add some flag that prohibts opening new connections from this moment?

Review comment:
       I think that this problem will be solved when you will have a single Bootstrap and worker group. Then you won't be able to accept connections if you close them.

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty client channel wrapper.
+ */
+public class NettyClient {
+    /** Socket channel bootstrapper. */
+    private final Bootstrap bootstrap = new Bootstrap();
+
+    /** Socket channel handler event loop group. */
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup();

Review comment:
       Are you sure we should create a single `NioEventLoopGroup` per client channel? This is effectively a thread pool, so I would expect it to be shared. See https://github.com/netty/netty/issues/5038 for example (looks like the same applies to the `Bootstrap` class)

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;
+
+    /** Senders that wrap channels both incoming and outgoing. */
+    private Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+
+    /** Clients. */
+    private Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = Collections.synchronizedList(new ArrayList<>());
+
+    public ConnectionManager(int port, MessageSerializationRegistry provider) {
+        this.serializationRegistry = provider;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry);
+    }
+
+    /**
+     * Start server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().get();
+        }
+        catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInternalException(
+                "Got interrupted while waiting for server to start: " + e.getMessage(),
+                e
+            );
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public InetSocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Get a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(InetSocketAddress address) {
+        NettySender channel = channels.get(address);
+
+        if (channel == null) {
+            NettyClient client = clients.computeIfAbsent(address, this::connect);
+            return client.sender();
+        }
+
+        return CompletableFuture.completedFuture(channel);
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(InetSocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> {
+            consumer.accept(from, message);
+        });
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(SocketChannel channel) {
+        InetSocketAddress remoteAddress = channel.remoteAddress();
+        // TODO: there might be outgoing connection already
+        channels.put(remoteAddress, new NettySender(channel, serializationRegistry));
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyClient connect(InetSocketAddress address) {
+        NettyClient client = new NettyClient(address.getHostName(), address.getPort(), serializationRegistry, (src, message) -> {

Review comment:
       `new NettyClient(address.getHostName(), address.getPort(), serializationRegistry, this::onMessage);`

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;

Review comment:
       all of this class' members can be `final`

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;
+
+    /** Senders that wrap channels both incoming and outgoing. */
+    private Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+
+    /** Clients. */
+    private Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = Collections.synchronizedList(new ArrayList<>());
+
+    public ConnectionManager(int port, MessageSerializationRegistry provider) {
+        this.serializationRegistry = provider;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry);
+    }
+
+    /**
+     * Start server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().get();
+        }
+        catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInternalException(
+                "Got interrupted while waiting for server to start: " + e.getMessage(),
+                e
+            );
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public InetSocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Get a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(InetSocketAddress address) {
+        NettySender channel = channels.get(address);
+
+        if (channel == null) {
+            NettyClient client = clients.computeIfAbsent(address, this::connect);
+            return client.sender();
+        }
+
+        return CompletableFuture.completedFuture(channel);
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(InetSocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> {
+            consumer.accept(from, message);
+        });
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(SocketChannel channel) {
+        InetSocketAddress remoteAddress = channel.remoteAddress();
+        // TODO: there might be outgoing connection already
+        channels.put(remoteAddress, new NettySender(channel, serializationRegistry));
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyClient connect(InetSocketAddress address) {
+        NettyClient client = new NettyClient(address.getHostName(), address.getPort(), serializationRegistry, (src, message) -> {
+            this.onMessage(src, message);
+        });
+
+        client.start().whenComplete((sender, throwable) -> {
+            if (throwable != null)
+                clients.remove(address);
+            else
+                channels.put(address, sender);
+        });
+
+        return client;
+    }
+
+    /**
+     * Add incoming message listener.
+     *
+     * @param listener Message listener.
+     */
+    public void addListener(BiConsumer<InetSocketAddress, NetworkMessage> listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Stop server and all clients.
+     */
+    public void stop() {
+        // TODO: maybe add some flag that prohibts opening new connections from this moment?
+        this.server.stop();
+        HashMap<InetSocketAddress, NettyClient> map = new HashMap<>(clients);

Review comment:
       why do you need the copy?

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;
+
+    /** Senders that wrap channels both incoming and outgoing. */
+    private Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+
+    /** Clients. */
+    private Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = Collections.synchronizedList(new ArrayList<>());
+
+    public ConnectionManager(int port, MessageSerializationRegistry provider) {
+        this.serializationRegistry = provider;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry);
+    }
+
+    /**
+     * Start server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().get();
+        }
+        catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInternalException(
+                "Got interrupted while waiting for server to start: " + e.getMessage(),
+                e
+            );
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public InetSocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Get a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(InetSocketAddress address) {
+        NettySender channel = channels.get(address);
+
+        if (channel == null) {
+            NettyClient client = clients.computeIfAbsent(address, this::connect);
+            return client.sender();
+        }
+
+        return CompletableFuture.completedFuture(channel);
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(InetSocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> {

Review comment:
       `listeners.forEach(consumer -> consumer.accept(from, message));`

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;
+
+    /** Senders that wrap channels both incoming and outgoing. */
+    private Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+
+    /** Clients. */
+    private Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = Collections.synchronizedList(new ArrayList<>());

Review comment:
       I think `CopyOnWriteArrayList` is better here

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;
+
+    /** Senders that wrap channels both incoming and outgoing. */
+    private Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+
+    /** Clients. */
+    private Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = Collections.synchronizedList(new ArrayList<>());
+
+    public ConnectionManager(int port, MessageSerializationRegistry provider) {
+        this.serializationRegistry = provider;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry);
+    }
+
+    /**
+     * Start server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().get();
+        }
+        catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInternalException(
+                "Got interrupted while waiting for server to start: " + e.getMessage(),
+                e
+            );
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public InetSocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Get a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(InetSocketAddress address) {
+        NettySender channel = channels.get(address);
+
+        if (channel == null) {
+            NettyClient client = clients.computeIfAbsent(address, this::connect);
+            return client.sender();
+        }
+
+        return CompletableFuture.completedFuture(channel);
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(InetSocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> {
+            consumer.accept(from, message);
+        });
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(SocketChannel channel) {
+        InetSocketAddress remoteAddress = channel.remoteAddress();
+        // TODO: there might be outgoing connection already
+        channels.put(remoteAddress, new NettySender(channel, serializationRegistry));
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyClient connect(InetSocketAddress address) {
+        NettyClient client = new NettyClient(address.getHostName(), address.getPort(), serializationRegistry, (src, message) -> {
+            this.onMessage(src, message);
+        });
+
+        client.start().whenComplete((sender, throwable) -> {
+            if (throwable != null)
+                clients.remove(address);
+            else
+                channels.put(address, sender);
+        });
+
+        return client;
+    }
+
+    /**
+     * Add incoming message listener.
+     *
+     * @param listener Message listener.
+     */
+    public void addListener(BiConsumer<InetSocketAddress, NetworkMessage> listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Stop server and all clients.
+     */
+    public void stop() {
+        // TODO: maybe add some flag that prohibts opening new connections from this moment?
+        this.server.stop();
+        HashMap<InetSocketAddress, NettyClient> map = new HashMap<>(clients);
+        map.values().forEach(client -> {

Review comment:
       `map.values().forEach(NettyClient::stop);`

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty client channel wrapper.
+ */
+public class NettyClient {
+    /** Socket channel bootstrapper. */
+    private final Bootstrap bootstrap = new Bootstrap();
+
+    /** Socket channel handler event loop group. */
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Destination host. */
+    private final String host;
+
+    /** Destination port. */
+    private final int port;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> messageListener;
+
+    /** Future that resolves when client channel is opened. */
+    private final CompletableFuture<NettySender> clientFuture = new CompletableFuture<>();
+
+    /** Client socket channel. */
+    private Channel channel;
+
+    public NettyClient(String host, int port, MessageSerializationRegistry serializationRegistry, BiConsumer<InetSocketAddress, NetworkMessage> listener) {
+        this.host = host;
+        this.port = port;
+        this.serializationRegistry = serializationRegistry;
+        this.messageListener = listener;
+    }
+
+    /**
+     * Start client.
+     *
+     * @return Future that resolves when client channel is opened.
+     */
+    public CompletableFuture<NettySender> start() {
+        bootstrap.group(workerGroup)
+            .channel(NioSocketChannel.class)
+            /** See {@link NettyServer#start} for netty configuration details. */
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .handler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch)
+                    throws Exception {
+                    ch.pipeline().addLast(
+                        new InboundDecoder(serializationRegistry),
+                        new MessageHandler(messageListener),
+                        new ChunkedWriteHandler()
+                    );
+                }
+        });
+
+        ChannelFuture connectFuture = bootstrap.connect(host, port);
+
+        connectFuture.addListener(connect -> {

Review comment:
       this can be written a little bit shorter:
   ```
   bootstrap.connect(host, port)
       .addListener((ChannelFutureListener)future -> {
           this.channel = future.channel();
           if (future.isSuccess())
               clientFuture.complete(new NettySender(channel, serializationRegistry));
           else
               clientFuture.completeExceptionally(future.cause());
   
           // Shutdown event loop group when channel is closed.
           channel.closeFuture().addListener(close -> {
               workerGroup.shutdownGracefully();
           });
       });
   ```

##########
File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Server. */
+    private NettyServer server;
+
+    /** Senders that wrap channels both incoming and outgoing. */
+    private Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+
+    /** Clients. */
+    private Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = Collections.synchronizedList(new ArrayList<>());
+
+    public ConnectionManager(int port, MessageSerializationRegistry provider) {
+        this.serializationRegistry = provider;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry);
+    }
+
+    /**
+     * Start server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().get();
+        }
+        catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInternalException(
+                "Got interrupted while waiting for server to start: " + e.getMessage(),
+                e
+            );
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public InetSocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Get a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(InetSocketAddress address) {
+        NettySender channel = channels.get(address);
+
+        if (channel == null) {
+            NettyClient client = clients.computeIfAbsent(address, this::connect);
+            return client.sender();
+        }
+
+        return CompletableFuture.completedFuture(channel);
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(InetSocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> {
+            consumer.accept(from, message);
+        });
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(SocketChannel channel) {
+        InetSocketAddress remoteAddress = channel.remoteAddress();
+        // TODO: there might be outgoing connection already
+        channels.put(remoteAddress, new NettySender(channel, serializationRegistry));
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyClient connect(InetSocketAddress address) {
+        NettyClient client = new NettyClient(address.getHostName(), address.getPort(), serializationRegistry, (src, message) -> {
+            this.onMessage(src, message);
+        });
+
+        client.start().whenComplete((sender, throwable) -> {
+            if (throwable != null)
+                clients.remove(address);

Review comment:
       why don't you clean the `channels` map as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org