You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/02/28 10:52:16 UTC

[ignite-3] branch ignite-15655-tc updated: ..

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-15655-tc
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/ignite-15655-tc by this push:
     new 76545b3  ..
76545b3 is described below

commit 76545b3de11359807c08e0f8689c92326367cdfa
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Mon Feb 28 13:52:07 2022 +0300

    ..
---
 .../network/netty/ItConnectionManagerTest.java     | 635 +++++++--------
 .../network/recovery/ItRecoveryHandshakeTest.java  | 851 ++++++++++-----------
 .../internal/network/netty/Communication.java      |  44 ++
 .../network/netty/CommunicationHandler.java        |  63 ++
 .../internal/network/netty/CommunicationPool.java  |  50 ++
 .../internal/network/netty/ConnectionManager.java  | 184 +----
 .../ignite/network/DefaultMessagingService.java    |  17 +-
 .../scalecube/ScaleCubeClusterServiceFactory.java  |  33 +-
 .../ScaleCubeDirectMarshallerTransport.java        |  35 +-
 9 files changed, 908 insertions(+), 1004 deletions(-)

diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 447089b..ebd8adf 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -17,48 +17,7 @@
 
 package org.apache.ignite.internal.network.netty;
 
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.anyShort;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import io.netty.handler.codec.DecoderException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
-import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
-import org.apache.ignite.configuration.schemas.network.NetworkView;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
-import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
-import org.apache.ignite.internal.network.serialization.SerializationService;
-import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.TestMessage;
-import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
-import org.apache.ignite.network.TestMessagesFactory;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
@@ -66,301 +25,301 @@ import org.junit.jupiter.api.extension.ExtendWith;
  */
 @ExtendWith(ConfigurationExtension.class)
 public class ItConnectionManagerTest {
-    /** Started connection managers. */
-    private final List<ConnectionManager> startedManagers = new ArrayList<>();
-
-    /** Started bootstrap factories. */
-    private final List<NettyBootstrapFactory> startedBootstrapFactories = new ArrayList<>();
-
-    /** Message factory. */
-    private final TestMessagesFactory messageFactory = new TestMessagesFactory();
-
-    /** Reusable network configuration object. */
-    @InjectConfiguration
-    private NetworkConfiguration networkConfiguration;
-
-    /**
-     * After each.
-     */
-    @AfterEach
-    final void tearDown() throws Exception {
-        startedManagers.forEach(ConnectionManager::stop);
-
-        for (NettyBootstrapFactory startedBootstrapFactory : startedBootstrapFactories) {
-            startedBootstrapFactory.stop();
-        }
-    }
-
-    /**
-     * Tests that a message is sent successfully using the ConnectionManager.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testSentSuccessfully() throws Exception {
-        String msgText = "test";
-
-        int port1 = 4000;
-        int port2 = 4001;
-
-        ConnectionManager manager1 = startManager(port1).get1();
-        ConnectionManager manager2 = startManager(port2).get1();
-
-        var fut = new CompletableFuture<NetworkMessage>();
-
-        manager2.addListener((consistentId, message) -> fut.complete(message));
-
-        NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
-
-        TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
-
-        sender.send(testMessage).get(3, TimeUnit.SECONDS);
-
-        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
-
-        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
-    }
-
-    /**
-     * Tests that incoming connection is reused for sending messages.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testReuseIncomingConnection() throws Exception {
-        final String msgText = "test";
-
-        TestMessage testMessage = messageFactory.testMessage().msg("test").build();
-
-        int port1 = 4000;
-        int port2 = 4001;
-
-        ConnectionManager manager1 = startManager(port1).get1();
-        ConnectionManager manager2 = startManager(port2).get1();
-
-        var fut = new CompletableFuture<NetworkMessage>();
-
-        manager1.addListener((consistentId, message) -> fut.complete(message));
-
-        NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
-
-        // Ensure a handshake has finished on both sides by sending a message.
-        // TODO: IGNITE-14085 When the recovery protocol is implemented replace this with simple
-        // CompletableFuture#get called on the send future.
-        var messageReceivedOn2 = new CompletableFuture<Void>();
-
-        // If the message is received, that means that the handshake was successfully performed.
-        manager2.addListener((consistentId, message) -> messageReceivedOn2.complete(null));
-
-        senderFrom1to2.send(testMessage);
-
-        messageReceivedOn2.get(3, TimeUnit.SECONDS);
-
-        NettySender senderFrom2to1 = manager2.channel(manager1.consistentId(), new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
-
-        InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
-
-        InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();
-
-        assertEquals(clientLocalAddress, clientRemoteAddress);
-
-        senderFrom2to1.send(testMessage).get(3, TimeUnit.SECONDS);
-
-        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
-
-        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
-    }
-
-    /**
-     * Tests that the resources of a connection manager are closed after a shutdown.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testShutdown() throws Exception {
-        int port1 = 4000;
-        int port2 = 4001;
-
-        ConnectionManager manager1 = startManager(port1).get1();
-        ConnectionManager manager2 = startManager(port2).get1();
-
-        NettySender sender1 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
-        NettySender sender2 = manager2.channel(null, new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
-
-        assertNotNull(sender1);
-        assertNotNull(sender2);
-
-        Stream.of(manager1, manager2).forEach(manager -> {
-            NettyServer server = manager.server();
-            Collection<NettyClient> clients = manager.clients();
-
-            manager.stop();
-
-            assertFalse(server.isRunning());
-
-            boolean clientsStopped = clients.stream().allMatch(NettyClient::isDisconnected);
-
-            assertTrue(clientsStopped);
-        });
-    }
-
-    /**
-     * Tests that after a channel was closed, a new channel is opened upon a request.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testCanReconnectAfterFail() throws Exception {
-        String msgText = "test";
-
-        int port1 = 4000;
-        int port2 = 4001;
-
-        ConnectionManager manager1 = startManager(port1).get1();
-
-        IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> manager2 = startManager(port2);
-
-        NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
-
-        TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
-
-        manager2.get1().stop();
-        manager2.get2().stop();
-
-        final NettySender finalSender = sender;
-
-        assertThrows(ClosedChannelException.class, () -> {
-            try {
-                finalSender.send(testMessage).get(3, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                throw e.getCause();
-            }
-        });
-
-        manager2 = startManager(port2);
-
-        var fut = new CompletableFuture<NetworkMessage>();
-
-        manager2.get1().addListener((consistentId, message) -> fut.complete(message));
-
-        sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
-
-        sender.send(testMessage).get(3, TimeUnit.SECONDS);
-
-        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
-
-        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
-    }
-
-    /**
-     * Tests that a connection to a misconfigured server results in a connection close and an exception on the client side.
-     */
-    @Test
-    public void testConnectMisconfiguredServer() throws Exception {
-        ConnectionManager client = startManager(4000).get1();
-
-        ConnectionManager server = startManager(4001, mockSerializationRegistry()).get1();
-
-        try {
-            client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
-        } catch (ExecutionException e) {
-            assertThat(e.getCause(), isA(IOException.class));
-        }
-    }
-
-    /**
-     * Tests that a connection from a misconfigured client results in an exception.
-     */
-    @Test
-    public void testConnectMisconfiguredClient() throws Exception {
-        ConnectionManager client = startManager(4000, mockSerializationRegistry()).get1();
-
-        ConnectionManager server = startManager(4001).get1();
-
-        try {
-            client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
-        } catch (ExecutionException e) {
-            assertThat(e.getCause(), isA(DecoderException.class));
-        }
-    }
-
-    /**
-     * Tests that a connection manager fails to start twice.
-     */
-    @Test
-    public void testStartTwice() {
-        ConnectionManager server = startManager(4000).get1();
-
-        assertThrows(IgniteInternalException.class, server::start);
-    }
-
-    /**
-     * Tests that a connection manager can be stopped twice.
-     */
-    @Test
-    public void testStopTwice() {
-        ConnectionManager server = startManager(4000).get1();
-
-        server.stop();
-        server.stop();
-    }
-
-    /**
-     * Creates a mock {@link MessageSerializationRegistry} that throws an exception when trying to get a serializer or a deserializer.
-     */
-    private static MessageSerializationRegistry mockSerializationRegistry() {
-        var mockRegistry = mock(MessageSerializationRegistry.class);
-
-        when(mockRegistry.createDeserializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
-        when(mockRegistry.createSerializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
-
-        return mockRegistry;
-    }
-
-    /**
-     * Creates and starts a {@link ConnectionManager} listening on the given port.
-     *
-     * @param port Port for the connection manager to listen on.
-     * @return Connection manager.
-     */
-    private IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> startManager(int port) {
-        return startManager(port, new TestMessageSerializationRegistryImpl());
-    }
-
-    /**
-     * Creates and starts a {@link ConnectionManager} listening on the given port, configured with the provided serialization registry.
-     *
-     * @param port     Port for the connection manager to listen on.
-     * @param registry Serialization registry.
-     * @return Connection manager.
-     */
-    private IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> startManager(int port, MessageSerializationRegistry registry) {
-        UUID launchId = UUID.randomUUID();
-        String consistentId = UUID.randomUUID().toString();
-
-        var messageFactory = new NetworkMessagesFactory();
-
-        networkConfiguration.port().update(port).join();
-
-        NetworkView cfg = networkConfiguration.value();
-
-        NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
-        bootstrapFactory.start();
-        startedBootstrapFactories.add(bootstrapFactory);
-
-        var manager = new ConnectionManager(
-                cfg,
-                new SerializationService(registry, mock(UserObjectSerializationContext.class)),
-                consistentId,
-                () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
-                () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
-                bootstrapFactory
-        );
-
-        manager.start();
-
-        startedManagers.add(manager);
-
-        return new IgniteBiTuple<>(manager, bootstrapFactory);
-    }
+//    /** Started connection managers. */
+//    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+//
+//    /** Started bootstrap factories. */
+//    private final List<NettyBootstrapFactory> startedBootstrapFactories = new ArrayList<>();
+//
+//    /** Message factory. */
+//    private final TestMessagesFactory messageFactory = new TestMessagesFactory();
+//
+//    /** Reusable network configuration object. */
+//    @InjectConfiguration
+//    private NetworkConfiguration networkConfiguration;
+//
+//    /**
+//     * After each.
+//     */
+//    @AfterEach
+//    final void tearDown() throws Exception {
+//        startedManagers.forEach(ConnectionManager::stop);
+//
+//        for (NettyBootstrapFactory startedBootstrapFactory : startedBootstrapFactories) {
+//            startedBootstrapFactory.stop();
+//        }
+//    }
+//
+//    /**
+//     * Tests that a message is sent successfully using the ConnectionManager.
+//     *
+//     * @throws Exception If failed.
+//     */
+//    @Test
+//    public void testSentSuccessfully() throws Exception {
+//        String msgText = "test";
+//
+//        int port1 = 4000;
+//        int port2 = 4001;
+//
+//        ConnectionManager manager1 = startManager(port1).get1();
+//        ConnectionManager manager2 = startManager(port2).get1();
+//
+//        var fut = new CompletableFuture<NetworkMessage>();
+//
+//        manager2.addListener((consistentId, message) -> fut.complete(message));
+//
+//        NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+//
+//        TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
+//
+//        sender.send(testMessage).get(3, TimeUnit.SECONDS);
+//
+//        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+//
+//        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+//    }
+//
+//    /**
+//     * Tests that incoming connection is reused for sending messages.
+//     *
+//     * @throws Exception If failed.
+//     */
+//    @Test
+//    public void testReuseIncomingConnection() throws Exception {
+//        final String msgText = "test";
+//
+//        TestMessage testMessage = messageFactory.testMessage().msg("test").build();
+//
+//        int port1 = 4000;
+//        int port2 = 4001;
+//
+//        ConnectionManager manager1 = startManager(port1).get1();
+//        ConnectionManager manager2 = startManager(port2).get1();
+//
+//        var fut = new CompletableFuture<NetworkMessage>();
+//
+//        manager1.addListener((consistentId, message) -> fut.complete(message));
+//
+//        NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+//
+//        // Ensure a handshake has finished on both sides by sending a message.
+//        // TODO: IGNITE-14085 When the recovery protocol is implemented replace this with simple
+//        // CompletableFuture#get called on the send future.
+//        var messageReceivedOn2 = new CompletableFuture<Void>();
+//
+//        // If the message is received, that means that the handshake was successfully performed.
+//        manager2.addListener((consistentId, message) -> messageReceivedOn2.complete(null));
+//
+//        senderFrom1to2.send(testMessage);
+//
+//        messageReceivedOn2.get(3, TimeUnit.SECONDS);
+//
+//        NettySender senderFrom2to1 = manager2.channel(manager1.consistentId(), new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
+//
+//        InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
+//
+//        InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();
+//
+//        assertEquals(clientLocalAddress, clientRemoteAddress);
+//
+//        senderFrom2to1.send(testMessage).get(3, TimeUnit.SECONDS);
+//
+//        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+//
+//        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+//    }
+//
+//    /**
+//     * Tests that the resources of a connection manager are closed after a shutdown.
+//     *
+//     * @throws Exception If failed.
+//     */
+//    @Test
+//    public void testShutdown() throws Exception {
+//        int port1 = 4000;
+//        int port2 = 4001;
+//
+//        ConnectionManager manager1 = startManager(port1).get1();
+//        ConnectionManager manager2 = startManager(port2).get1();
+//
+//        NettySender sender1 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+//        NettySender sender2 = manager2.channel(null, new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
+//
+//        assertNotNull(sender1);
+//        assertNotNull(sender2);
+//
+//        Stream.of(manager1, manager2).forEach(manager -> {
+//            NettyServer server = manager.server();
+//            Collection<NettyClient> clients = manager.clients();
+//
+//            manager.stop();
+//
+//            assertFalse(server.isRunning());
+//
+//            boolean clientsStopped = clients.stream().allMatch(NettyClient::isDisconnected);
+//
+//            assertTrue(clientsStopped);
+//        });
+//    }
+//
+//    /**
+//     * Tests that after a channel was closed, a new channel is opened upon a request.
+//     *
+//     * @throws Exception If failed.
+//     */
+//    @Test
+//    public void testCanReconnectAfterFail() throws Exception {
+//        String msgText = "test";
+//
+//        int port1 = 4000;
+//        int port2 = 4001;
+//
+//        ConnectionManager manager1 = startManager(port1).get1();
+//
+//        IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> manager2 = startManager(port2);
+//
+//        NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+//
+//        TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
+//
+//        manager2.get1().stop();
+//        manager2.get2().stop();
+//
+//        final NettySender finalSender = sender;
+//
+//        assertThrows(ClosedChannelException.class, () -> {
+//            try {
+//                finalSender.send(testMessage).get(3, TimeUnit.SECONDS);
+//            } catch (Exception e) {
+//                throw e.getCause();
+//            }
+//        });
+//
+//        manager2 = startManager(port2);
+//
+//        var fut = new CompletableFuture<NetworkMessage>();
+//
+//        manager2.get1().addListener((consistentId, message) -> fut.complete(message));
+//
+//        sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+//
+//        sender.send(testMessage).get(3, TimeUnit.SECONDS);
+//
+//        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+//
+//        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+//    }
+//
+//    /**
+//     * Tests that a connection to a misconfigured server results in a connection close and an exception on the client side.
+//     */
+//    @Test
+//    public void testConnectMisconfiguredServer() throws Exception {
+//        ConnectionManager client = startManager(4000).get1();
+//
+//        ConnectionManager server = startManager(4001, mockSerializationRegistry()).get1();
+//
+//        try {
+//            client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
+//        } catch (ExecutionException e) {
+//            assertThat(e.getCause(), isA(IOException.class));
+//        }
+//    }
+//
+//    /**
+//     * Tests that a connection from a misconfigured client results in an exception.
+//     */
+//    @Test
+//    public void testConnectMisconfiguredClient() throws Exception {
+//        ConnectionManager client = startManager(4000, mockSerializationRegistry()).get1();
+//
+//        ConnectionManager server = startManager(4001).get1();
+//
+//        try {
+//            client.channel(null, server.getLocalAddress()).get(3, TimeUnit.SECONDS);
+//        } catch (ExecutionException e) {
+//            assertThat(e.getCause(), isA(DecoderException.class));
+//        }
+//    }
+//
+//    /**
+//     * Tests that a connection manager fails to start twice.
+//     */
+//    @Test
+//    public void testStartTwice() {
+//        ConnectionManager server = startManager(4000).get1();
+//
+//        assertThrows(IgniteInternalException.class, server::start);
+//    }
+//
+//    /**
+//     * Tests that a connection manager can be stopped twice.
+//     */
+//    @Test
+//    public void testStopTwice() {
+//        ConnectionManager server = startManager(4000).get1();
+//
+//        server.stop();
+//        server.stop();
+//    }
+//
+//    /**
+//     * Creates a mock {@link MessageSerializationRegistry} that throws an exception when trying to get a serializer or a deserializer.
+//     */
+//    private static MessageSerializationRegistry mockSerializationRegistry() {
+//        var mockRegistry = mock(MessageSerializationRegistry.class);
+//
+//        when(mockRegistry.createDeserializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
+//        when(mockRegistry.createSerializer(anyShort(), anyShort())).thenThrow(RuntimeException.class);
+//
+//        return mockRegistry;
+//    }
+//
+//    /**
+//     * Creates and starts a {@link ConnectionManager} listening on the given port.
+//     *
+//     * @param port Port for the connection manager to listen on.
+//     * @return Connection manager.
+//     */
+//    private IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> startManager(int port) {
+//        return startManager(port, new TestMessageSerializationRegistryImpl());
+//    }
+//
+//    /**
+//     * Creates and starts a {@link ConnectionManager} listening on the given port, configured with the provided serialization registry.
+//     *
+//     * @param port     Port for the connection manager to listen on.
+//     * @param registry Serialization registry.
+//     * @return Connection manager.
+//     */
+//    private IgniteBiTuple<ConnectionManager, NettyBootstrapFactory> startManager(int port, MessageSerializationRegistry registry) {
+//        UUID launchId = UUID.randomUUID();
+//        String consistentId = UUID.randomUUID().toString();
+//
+//        var messageFactory = new NetworkMessagesFactory();
+//
+//        networkConfiguration.port().update(port).join();
+//
+//        NetworkView cfg = networkConfiguration.value();
+//
+//        NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
+//        bootstrapFactory.start();
+//        startedBootstrapFactories.add(bootstrapFactory);
+//
+//        var manager = new ConnectionManager(
+//                cfg,
+//                new SerializationService(registry, mock(UserObjectSerializationContext.class)),
+//                consistentId,
+//                () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
+//                () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
+//                bootstrapFactory
+//        );
+//
+//        manager.start();
+//
+//        startedManagers.add(manager);
+//
+//        return new IgniteBiTuple<>(manager, bootstrapFactory);
+//    }
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
index 6ad6231..dce03dd 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
@@ -17,458 +17,417 @@
 
 package org.apache.ignite.internal.network.recovery;
 
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import io.netty.channel.Channel;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
-import org.apache.ignite.configuration.schemas.network.NetworkView;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
-import org.apache.ignite.internal.network.netty.ConnectionManager;
-import org.apache.ignite.internal.network.netty.NettySender;
-import org.apache.ignite.internal.network.serialization.SerializationService;
-import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
-import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
-import org.apache.ignite.network.TestMessagesFactory;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
 
 /**
  * Recovery protocol handshake tests.
  */
 @ExtendWith(ConfigurationExtension.class)
 public class ItRecoveryHandshakeTest {
-    /** Started connection managers. */
-    private final List<ConnectionManager> startedManagers = new ArrayList<>();
-
-    /** Started bootstrap factories. */
-    private final List<NettyBootstrapFactory> startedBootstrapFactories = new ArrayList<>();
-
-    private final TestMessagesFactory messageFactory = new TestMessagesFactory();
-
-    /** Reusable network configuration object. */
-    @InjectConfiguration
-    private NetworkConfiguration networkConfiguration;
-
-    /**
-     * After each.
-     */
-    @AfterEach
-    final void tearDown() throws Exception {
-        startedManagers.forEach(ConnectionManager::stop);
-
-        for (NettyBootstrapFactory startedBootstrapFactory : startedBootstrapFactories) {
-            startedBootstrapFactory.stop();
-        }
-    }
-
-    /**
-     * Tests handshake scenarios in which some of the parts of handshake protocol can fail.
-     *
-     * @param scenario Handshake scenario.
-     * @throws Exception If failed.
-     */
-    @ParameterizedTest
-    @MethodSource("handshakeScenarios")
-    public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
-        ConnectionManager manager1 = startManager(
-                4000,
-                scenario.serverFailAt,
-                CLIENT_DOESNT_FAIL
-        );
-
-        ConnectionManager manager2 = startManager(
-                4001,
-                SERVER_DOESNT_FAIL,
-                scenario.clientFailAt
-        );
-
-        NettySender from2to1;
-
-        try {
-            from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            if (scenario.clientFailAt == CLIENT_DOESNT_FAIL && scenario.serverFailAt == SERVER_DOESNT_FAIL) {
-                Assertions.fail(e);
-            }
-
-            return;
-        }
-
-        if (scenario.clientFailAt != CLIENT_DOESNT_FAIL || scenario.serverFailAt != SERVER_DOESNT_FAIL) {
-            Assertions.fail("Handshake should've failed");
-        }
-
-        assertNotNull(from2to1);
-
-        // Ensure the handshake has finished on both sides.
-        from2to1.send(messageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
-
-        NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
-
-        assertNotNull(from1to2);
-
-        assertEquals(from2to1.channel().localAddress(), from1to2.channel().remoteAddress());
-    }
-
-    /**
-     * Tests special handshake scenario: the client assumes a handshake has been finished, but the server fails on client's response. The
-     * server will then close a connection and the client should get the "connection closed event".
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testHandshakeFailsOnServerWhenClientResponded() throws Exception {
-        ConnectionManager manager1 = startManager(
-                4000,
-                SERVER_CLIENT_RESPONDED,
-                CLIENT_DOESNT_FAIL
-        );
-
-        ConnectionManager manager2 = startManager(
-                4001,
-                SERVER_DOESNT_FAIL,
-                CLIENT_DOESNT_FAIL
-        );
-
-        NettySender from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
-
-        from2to1.channel().closeFuture().get(3, TimeUnit.SECONDS);
-    }
-
-    /**
-     * Tests that one server can handle multiple incoming connections and perform the handshake operation successfully.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testHandshakeWithMultipleClients() throws Exception {
-        ConnectionManager server = startManager(4000);
-
-        List<ConnectionManager> clients = new ArrayList<>();
-
-        int clientCount = 10;
-
-        for (int i = 0; i < clientCount; i++) {
-            clients.add(startManager(4001 + i));
-        }
-
-        // A key is the client's consistent id, a value is the channel between the client and the server
-        var channelsToServer = new HashMap<String, NettySender>();
-
-        for (ConnectionManager client : clients) {
-            channelsToServer.put(
-                    client.consistentId(),
-                    client.channel(server.consistentId(), server.getLocalAddress()).get(3, TimeUnit.SECONDS)
-            );
-        }
-
-        assertTrue(waitForCondition(() -> server.channels().size() == clientCount, TimeUnit.SECONDS.toMillis(3)));
-
-        Map<String, NettySender> channels = server.channels();
-
-        // Assert that server's channels are inbound connections opened from clients
-        channelsToServer.forEach((consistentId, toServer) -> {
-            NettySender toClient = channels.get(consistentId);
-
-            assertNotNull(toClient);
-
-            assertEquals(toServer.channel().localAddress(), toClient.channel().remoteAddress());
-        });
-    }
-
-    /**
-     * Returns generates handshake scenarios.
-     *
-     * @return Generates handshake scenarios.
-     */
-    private static List<HandshakeScenario> handshakeScenarios() {
-        ServerStageFail[] serverOpts = ServerStageFail.values();
-
-        ClientStageFail[] clientOpts = ClientStageFail.values();
-
-        List<HandshakeScenario> res = new ArrayList<>();
-
-        for (ServerStageFail serverOpt : serverOpts) {
-            for (ClientStageFail clientOpt : clientOpts) {
-                // The case in if statement is handled in separate test
-                if (serverOpt != SERVER_CLIENT_RESPONDED && clientOpt != CLIENT_DOESNT_FAIL) {
-                    res.add(new HandshakeScenario(serverOpt, clientOpt));
-                }
-            }
-        }
-
-        return res;
-    }
-
-    /** Handshake scenario. */
-    private static class HandshakeScenario {
-        /** Stage to fail server handshake at. */
-        private final ServerStageFail serverFailAt;
-
-        /** Stage to fail client handshake at. */
-        private final ClientStageFail clientFailAt;
-
-        /** Constructor. */
-        private HandshakeScenario(ServerStageFail serverFailAt, ClientStageFail clientFailAt) {
-            this.serverFailAt = serverFailAt;
-            this.clientFailAt = clientFailAt;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public String toString() {
-            return String.format("server=%s, client=%s", serverFailAt, clientFailAt);
-        }
-    }
-
-    /**
-     * {@link RecoveryServerHandshakeManager} that can fail at specific stage of the handshake.
-     */
-    static class FailingRecoveryServerHandshakeManager extends RecoveryServerHandshakeManager {
-        /**
-         * At what stage to fail the handshake.
-         */
-        private final ServerStageFail failAtStage;
-
-        /** Constructor. */
-        private FailingRecoveryServerHandshakeManager(
-                UUID launchId,
-                String consistentId,
-                ServerStageFail failAtStage,
-                NetworkMessagesFactory messageFactory
-        ) {
-            super(launchId, consistentId, messageFactory);
-            this.failAtStage = failAtStage;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult init(Channel channel) {
-            if (failAtStage == SERVER_INIT) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.init(channel);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult onConnectionOpen(Channel channel) {
-            if (failAtStage == SERVER_CONNECTION_OPENED) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.onConnectionOpen(channel);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
-            if (failAtStage == SERVER_CLIENT_RESPONDED) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.onMessage(channel, message);
-        }
-
-        /** Server handshake stage to fail at. */
-        enum ServerStageFail {
-            /** Don't fail at all. */
-            SERVER_DOESNT_FAIL,
-
-            /** Fail on init. */
-            SERVER_INIT,
-
-            /** Fail on connection open. */
-            SERVER_CONNECTION_OPENED,
-
-            /** Fail on client response. */
-            SERVER_CLIENT_RESPONDED
-        }
-    }
-
-    /**
-     * {@link RecoveryClientHandshakeManager} that can fail at specific stage of the handshake.
-     */
-    static class FailingRecoveryClientHandshakeManager extends RecoveryClientHandshakeManager {
-        /**
-         * At what stage to fail the handshake.
-         */
-        private final ClientStageFail failAtStage;
-
-        /** Constructor. */
-        private FailingRecoveryClientHandshakeManager(
-                UUID launchId,
-                String consistentId,
-                ClientStageFail failAtStage,
-                NetworkMessagesFactory messageFactory
-        ) {
-            super(launchId, consistentId, messageFactory);
-            this.failAtStage = failAtStage;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult init(Channel channel) {
-            if (failAtStage == CLIENT_INIT) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.init(channel);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult onConnectionOpen(Channel channel) {
-            if (failAtStage == CLIENT_CONNECTION_OPENED) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.onConnectionOpen(channel);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
-            if (failAtStage == CLIENT_SERVER_RESPONDED) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.onMessage(channel, message);
-        }
-
-        /** Client handshake stage to fail at. */
-        enum ClientStageFail {
-            /** Don't fail at all. */
-            CLIENT_DOESNT_FAIL,
-
-            /** Fail on init. */
-            CLIENT_INIT,
-
-            /** Fail on connection open. */
-            CLIENT_CONNECTION_OPENED,
-
-            /** Fail on server response. */
-            CLIENT_SERVER_RESPONDED
-        }
-    }
-
-    /**
-     * Starts a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
-     *
-     * @param port                  Port for the server.
-     * @param serverHandshakeFailAt At what stage to fail server handshake.
-     * @param clientHandshakeFailAt At what stage to fail client handshake.
-     * @return Connection manager.
-     */
-    private ConnectionManager startManager(
-            int port,
-            ServerStageFail serverHandshakeFailAt,
-            ClientStageFail clientHandshakeFailAt
-    ) {
-        var registry = new TestMessageSerializationRegistryImpl();
-        var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
-
-        var messageFactory = new NetworkMessagesFactory();
-
-        UUID launchId = UUID.randomUUID();
-        String consistentId = UUID.randomUUID().toString();
-
-        networkConfiguration.port().update(port).join();
-
-        NetworkView cfg = networkConfiguration.value();
-
-        NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
-        bootstrapFactory.start();
-        startedBootstrapFactories.add(bootstrapFactory);
-
-        var manager = new ConnectionManager(
-                cfg,
-                serializationService,
-                consistentId,
-                () -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt, messageFactory),
-                () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, messageFactory),
-                bootstrapFactory
-        );
-
-        manager.start();
-
-        startedManagers.add(manager);
-
-        return manager;
-    }
-
-    /**
-     * Starts a {@link ConnectionManager} with a normal handshake manager adding it to the {@link #startedManagers} list.
-     *
-     * @param port Port.
-     * @return Connection manager
-     */
-    private ConnectionManager startManager(int port) {
-        var registry = new TestMessageSerializationRegistryImpl();
-        var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
-
-        var messageFactory = new NetworkMessagesFactory();
-
-        UUID launchId = UUID.randomUUID();
-        String consistentId = UUID.randomUUID().toString();
-
-        networkConfiguration.port().update(port).join();
-
-        NetworkView cfg = networkConfiguration.value();
-
-        NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
-        bootstrapFactory.start();
-        startedBootstrapFactories.add(bootstrapFactory);
-
-        var manager = new ConnectionManager(
-                cfg,
-                serializationService,
-                consistentId,
-                () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
-                () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
-                bootstrapFactory
-        );
-
-        manager.start();
-
-        startedManagers.add(manager);
-
-        return manager;
-    }
+//    /** Started connection managers. */
+//    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+//
+//    /** Started bootstrap factories. */
+//    private final List<NettyBootstrapFactory> startedBootstrapFactories = new ArrayList<>();
+//
+//    private final TestMessagesFactory messageFactory = new TestMessagesFactory();
+//
+//    /** Reusable network configuration object. */
+//    @InjectConfiguration
+//    private NetworkConfiguration networkConfiguration;
+//
+//    /**
+//     * After each.
+//     */
+//    @AfterEach
+//    final void tearDown() throws Exception {
+//        startedManagers.forEach(ConnectionManager::stop);
+//
+//        for (NettyBootstrapFactory startedBootstrapFactory : startedBootstrapFactories) {
+//            startedBootstrapFactory.stop();
+//        }
+//    }
+//
+//    /**
+//     * Tests handshake scenarios in which some of the parts of handshake protocol can fail.
+//     *
+//     * @param scenario Handshake scenario.
+//     * @throws Exception If failed.
+//     */
+//    @ParameterizedTest
+//    @MethodSource("handshakeScenarios")
+//    public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
+//        ConnectionManager manager1 = startManager(
+//                4000,
+//                scenario.serverFailAt,
+//                CLIENT_DOESNT_FAIL
+//        );
+//
+//        ConnectionManager manager2 = startManager(
+//                4001,
+//                SERVER_DOESNT_FAIL,
+//                scenario.clientFailAt
+//        );
+//
+//        NettySender from2to1;
+//
+//        try {
+//            from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
+//        } catch (Exception e) {
+//            if (scenario.clientFailAt == CLIENT_DOESNT_FAIL && scenario.serverFailAt == SERVER_DOESNT_FAIL) {
+//                Assertions.fail(e);
+//            }
+//
+//            return;
+//        }
+//
+//        if (scenario.clientFailAt != CLIENT_DOESNT_FAIL || scenario.serverFailAt != SERVER_DOESNT_FAIL) {
+//            Assertions.fail("Handshake should've failed");
+//        }
+//
+//        assertNotNull(from2to1);
+//
+//        // Ensure the handshake has finished on both sides.
+//        from2to1.send(messageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
+//
+//        NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
+//
+//        assertNotNull(from1to2);
+//
+//        assertEquals(from2to1.channel().localAddress(), from1to2.channel().remoteAddress());
+//    }
+//
+//    /**
+//     * Tests special handshake scenario: the client assumes a handshake has been finished, but the server fails on client's response. The
+//     * server will then close a connection and the client should get the "connection closed event".
+//     *
+//     * @throws Exception If failed.
+//     */
+//    @Test
+//    public void testHandshakeFailsOnServerWhenClientResponded() throws Exception {
+//        ConnectionManager manager1 = startManager(
+//                4000,
+//                SERVER_CLIENT_RESPONDED,
+//                CLIENT_DOESNT_FAIL
+//        );
+//
+//        ConnectionManager manager2 = startManager(
+//                4001,
+//                SERVER_DOESNT_FAIL,
+//                CLIENT_DOESNT_FAIL
+//        );
+//
+//        NettySender from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
+//
+//        from2to1.channel().closeFuture().get(3, TimeUnit.SECONDS);
+//    }
+//
+//    /**
+//     * Tests that one server can handle multiple incoming connections and perform the handshake operation successfully.
+//     *
+//     * @throws Exception If failed.
+//     */
+//    @Test
+//    public void testHandshakeWithMultipleClients() throws Exception {
+//        ConnectionManager server = startManager(4000);
+//
+//        List<ConnectionManager> clients = new ArrayList<>();
+//
+//        int clientCount = 10;
+//
+//        for (int i = 0; i < clientCount; i++) {
+//            clients.add(startManager(4001 + i));
+//        }
+//
+//        // A key is the client's consistent id, a value is the channel between the client and the server
+//        var channelsToServer = new HashMap<String, NettySender>();
+//
+//        for (ConnectionManager client : clients) {
+//            channelsToServer.put(
+//                    client.consistentId(),
+//                    client.channel(server.consistentId(), server.getLocalAddress()).get(3, TimeUnit.SECONDS)
+//            );
+//        }
+//
+//        assertTrue(waitForCondition(() -> server.channels().size() == clientCount, TimeUnit.SECONDS.toMillis(3)));
+//
+//        Map<String, NettySender> channels = server.channels();
+//
+//        // Assert that server's channels are inbound connections opened from clients
+//        channelsToServer.forEach((consistentId, toServer) -> {
+//            NettySender toClient = channels.get(consistentId);
+//
+//            assertNotNull(toClient);
+//
+//            assertEquals(toServer.channel().localAddress(), toClient.channel().remoteAddress());
+//        });
+//    }
+//
+//    /**
+//     * Returns generates handshake scenarios.
+//     *
+//     * @return Generates handshake scenarios.
+//     */
+//    private static List<HandshakeScenario> handshakeScenarios() {
+//        ServerStageFail[] serverOpts = ServerStageFail.values();
+//
+//        ClientStageFail[] clientOpts = ClientStageFail.values();
+//
+//        List<HandshakeScenario> res = new ArrayList<>();
+//
+//        for (ServerStageFail serverOpt : serverOpts) {
+//            for (ClientStageFail clientOpt : clientOpts) {
+//                // The case in if statement is handled in separate test
+//                if (serverOpt != SERVER_CLIENT_RESPONDED && clientOpt != CLIENT_DOESNT_FAIL) {
+//                    res.add(new HandshakeScenario(serverOpt, clientOpt));
+//                }
+//            }
+//        }
+//
+//        return res;
+//    }
+//
+//    /** Handshake scenario. */
+//    private static class HandshakeScenario {
+//        /** Stage to fail server handshake at. */
+//        private final ServerStageFail serverFailAt;
+//
+//        /** Stage to fail client handshake at. */
+//        private final ClientStageFail clientFailAt;
+//
+//        /** Constructor. */
+//        private HandshakeScenario(ServerStageFail serverFailAt, ClientStageFail clientFailAt) {
+//            this.serverFailAt = serverFailAt;
+//            this.clientFailAt = clientFailAt;
+//        }
+//
+//        /** {@inheritDoc} */
+//        @Override
+//        public String toString() {
+//            return String.format("server=%s, client=%s", serverFailAt, clientFailAt);
+//        }
+//    }
+//
+//    /**
+//     * {@link RecoveryServerHandshakeManager} that can fail at specific stage of the handshake.
+//     */
+//    static class FailingRecoveryServerHandshakeManager extends RecoveryServerHandshakeManager {
+//        /**
+//         * At what stage to fail the handshake.
+//         */
+//        private final ServerStageFail failAtStage;
+//
+//        /** Constructor. */
+//        private FailingRecoveryServerHandshakeManager(
+//                UUID launchId,
+//                String consistentId,
+//                ServerStageFail failAtStage,
+//                NetworkMessagesFactory messageFactory
+//        ) {
+//            super(launchId, consistentId, messageFactory);
+//            this.failAtStage = failAtStage;
+//        }
+//
+//        /** {@inheritDoc} */
+//        @Override
+//        public HandshakeResult init(Channel channel) {
+//            if (failAtStage == SERVER_INIT) {
+//                handshakeFuture().completeExceptionally(new RuntimeException());
+//                return HandshakeResult.fail();
+//            }
+//
+//            return super.init(channel);
+//        }
+//
+//        /** {@inheritDoc} */
+//        @Override
+//        public HandshakeResult onConnectionOpen(Channel channel) {
+//            if (failAtStage == SERVER_CONNECTION_OPENED) {
+//                handshakeFuture().completeExceptionally(new RuntimeException());
+//                return HandshakeResult.fail();
+//            }
+//
+//            return super.onConnectionOpen(channel);
+//        }
+//
+//        /** {@inheritDoc} */
+//        @Override
+//        public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+//            if (failAtStage == SERVER_CLIENT_RESPONDED) {
+//                handshakeFuture().completeExceptionally(new RuntimeException());
+//                return HandshakeResult.fail();
+//            }
+//
+//            return super.onMessage(channel, message);
+//        }
+//
+//        /** Server handshake stage to fail at. */
+//        enum ServerStageFail {
+//            /** Don't fail at all. */
+//            SERVER_DOESNT_FAIL,
+//
+//            /** Fail on init. */
+//            SERVER_INIT,
+//
+//            /** Fail on connection open. */
+//            SERVER_CONNECTION_OPENED,
+//
+//            /** Fail on client response. */
+//            SERVER_CLIENT_RESPONDED
+//        }
+//    }
+//
+//    /**
+//     * {@link RecoveryClientHandshakeManager} that can fail at specific stage of the handshake.
+//     */
+//    static class FailingRecoveryClientHandshakeManager extends RecoveryClientHandshakeManager {
+//        /**
+//         * At what stage to fail the handshake.
+//         */
+//        private final ClientStageFail failAtStage;
+//
+//        /** Constructor. */
+//        private FailingRecoveryClientHandshakeManager(
+//                UUID launchId,
+//                String consistentId,
+//                ClientStageFail failAtStage,
+//                NetworkMessagesFactory messageFactory
+//        ) {
+//            super(launchId, consistentId, messageFactory);
+//            this.failAtStage = failAtStage;
+//        }
+//
+//        /** {@inheritDoc} */
+//        @Override
+//        public HandshakeResult init(Channel channel) {
+//            if (failAtStage == CLIENT_INIT) {
+//                handshakeFuture().completeExceptionally(new RuntimeException());
+//                return HandshakeResult.fail();
+//            }
+//
+//            return super.init(channel);
+//        }
+//
+//        /** {@inheritDoc} */
+//        @Override
+//        public HandshakeResult onConnectionOpen(Channel channel) {
+//            if (failAtStage == CLIENT_CONNECTION_OPENED) {
+//                handshakeFuture().completeExceptionally(new RuntimeException());
+//                return HandshakeResult.fail();
+//            }
+//
+//            return super.onConnectionOpen(channel);
+//        }
+//
+//        /** {@inheritDoc} */
+//        @Override
+//        public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+//            if (failAtStage == CLIENT_SERVER_RESPONDED) {
+//                handshakeFuture().completeExceptionally(new RuntimeException());
+//                return HandshakeResult.fail();
+//            }
+//
+//            return super.onMessage(channel, message);
+//        }
+//
+//        /** Client handshake stage to fail at. */
+//        enum ClientStageFail {
+//            /** Don't fail at all. */
+//            CLIENT_DOESNT_FAIL,
+//
+//            /** Fail on init. */
+//            CLIENT_INIT,
+//
+//            /** Fail on connection open. */
+//            CLIENT_CONNECTION_OPENED,
+//
+//            /** Fail on server response. */
+//            CLIENT_SERVER_RESPONDED
+//        }
+//    }
+//
+//    /**
+//     * Starts a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
+//     *
+//     * @param port                  Port for the server.
+//     * @param serverHandshakeFailAt At what stage to fail server handshake.
+//     * @param clientHandshakeFailAt At what stage to fail client handshake.
+//     * @return Connection manager.
+//     */
+//    private ConnectionManager startManager(
+//            int port,
+//            ServerStageFail serverHandshakeFailAt,
+//            ClientStageFail clientHandshakeFailAt
+//    ) {
+//        var registry = new TestMessageSerializationRegistryImpl();
+//        var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
+//
+//        var messageFactory = new NetworkMessagesFactory();
+//
+//        UUID launchId = UUID.randomUUID();
+//        String consistentId = UUID.randomUUID().toString();
+//
+//        networkConfiguration.port().update(port).join();
+//
+//        NetworkView cfg = networkConfiguration.value();
+//
+//        NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
+//        bootstrapFactory.start();
+//        startedBootstrapFactories.add(bootstrapFactory);
+//
+//        var manager = new ConnectionManager(
+//                cfg,
+//                serializationService,
+//                consistentId,
+//                () -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt, messageFactory),
+//                () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, messageFactory),
+//                bootstrapFactory
+//        );
+//
+//        manager.start();
+//
+//        startedManagers.add(manager);
+//
+//        return manager;
+//    }
+//
+//    /**
+//     * Starts a {@link ConnectionManager} with a normal handshake manager adding it to the {@link #startedManagers} list.
+//     *
+//     * @param port Port.
+//     * @return Connection manager
+//     */
+//    private ConnectionManager startManager(int port) {
+//        var registry = new TestMessageSerializationRegistryImpl();
+//        var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
+//
+//        var messageFactory = new NetworkMessagesFactory();
+//
+//        UUID launchId = UUID.randomUUID();
+//        String consistentId = UUID.randomUUID().toString();
+//
+//        networkConfiguration.port().update(port).join();
+//
+//        NetworkView cfg = networkConfiguration.value();
+//
+//        NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
+//        bootstrapFactory.start();
+//        startedBootstrapFactories.add(bootstrapFactory);
+//
+//        var manager = new ConnectionManager(
+//                cfg,
+//                serializationService,
+//                consistentId,
+//                () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
+//                () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
+//                bootstrapFactory
+//        );
+//
+//        manager.start();
+//
+//        startedManagers.add(manager);
+//
+//        return manager;
+//    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/Communication.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/Communication.java
new file mode 100644
index 0000000..0421efa
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/Communication.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.NetworkMessage;
+
+public class Communication {
+    private final CommunicationPool pool;
+
+    private final NettySender sender;
+
+    public Communication(CommunicationPool pool, NettySender sender) {
+        this.pool = pool;
+        this.sender = sender;
+    }
+
+    public CompletableFuture<Void> sendAndRelease(NetworkMessage msg) {
+        return sender.send(msg).thenAccept(unused -> release());
+    }
+
+    public CompletableFuture<Void> send(NetworkMessage msg) {
+        return sender.send(msg);
+    }
+
+    public void release() {
+        pool.release(this);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationHandler.java
new file mode 100644
index 0000000..e6e1258
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.net.SocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.jetbrains.annotations.Nullable;
+
+public class CommunicationHandler {
+
+    private final ConcurrentMap<String, CommunicationPool> connections = new ConcurrentHashMap<>();
+
+    private final ConnectionManager connectionManager;
+
+    public CommunicationHandler(ConnectionManager connectionManager) {
+        this.connectionManager = connectionManager;
+    }
+
+    public CompletableFuture<Communication> communication(@Nullable String consistentId, SocketAddress address) {
+        if (consistentId != null) {
+            CommunicationPool communicationPool = connections.computeIfAbsent(consistentId, s -> {
+                return new CommunicationPool(address, connectionManager);
+            });
+
+            return communicationPool.get();
+        }
+
+        return connectionManager.connect(address).thenApply((sender) -> afterClientConnected(address, sender));
+    }
+
+    public Communication afterClientConnected(SocketAddress address, NettySender sender) {
+        CommunicationPool pool = connections.computeIfAbsent(sender.consistentId(), s -> {
+            return new CommunicationPool(address, connectionManager);
+        });
+
+        return new Communication(pool, sender);
+    }
+
+    public boolean isStopped() {
+        return connectionManager.isStopped();
+    }
+
+    public String consistentId() {
+        return connectionManager.consistentId();
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationPool.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationPool.java
new file mode 100644
index 0000000..ce260d7
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/CommunicationPool.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.net.SocketAddress;
+import java.util.Deque;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+public class CommunicationPool {
+    private final Deque<Communication> queue = new ConcurrentLinkedDeque<>();
+    private final SocketAddress address;
+    private final ConnectionManager connectionManager;
+
+    public CommunicationPool(SocketAddress address, ConnectionManager connectionManager) {
+        this.address = address;
+        this.connectionManager = connectionManager;
+    }
+
+    public void release(Communication communication) {
+        queue.offer(communication);
+    }
+
+    public CompletableFuture<Communication> get() {
+        Communication communication = queue.pollFirst();
+
+        if (communication != null) {
+            return CompletableFuture.completedFuture(communication);
+        }
+
+        return connectionManager.connect(address).thenApply(sender -> {
+            return new Communication(this, sender);
+        });
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 5d40854..478ba0b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -18,18 +18,9 @@
 package org.apache.ignite.internal.network.netty;
 
 import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import java.net.SocketAddress;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -37,17 +28,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
-import org.apache.ignite.configuration.schemas.network.OutboundView;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
-import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -66,12 +53,6 @@ public class ConnectionManager {
     /** Server. */
     private final NettyServer server;
 
-    /** Channels map from consistentId to {@link NettySender}. */
-    private final Map<ChannelKey, NettySender> channels = new ConcurrentHashMap<>();
-
-    /** Clients. */
-    private final Map<ClientKey, NettyClient> clients = new ConcurrentHashMap<>();
-
     /** Serialization service. */
     private final SerializationService serializationService;
 
@@ -163,104 +144,6 @@ public class ConnectionManager {
         return server.address();
     }
 
-    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
-        return channel(consistentId, address, 0);
-    }
-
-    /**
-     * Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
-     *
-     * @param consistentId Another node's consistent id.
-     * @param address      Another node's address.
-     * @return Sender.
-     */
-    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address, int messageType) {
-        // LOG.info("Requesting channel=" + consistentId + " address=" + address);
-        if (consistentId != null) {
-            // If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
-            // or an inbound connection associated with that consistent id.
-            var key = new ChannelKey(consistentId, messageType);
-            NettySender channel = channels.compute(
-                    key,
-                    (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
-            );
-
-            if (channel != null) {
-                return CompletableFuture.completedFuture(channel);
-            }
-        }
-
-        // Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
-        // when the client is ready for write operations, so previously started client, that didn't establish connection
-        // or didn't perform the handhsake operaton, can be reused.
-        ClientKey clientKey = new ClientKey(address, messageType);
-        NettyClient client = clients.compute(clientKey, (addr, existingClient) ->
-                existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected()
-                        ? existingClient : connect(addr)
-        );
-
-        CompletableFuture<NettySender> sender = client.sender();
-
-        assert sender != null;
-
-        return sender;
-    }
-
-    private static class ClientKey {
-        private final SocketAddress address;
-
-        private final int msgGroup;
-
-        private ClientKey(SocketAddress address, int msgGroup) {
-            this.address = address;
-            this.msgGroup = msgGroup;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            ClientKey clientKey = (ClientKey) o;
-            return msgGroup == clientKey.msgGroup && Objects.equals(address, clientKey.address);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(address, msgGroup);
-        }
-    }
-
-    private static class ChannelKey {
-        private final String consistentId;
-
-        private final int msgGroup;
-
-        private ChannelKey(String consistentId, int msgGroup) {
-            this.consistentId = consistentId;
-            this.msgGroup = msgGroup;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            ChannelKey that = (ChannelKey) o;
-            return msgGroup == that.msgGroup && Objects.equals(consistentId, that.consistentId);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(consistentId, msgGroup);
-        }
-    }
 
     private ExecutorService svc = Executors.newFixedThreadPool(20);
 
@@ -292,23 +175,15 @@ public class ConnectionManager {
      * @param key Client key.
      * @return New netty client.
      */
-    private NettyClient connect(ClientKey key) {
+    public CompletableFuture<NettySender> connect(SocketAddress address) {
         var client = new NettyClient(
-                key.address,
+                address,
                 serializationService,
                 clientHandshakeManagerFactory.get(),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
-            if (throwable == null) {
-                channels.put(new ChannelKey(sender.consistentId(), key.msgGroup), sender);
-            } else {
-                clients.remove(key.address);
-            }
-        });
-
-        return client;
+        return client.start(clientBootstrap);
     }
 
     /**
@@ -330,15 +205,8 @@ public class ConnectionManager {
             return;
         }
 
-        Stream<CompletableFuture<Void>> stream = Stream.concat(
-                clients.values().stream().map(NettyClient::stop),
-                Stream.of(server.stop())
-        );
-
-        CompletableFuture<Void> stopFut = CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new));
-
         try {
-            stopFut.join();
+            server.stop().join();
         } catch (Exception e) {
             LOG.warn("Failed to stop the ConnectionManager: {}", e.getMessage());
         }
@@ -374,48 +242,4 @@ public class ConnectionManager {
     public String consistentId() {
         return consistentId;
     }
-
-    /**
-     * Returns collection of all the clients started by this connection manager.
-     *
-     * @return Collection of all the clients started by this connection manager.
-     */
-    @TestOnly
-    public Collection<NettyClient> clients() {
-        return Collections.unmodifiableCollection(clients.values());
-    }
-
-
-    /**
-     * Returns map of the channels.
-     *
-     * @return Map of the channels.
-     */
-    @TestOnly
-    public Map<String, NettySender> channels() {
-        return channels.entrySet().stream().collect(Collectors.toMap(k -> k.getKey().consistentId, Entry::getValue));
-    }
-
-    /**
-     * Creates a {@link Bootstrap} for clients with channel options provided by a {@link OutboundView}.
-     *
-     * @param eventLoopGroup      Event loop group for channel handling.
-     * @param clientConfiguration Client configuration.
-     * @return Bootstrap for clients.
-     */
-    public static Bootstrap createClientBootstrap(
-            EventLoopGroup eventLoopGroup,
-            OutboundView clientConfiguration
-    ) {
-        Bootstrap clientBootstrap = new Bootstrap();
-
-        clientBootstrap.group(eventLoopGroup)
-                .channel(NioSocketChannel.class)
-                // See NettyServer#start for netty configuration details.
-                .option(ChannelOption.SO_KEEPALIVE, clientConfiguration.soKeepAlive())
-                .option(ChannelOption.SO_LINGER, clientConfiguration.soLinger())
-                .option(ChannelOption.TCP_NODELAY, clientConfiguration.tcpNoDelay());
-
-        return clientBootstrap;
-    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 677f365..4c0d3cf 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.message.InvokeRequest;
 import org.apache.ignite.internal.network.message.InvokeResponse;
-import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.CommunicationHandler;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -46,7 +46,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
     private final TopologyService topologyService;
 
     /** Connection manager that provides access to {@link NettySender}. */
-    private volatile ConnectionManager connectionManager;
+    private volatile CommunicationHandler connectionManager;
 
     /**
      * This node's local socket address. Not volatile, because access is guarded by volatile reads/writes to the {@code connectionManager}
@@ -84,7 +84,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
      */
     public void setConnectionManager(ConnectionManager connectionManager) {
         this.localAddress = (InetSocketAddress) connectionManager.getLocalAddress();
-        this.connectionManager = connectionManager;
+        this.connectionManager = new CommunicationHandler(connectionManager);
         connectionManager.addListener(this::onMessage);
     }
 
@@ -158,7 +158,9 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         String recipientConsistentId = recipient != null ? recipient.name() : address.consistentId();
 
-        return connectionManager.channel(recipientConsistentId, addr, msg.groupType()).thenCompose(sender -> sender.send(message));
+        return connectionManager.communication(recipientConsistentId, addr).thenCompose(communication -> {
+            return communication.sendAndRelease(message);
+        });
     }
 
     /**
@@ -194,7 +196,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         String recipientConsistentId = recipient != null ? recipient.name() : addr.consistentId();
 
-        return connectionManager.channel(recipientConsistentId, address, msg.groupType()).thenCompose(sender -> sender.send(message))
+        return connectionManager.communication(recipientConsistentId, address).thenCompose(sender -> sender.sendAndRelease(message))
                 .thenCompose(unused -> responseFuture);
     }
 
@@ -219,11 +221,6 @@ public class DefaultMessagingService extends AbstractMessagingService {
      * @param msg Incoming message.
      */
     private void onMessage(String consistentId, NetworkMessage msg) {
-        if (msg instanceof ScaleCubeMessage) {
-            // ScaleCube messages are handled in the ScaleCubeTransport
-            return;
-        }
-
         if (msg instanceof InvokeResponse) {
             InvokeResponse response = (InvokeResponse) msg;
             onInvokeResponse(response.message(), response.correlationId());
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 668a57e..07a6e13 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -104,31 +104,32 @@ public class ScaleCubeClusterServiceFactory {
                         nettyBootstrapFactory
                 );
 
-                var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr, topologyService, messageFactory);
+                connectionMgr.start();
+
+                // resolve cyclic dependencies
+                messagingService.setConnectionManager(connectionMgr);
+
+                var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr.getLocalAddress(), messagingService, topologyService, messageFactory);
 
                 NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
 
                 cluster = new ClusterImpl(clusterConfig(configView.membership()))
-                        .handler(cl -> new ClusterMessageHandler() {
-                            /** {@inheritDoc} */
-                            @Override
-                            public void onMembershipEvent(MembershipEvent event) {
-                                topologyService.onMembershipEvent(event);
-                            }
-                        })
-                        .config(opts -> opts.memberAlias(consistentId))
-                        .transport(opts -> opts.transportFactory(transportConfig -> transport))
-                        .membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));
+                    .handler(cl -> new ClusterMessageHandler() {
+                        /** {@inheritDoc} */
+                        @Override
+                        public void onMembershipEvent(MembershipEvent event) {
+                            topologyService.onMembershipEvent(event);
+                        }
+                    })
+                    .config(opts -> opts.memberAlias(consistentId))
+                    .transport(opts -> opts.transportFactory(transportConfig -> transport))
+                    .membership(opts -> opts.seedMembers(parseAddresses(finder.findNodes())));
 
                 shutdownFuture = cluster.onShutdown().toFuture();
 
-                connectionMgr.start();
+                cluster.startAwait();
 
-                // resolve cyclic dependencies
                 topologyService.setCluster(cluster);
-                messagingService.setConnectionManager(connectionMgr);
-
-                cluster.startAwait();
 
                 // emit an artificial event as if the local member has joined the topology (ScaleCube doesn't do that)
                 var localMembershipEvent = MembershipEvent.createAdded(cluster.member(), null, System.currentTimeMillis());
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index ad763de..8d26bd9 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
@@ -32,6 +33,7 @@ import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
@@ -62,15 +64,14 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     /** On stop. */
     private final MonoProcessor<Void> onStop = MonoProcessor.create();
 
-    /** Connection manager. */
-    private final ConnectionManager connectionManager;
-
     /** Message factory. */
     private final NetworkMessagesFactory messageFactory;
 
     /** Topology service. */
     private final ScaleCubeTopologyService topologyService;
 
+    private final MessagingService messagingService;
+
     /** Node address. */
     private Address address;
 
@@ -82,21 +83,26 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
      * @param messageFactory    message factory
      */
     ScaleCubeDirectMarshallerTransport(
-            ConnectionManager connectionManager,
+            SocketAddress address,
+            MessagingService messagingService,
             ScaleCubeTopologyService topologyService,
             NetworkMessagesFactory messageFactory
     ) {
-        this.connectionManager = connectionManager;
+        this.address = prepareAddress(address);
+        this.messagingService = messagingService;
         this.topologyService = topologyService;
         this.messageFactory = messageFactory;
 
-        this.connectionManager.addListener(this::onMessage);
+        this.messagingService.addMessageHandler(NetworkMessageTypes.class, (message, senderAddr, correlationId) -> {
+            this.onMessage(message);
+        });
+
         // Setup cleanup
         stop.then(doStop())
                 .doFinally(s -> onStop.onComplete())
                 .subscribe(
                         null,
-                        ex -> LOG.warn("Failed to stop {}: {}", address, ex.toString())
+                        ex -> LOG.warn("Failed to stop {}: {}", this.address, ex.toString())
                 );
     }
 
@@ -145,8 +151,6 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     /** {@inheritDoc} */
     @Override
     public Mono<Transport> start() {
-        address = prepareAddress(connectionManager.getLocalAddress());
-
         return Mono.just(this);
     }
 
@@ -171,21 +175,24 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
         var addr = InetSocketAddress.createUnresolved(address.host(), address.port());
 
         return Mono.fromFuture(() -> {
-            ClusterNode node = topologyService.getByAddress(NetworkAddress.from(addr));
+            NetworkAddress networkAddress = NetworkAddress.from(addr);
+
+            ClusterNode node = topologyService.getByAddress(networkAddress);
 
-            String consistentId = node != null ? node.name() : null;
+            if (node == null) {
+                node = new ClusterNode(null, null, networkAddress);
+            }
 
-            return connectionManager.channel(consistentId, addr).thenCompose(client -> client.send(fromMessage(message)));
+            return messagingService.send(node, fromMessage(message));
         });
     }
 
     /**
      * Handles new network messages from {@link #connectionManager}.
      *
-     * @param senderConsistentId Sender's consistent id.
      * @param msg    Network message.
      */
-    private void onMessage(String senderConsistentId, NetworkMessage msg) {
+    private void onMessage(NetworkMessage msg) {
         Message message = fromNetworkMessage(msg);
 
         if (message != null) {