You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/05/13 15:41:25 UTC

[ignite-3] branch main updated: IGNITE-14085 Implement network recovery protocol

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c075cdd8 IGNITE-14085 Implement network recovery protocol
8c075cdd8 is described below

commit 8c075cdd8defd5ff811c1fa68864650d22c7d06e
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Sun May 1 01:59:42 2022 +0400

    IGNITE-14085 Implement network recovery protocol
---
 .../internal/network/processor/MessageClass.java   |   1 +
 .../org/apache/ignite/network/NetworkMessage.java  |  11 +-
 .../network/netty/ItConnectionManagerTest.java     |   7 +-
 .../network/recovery/ItRecoveryHandshakeTest.java  | 478 ---------------------
 .../internal/network/NetworkMessageTypes.java      |  18 +-
 .../network/handshake/HandshakeManager.java        |  25 +-
 .../internal/network/netty/ConnectionManager.java  |  74 ++--
 .../netty/DefaultRecoveryDescriptorProvider.java   | 109 +++++
 .../internal/network/netty/HandshakeHandler.java   |  61 +--
 .../internal/network/netty/InboundDecoder.java     |   3 +
 .../network/netty/InboundRecoveryHandler.java      |  72 ++++
 .../netty/IoExceptionSuppressingHandler.java       |   3 +
 .../internal/network/netty/MessageHandler.java     |   8 +
 .../ignite/internal/network/netty/NettyClient.java |  11 +-
 .../ignite/internal/network/netty/NettyServer.java |  21 +-
 .../internal/network/netty/OutboundEncoder.java    |   3 +
 .../network/netty/OutboundRecoveryHandler.java     |  54 +++
 .../internal/network/netty/PipelineUtils.java      |  68 +++
 .../recovery/RecoveryClientHandshakeManager.java   | 167 +++++--
 .../network/recovery/RecoveryDescriptor.java       | 127 ++++++
 ...essage.java => RecoveryDescriptorProvider.java} |  27 +-
 .../recovery/RecoveryServerHandshakeManager.java   | 148 ++++++-
 ...artMessage.java => AcknowledgementMessage.java} |  24 +-
 ...artMessage.java => HandshakeFinishMessage.java} |  25 +-
 .../recovery/message/HandshakeStartMessage.java    |   6 +-
 .../message/HandshakeStartResponseMessage.java     |  12 +-
 ...shakeStartMessage.java => InternalMessage.java} |  26 +-
 .../apache/ignite/network/OutNetworkObject.java    |  27 ++
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   5 +-
 .../internal/network/netty/NettyClientTest.java    |  16 +-
 .../internal/network/netty/NettyServerTest.java    |  11 +-
 .../network/netty/RecoveryHandshakeTest.java       | 440 +++++++++++++++++++
 32 files changed, 1315 insertions(+), 773 deletions(-)

diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/MessageClass.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/MessageClass.java
index d00712727..caeab8d6d 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/MessageClass.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/MessageClass.java
@@ -81,6 +81,7 @@ public class MessageClass {
                 .filter(e -> !typeUtils.isSameType(e.asType(), NetworkMessage.class))
                 .flatMap(e -> e.getEnclosedElements().stream())
                 .filter(e -> e.getKind() == ElementKind.METHOD)
+                .filter(e -> !((ExecutableElement) e).isDefault())
                 // use a tree map to sort getters by name and remove duplicates
                 .collect(Collectors.toMap(
                         e -> e.getSimpleName().toString(),
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
index c31acf7ca..e0d00cf1e 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
@@ -24,7 +24,7 @@ import it.unimi.dsi.fastutil.ints.IntSet;
  */
 public interface NetworkMessage {
     /** Size of the message type (in bytes), used during (de-)serialization. */
-    static final int MSG_TYPE_SIZE_BYTES = 4;
+    int MSG_TYPE_SIZE_BYTES = 4;
 
     /**
      * Message type. Must be <b>distinct</b> among all messages in a <i>message group</i>. Only positive values are allowed.
@@ -51,4 +51,13 @@ public interface NetworkMessage {
     default void unmarshal(Object marshaller, Object descriptors) throws Exception {
         // No-op.
     }
+
+    /**
+     * Returns {@code true} if this message needs an acknowledgement from the remote node, {@code false} otherwise.
+     *
+     * @return {@code true} if this message needs an acknowledgement from the remote node, {@code false} otherwise.
+     */
+    default boolean needAck() {
+        return true;
+    }
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 9f9b95786..fe4315490 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -46,8 +46,6 @@ import org.apache.ignite.configuration.schemas.network.NetworkView;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
-import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -147,7 +145,7 @@ public class ItConnectionManagerTest {
         NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
         // Ensure a handshake has finished on both sides by sending a message.
-        // TODO: IGNITE-14085 When the recovery protocol is implemented replace this with simple
+        // TODO: IGNITE-16947 When the recovery protocol is implemented, replace this with a simple
         // CompletableFuture#get called on the send future.
         var messageReceivedOn2 = new CompletableFuture<Void>();
 
@@ -353,9 +351,8 @@ public class ItConnectionManagerTest {
         var manager = new ConnectionManager(
                 cfg,
                 new SerializationService(registry, mock(UserObjectSerializationContext.class)),
+                launchId,
                 consistentId,
-                () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
-                () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
                 bootstrapFactory
         );
 
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
deleted file mode 100644
index f081d195d..000000000
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.network.recovery;
-
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
-import static org.apache.ignite.internal.network.recovery.ItRecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import io.netty.channel.Channel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
-import org.apache.ignite.configuration.schemas.network.NetworkView;
-import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
-import org.apache.ignite.internal.network.netty.ConnectionManager;
-import org.apache.ignite.internal.network.netty.NettySender;
-import org.apache.ignite.internal.network.serialization.SerializationService;
-import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
-import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.OutNetworkObject;
-import org.apache.ignite.network.TestMessage;
-import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
-import org.apache.ignite.network.TestMessagesFactory;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-
-/**
- * Recovery protocol handshake tests.
- */
-@ExtendWith(ConfigurationExtension.class)
-public class ItRecoveryHandshakeTest {
-    /** Started connection managers. */
-    private final List<ConnectionManager> startedManagers = new ArrayList<>();
-
-    /** Started bootstrap factories. */
-    private final List<NettyBootstrapFactory> startedBootstrapFactories = new ArrayList<>();
-
-    private final TestMessagesFactory messageFactory = new TestMessagesFactory();
-
-    /** Reusable network configuration object. */
-    @InjectConfiguration
-    private NetworkConfiguration networkConfiguration;
-
-    /**
-     * After each.
-     */
-    @AfterEach
-    final void tearDown() throws Exception {
-        startedManagers.forEach(ConnectionManager::stop);
-
-        for (NettyBootstrapFactory startedBootstrapFactory : startedBootstrapFactories) {
-            startedBootstrapFactory.stop();
-        }
-    }
-
-    /**
-     * Tests handshake scenarios in which some of the parts of handshake protocol can fail.
-     *
-     * @param scenario Handshake scenario.
-     * @throws Exception If failed.
-     */
-    @ParameterizedTest
-    @MethodSource("handshakeScenarios")
-    public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
-        ConnectionManager manager1 = startManager(
-                4000,
-                scenario.serverFailAt,
-                CLIENT_DOESNT_FAIL
-        );
-
-        ConnectionManager manager2 = startManager(
-                4001,
-                SERVER_DOESNT_FAIL,
-                scenario.clientFailAt
-        );
-
-        NettySender from2to1;
-
-        try {
-            from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            if (scenario.clientFailAt == CLIENT_DOESNT_FAIL && scenario.serverFailAt == SERVER_DOESNT_FAIL) {
-                Assertions.fail(e);
-            }
-
-            return;
-        }
-
-        if (scenario.clientFailAt != CLIENT_DOESNT_FAIL || scenario.serverFailAt != SERVER_DOESNT_FAIL) {
-            Assertions.fail("Handshake should've failed");
-        }
-
-        assertNotNull(from2to1);
-
-        // Ensure the handshake has finished on both sides.
-        TestMessage msg = messageFactory.testMessage().msg("test").build();
-        from2to1.send(new OutNetworkObject(msg, Collections.emptyList())).get(3, TimeUnit.SECONDS);
-
-        NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
-
-        assertNotNull(from1to2);
-
-        assertEquals(from2to1.channel().localAddress(), from1to2.channel().remoteAddress());
-    }
-
-    /**
-     * Tests special handshake scenario: the client assumes a handshake has been finished, but the server fails on client's response. The
-     * server will then close a connection and the client should get the "connection closed event".
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testHandshakeFailsOnServerWhenClientResponded() throws Exception {
-        ConnectionManager manager1 = startManager(
-                4000,
-                SERVER_CLIENT_RESPONDED,
-                CLIENT_DOESNT_FAIL
-        );
-
-        ConnectionManager manager2 = startManager(
-                4001,
-                SERVER_DOESNT_FAIL,
-                CLIENT_DOESNT_FAIL
-        );
-
-        NettySender from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
-
-        from2to1.channel().closeFuture().get(3, TimeUnit.SECONDS);
-    }
-
-    /**
-     * Tests that one server can handle multiple incoming connections and perform the handshake operation successfully.
-     *
-     * @throws Exception If failed.
-     */
-    @Test
-    public void testHandshakeWithMultipleClients() throws Exception {
-        ConnectionManager server = startManager(4000);
-
-        List<ConnectionManager> clients = new ArrayList<>();
-
-        int clientCount = 10;
-
-        for (int i = 0; i < clientCount; i++) {
-            clients.add(startManager(4001 + i));
-        }
-
-        // A key is the client's consistent id, a value is the channel between the client and the server
-        var channelsToServer = new HashMap<String, NettySender>();
-
-        for (ConnectionManager client : clients) {
-            channelsToServer.put(
-                    client.consistentId(),
-                    client.channel(server.consistentId(), server.getLocalAddress()).get(3, TimeUnit.SECONDS)
-            );
-        }
-
-        assertTrue(waitForCondition(() -> server.channels().size() == clientCount, TimeUnit.SECONDS.toMillis(3)));
-
-        Map<String, NettySender> channels = server.channels();
-
-        // Assert that server's channels are inbound connections opened from clients
-        channelsToServer.forEach((consistentId, toServer) -> {
-            NettySender toClient = channels.get(consistentId);
-
-            assertNotNull(toClient);
-
-            assertEquals(toServer.channel().localAddress(), toClient.channel().remoteAddress());
-        });
-    }
-
-    /**
-     * Returns generates handshake scenarios.
-     *
-     * @return Generates handshake scenarios.
-     */
-    private static List<HandshakeScenario> handshakeScenarios() {
-        ServerStageFail[] serverOpts = ServerStageFail.values();
-
-        ClientStageFail[] clientOpts = ClientStageFail.values();
-
-        List<HandshakeScenario> res = new ArrayList<>();
-
-        for (ServerStageFail serverOpt : serverOpts) {
-            for (ClientStageFail clientOpt : clientOpts) {
-                // The case in if statement is handled in separate test
-                if (serverOpt != SERVER_CLIENT_RESPONDED && clientOpt != CLIENT_DOESNT_FAIL) {
-                    res.add(new HandshakeScenario(serverOpt, clientOpt));
-                }
-            }
-        }
-
-        return res;
-    }
-
-    /** Handshake scenario. */
-    private static class HandshakeScenario {
-        /** Stage to fail server handshake at. */
-        private final ServerStageFail serverFailAt;
-
-        /** Stage to fail client handshake at. */
-        private final ClientStageFail clientFailAt;
-
-        /** Constructor. */
-        private HandshakeScenario(ServerStageFail serverFailAt, ClientStageFail clientFailAt) {
-            this.serverFailAt = serverFailAt;
-            this.clientFailAt = clientFailAt;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public String toString() {
-            return String.format("server=%s, client=%s", serverFailAt, clientFailAt);
-        }
-    }
-
-    /**
-     * {@link RecoveryServerHandshakeManager} that can fail at specific stage of the handshake.
-     */
-    static class FailingRecoveryServerHandshakeManager extends RecoveryServerHandshakeManager {
-        /**
-         * At what stage to fail the handshake.
-         */
-        private final ServerStageFail failAtStage;
-
-        /** Constructor. */
-        private FailingRecoveryServerHandshakeManager(
-                UUID launchId,
-                String consistentId,
-                ServerStageFail failAtStage,
-                NetworkMessagesFactory messageFactory
-        ) {
-            super(launchId, consistentId, messageFactory);
-            this.failAtStage = failAtStage;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult init(Channel channel) {
-            if (failAtStage == SERVER_INIT) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.init(channel);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult onConnectionOpen(Channel channel) {
-            if (failAtStage == SERVER_CONNECTION_OPENED) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.onConnectionOpen(channel);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
-            if (failAtStage == SERVER_CLIENT_RESPONDED) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.onMessage(channel, message);
-        }
-
-        /** Server handshake stage to fail at. */
-        enum ServerStageFail {
-            /** Don't fail at all. */
-            SERVER_DOESNT_FAIL,
-
-            /** Fail on init. */
-            SERVER_INIT,
-
-            /** Fail on connection open. */
-            SERVER_CONNECTION_OPENED,
-
-            /** Fail on client response. */
-            SERVER_CLIENT_RESPONDED
-        }
-    }
-
-    /**
-     * {@link RecoveryClientHandshakeManager} that can fail at specific stage of the handshake.
-     */
-    static class FailingRecoveryClientHandshakeManager extends RecoveryClientHandshakeManager {
-        /**
-         * At what stage to fail the handshake.
-         */
-        private final ClientStageFail failAtStage;
-
-        /** Constructor. */
-        private FailingRecoveryClientHandshakeManager(
-                UUID launchId,
-                String consistentId,
-                ClientStageFail failAtStage,
-                NetworkMessagesFactory messageFactory
-        ) {
-            super(launchId, consistentId, messageFactory);
-            this.failAtStage = failAtStage;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult init(Channel channel) {
-            if (failAtStage == CLIENT_INIT) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.init(channel);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult onConnectionOpen(Channel channel) {
-            if (failAtStage == CLIENT_CONNECTION_OPENED) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.onConnectionOpen(channel);
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
-            if (failAtStage == CLIENT_SERVER_RESPONDED) {
-                handshakeFuture().completeExceptionally(new RuntimeException());
-                return HandshakeResult.fail();
-            }
-
-            return super.onMessage(channel, message);
-        }
-
-        /** Client handshake stage to fail at. */
-        enum ClientStageFail {
-            /** Don't fail at all. */
-            CLIENT_DOESNT_FAIL,
-
-            /** Fail on init. */
-            CLIENT_INIT,
-
-            /** Fail on connection open. */
-            CLIENT_CONNECTION_OPENED,
-
-            /** Fail on server response. */
-            CLIENT_SERVER_RESPONDED
-        }
-    }
-
-    /**
-     * Starts a {@link ConnectionManager} adding it to the {@link #startedManagers} list.
-     *
-     * @param port                  Port for the server.
-     * @param serverHandshakeFailAt At what stage to fail server handshake.
-     * @param clientHandshakeFailAt At what stage to fail client handshake.
-     * @return Connection manager.
-     */
-    private ConnectionManager startManager(
-            int port,
-            ServerStageFail serverHandshakeFailAt,
-            ClientStageFail clientHandshakeFailAt
-    ) {
-        var registry = new TestMessageSerializationRegistryImpl();
-        var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
-
-        var messageFactory = new NetworkMessagesFactory();
-
-        UUID launchId = UUID.randomUUID();
-        String consistentId = UUID.randomUUID().toString();
-
-        networkConfiguration.port().update(port).join();
-
-        NetworkView cfg = networkConfiguration.value();
-
-        NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
-        bootstrapFactory.start();
-        startedBootstrapFactories.add(bootstrapFactory);
-
-        var manager = new ConnectionManager(
-                cfg,
-                serializationService,
-                consistentId,
-                () -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt, messageFactory),
-                () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt, messageFactory),
-                bootstrapFactory
-        );
-
-        manager.start();
-
-        startedManagers.add(manager);
-
-        return manager;
-    }
-
-    /**
-     * Starts a {@link ConnectionManager} with a normal handshake manager adding it to the {@link #startedManagers} list.
-     *
-     * @param port Port.
-     * @return Connection manager
-     */
-    private ConnectionManager startManager(int port) {
-        var registry = new TestMessageSerializationRegistryImpl();
-        var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
-
-        var messageFactory = new NetworkMessagesFactory();
-
-        UUID launchId = UUID.randomUUID();
-        String consistentId = UUID.randomUUID().toString();
-
-        networkConfiguration.port().update(port).join();
-
-        NetworkView cfg = networkConfiguration.value();
-
-        NettyBootstrapFactory bootstrapFactory = new NettyBootstrapFactory(networkConfiguration, consistentId);
-        bootstrapFactory.start();
-        startedBootstrapFactories.add(bootstrapFactory);
-
-        var manager = new ConnectionManager(
-                cfg,
-                serializationService,
-                consistentId,
-                () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
-                () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
-                bootstrapFactory
-        );
-
-        manager.start();
-
-        startedManagers.add(manager);
-
-        return manager;
-    }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
index ee0df8b92..ac99b87bd 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
@@ -23,6 +23,8 @@ import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
 import org.apache.ignite.internal.network.message.InvokeResponse;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.network.annotations.MessageGroup;
@@ -57,18 +59,28 @@ public class NetworkMessageTypes {
      */
     public static final short HANDSHAKE_START_RESPONSE = 4;
 
+    /**
+     * Type for {@link HandshakeFinishMessage}.
+     */
+    public static final short HANDSHAKE_FINISH = 5;
+
+    /**
+     * Type for {@link AcknowledgementMessage}.
+     */
+    public static final short ACKNOWLEDGEMENT = 6;
+
     /**
      * Type for {@link ClassDescriptorMessage}.
      */
-    public static final short CLASS_DESCRIPTOR_MESSAGE = 5;
+    public static final short CLASS_DESCRIPTOR_MESSAGE = 7;
 
     /**
      * Type for {@link FieldDescriptorMessage}.
      */
-    public static final short FIELD_DESCRIPTOR_MESSAGE = 6;
+    public static final short FIELD_DESCRIPTOR_MESSAGE = 8;
 
     /**
      * Type for {@link ClassDescriptorListMessage}.
      */
-    public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 7;
+    public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 9;
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
index c7d039788..e9de9e6d0 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java
@@ -17,9 +17,8 @@
 
 package org.apache.ignite.internal.network.handshake;
 
-import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.internal.network.netty.HandshakeHandler;
 import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.network.NetworkMessage;
 
@@ -28,29 +27,25 @@ import org.apache.ignite.network.NetworkMessage;
  */
 public interface HandshakeManager {
     /**
-     * Initialize handshake manager with the channel.
+     * Initializes the handshake manager.
      *
-     * @param channel Channel.
-     * @return Action to perform by {@link HandshakeHandler}.
+     * @param handlerContext Channel handler context.
      */
-    HandshakeResult init(Channel channel);
+    void onInit(ChannelHandlerContext handlerContext);
 
     /**
-     * Handle an event of the connection opening.
-     *
-     * @param channel Channel.
-     * @return Action to perform by {@link HandshakeHandler}.
+     * Handles an event of the connection opening.
      */
-    HandshakeResult onConnectionOpen(Channel channel);
+    default void onConnectionOpen() {
+        // No-op.
+    }
 
     /**
-     * Handle an incoming message.
+     * Handles an incoming message.
      *
-     * @param channel Channel.
      * @param message Message to handle.
-     * @return Action to perform by {@link HandshakeHandler}.
      */
-    HandshakeResult onMessage(Channel channel, NetworkMessage message);
+    void onMessage(NetworkMessage message);
 
     /**
      * Returns future that represents the handshake operation.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index 7c1ae03ed..82f40ab40 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -18,25 +18,25 @@
 package org.apache.ignite.internal.network.netty;
 
 import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import java.net.SocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
-import org.apache.ignite.configuration.schemas.network.OutboundView;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
@@ -48,6 +48,9 @@ import org.jetbrains.annotations.TestOnly;
  * Class that manages connections both incoming and outgoing.
  */
 public class ConnectionManager {
+    /** Message factory. */
+    private static final NetworkMessagesFactory FACTORY = new NetworkMessagesFactory();
+
     /** Logger. */
     private static final IgniteLogger LOG = IgniteLogger.forClass(ConnectionManager.class);
 
@@ -60,6 +63,7 @@ public class ConnectionManager {
     /** Server. */
     private final NettyServer server;
 
+    // TODO: IGNITE-16948 Should be a map consistentId -> connectionId -> sender
     /** Channels map from consistentId to {@link NettySender}. */
     private final Map<String, NettySender> channels = new ConcurrentHashMap<>();
 
@@ -75,8 +79,8 @@ public class ConnectionManager {
     /** Node consistent id. */
     private final String consistentId;
 
-    /** Client handshake manager factory. */
-    private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
+    /** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
+    private final UUID launchId;
 
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
@@ -84,31 +88,32 @@ public class ConnectionManager {
     /** Stop flag. */
     private final AtomicBoolean stopped = new AtomicBoolean(false);
 
+    /** Recovery descriptor provider. */
+    private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();
+
     /**
      * Constructor.
      *
      * @param networkConfiguration          Network configuration.
      * @param serializationService          Serialization service.
+     * @param launchId                      Launch id of this node.
      * @param consistentId                  Consistent id of this node.
-     * @param serverHandshakeManagerFactory Server handshake manager factory.
-     * @param clientHandshakeManagerFactory Client handshake manager factory.
      * @param bootstrapFactory              Bootstrap factory.
      */
     public ConnectionManager(
             NetworkView networkConfiguration,
             SerializationService serializationService,
+            UUID launchId,
             String consistentId,
-            Supplier<HandshakeManager> serverHandshakeManagerFactory,
-            Supplier<HandshakeManager> clientHandshakeManagerFactory,
             NettyBootstrapFactory bootstrapFactory
     ) {
         this.serializationService = serializationService;
+        this.launchId = launchId;
         this.consistentId = consistentId;
-        this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
 
         this.server = new NettyServer(
                 networkConfiguration,
-                serverHandshakeManagerFactory,
+                this::createServerHandshakeManager,
                 this::onNewIncomingChannel,
                 this::onMessage,
                 serializationService,
@@ -137,7 +142,7 @@ public class ConnectionManager {
 
             server.start().get();
 
-            LOG.info("Connection created [address=" + server.address() + ']');
+            LOG.info("Server started [address=" + server.address() + ']');
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
             throw new IgniteInternalException("Failed to start the connection manager: " + cause.getMessage(), cause);
@@ -180,10 +185,11 @@ public class ConnectionManager {
 
         // Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
         // when the client is ready for write operations, so previously started client, that didn't establish connection
-        // or didn't perform the handhsake operaton, can be reused.
+        // or didn't perform the handshake operation, can be reused.
+        // TODO: IGNITE-16948 Connection id may be different from 0
         NettyClient client = clients.compute(address, (addr, existingClient) ->
                 existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected()
-                        ? existingClient : connect(addr)
+                        ? existingClient : connect(addr, (short) 0)
         );
 
         CompletableFuture<NettySender> sender = client.sender();
@@ -221,11 +227,11 @@ public class ConnectionManager {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
@@ -284,6 +290,14 @@ public class ConnectionManager {
         return stopped.get();
     }
 
+    private HandshakeManager createClientHandshakeManager(short connectionId) {
+        return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, FACTORY, descriptorProvider);
+    }
+
+    private HandshakeManager createServerHandshakeManager() {
+        return new RecoveryServerHandshakeManager(launchId, consistentId, FACTORY, descriptorProvider);
+    }
+
     /**
      * Returns connection manager's {@link #server}.
      *
@@ -314,7 +328,6 @@ public class ConnectionManager {
         return Collections.unmodifiableCollection(clients.values());
     }
 
-
     /**
      * Returns map of the channels.
      *
@@ -324,27 +337,4 @@ public class ConnectionManager {
     public Map<String, NettySender> channels() {
         return Collections.unmodifiableMap(channels);
     }
-
-    /**
-     * Creates a {@link Bootstrap} for clients with channel options provided by a {@link OutboundView}.
-     *
-     * @param eventLoopGroup      Event loop group for channel handling.
-     * @param clientConfiguration Client configuration.
-     * @return Bootstrap for clients.
-     */
-    public static Bootstrap createClientBootstrap(
-            EventLoopGroup eventLoopGroup,
-            OutboundView clientConfiguration
-    ) {
-        Bootstrap clientBootstrap = new Bootstrap();
-
-        clientBootstrap.group(eventLoopGroup)
-                .channel(NioSocketChannel.class)
-                // See NettyServer#start for netty configuration details.
-                .option(ChannelOption.SO_KEEPALIVE, clientConfiguration.soKeepAlive())
-                .option(ChannelOption.SO_LINGER, clientConfiguration.soLinger())
-                .option(ChannelOption.TCP_NODELAY, clientConfiguration.tcpNoDelay());
-
-        return clientBootstrap;
-    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
new file mode 100644
index 000000000..3e900af6c
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default {@link RecoveryDescriptorProvider}: lazily creates and caches one {@link RecoveryDescriptor} per channel key.
+ */
+public class DefaultRecoveryDescriptorProvider implements RecoveryDescriptorProvider {
+    // Limit passed to every new RecoveryDescriptor. TODO: IGNITE-16954 Make this configurable
+    private static final int DEFAULT_QUEUE_LIMIT = 10;
+
+    /** Recovery descriptors, one per distinct channel key. */
+    private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} Equal arguments always yield the same descriptor, which is created on first request. */
+    @Override
+    public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
+        var key = new ChannelKey(consistentId, launchId, connectionIndex, inbound);
+
+        return recoveryDescriptors.computeIfAbsent(key, channelKey -> new RecoveryDescriptor(DEFAULT_QUEUE_LIMIT));
+    }
+
+    /** Map key identifying a channel by remote consistent id, remote launch id, connection id and direction. */
+    private static class ChannelKey {
+        /** Remote node's consistent id. */
+        private final String consistentId;
+
+        /** Remote node's launch id. */
+        private final UUID launchId;
+
+        /**
+         * Connection id. Every connection between this node and a remote node has a unique connection id,
+         * but connections with different nodes may have the same ids.
+         */
+        private final short connectionId;
+
+        /** {@code true} if channel is inbound, {@code false} otherwise. */
+        private final boolean inbound;
+
+        private ChannelKey(String consistentId, UUID launchId, short connectionId, boolean inbound) {
+            this.consistentId = consistentId;
+            this.launchId = launchId;
+            this.connectionId = connectionId;
+            this.inbound = inbound;
+        }
+
+        /** {@inheritDoc} Keys are equal only if all four fields match. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            ChannelKey that = (ChannelKey) o;
+
+            if (connectionId != that.connectionId) {
+                return false;
+            }
+            if (inbound != that.inbound) {
+                return false;
+            }
+            if (!consistentId.equals(that.consistentId)) {
+                return false;
+            }
+            return launchId.equals(that.launchId);
+        }
+
+        /** {@inheritDoc} Combines all four fields, consistently with {@link #equals(Object)}. */
+        @Override
+        public int hashCode() {
+            int result = consistentId.hashCode();
+            result = 31 * result + launchId.hashCode();
+            result = 31 * result + (int) connectionId;
+            result = 31 * result + (inbound ? 1 : 0);
+            return result;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public String toString() {
+            return S.toString(ChannelKey.class, this);
+        }
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
index 61088d049..67fb032ed 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
@@ -19,11 +19,10 @@ package org.apache.ignite.internal.network.netty;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.function.Function;
-import org.apache.ignite.internal.network.handshake.HandshakeAction;
+import java.util.function.Consumer;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NetworkMessage;
 
@@ -31,37 +30,43 @@ import org.apache.ignite.network.NetworkMessage;
  * Netty handler of the handshake operation.
  */
 public class HandshakeHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "handshake-handler";
+
     /** Logger. */
     private static final IgniteLogger LOG = IgniteLogger.forClass(HandshakeHandler.class);
 
     /** Handshake manager. */
     private final HandshakeManager manager;
 
-    /** Function that creates a handler that substitutes this handshake handler after the handshake is done. */
-    private final Function<String, ChannelInboundHandlerAdapter> newHandlerCreator;
+    private final Consumer<InNetworkObject> messageListener;
+
+    private final PerSessionSerializationService serializationService;
 
     /**
      * Constructor.
      *
      * @param manager Handshake manager.
+     * @param messageListener Message listener.
+     * @param serializationService Serialization service.
      */
-    public HandshakeHandler(HandshakeManager manager, Function<String, ChannelInboundHandlerAdapter> newHandlerCreator) {
+    public HandshakeHandler(HandshakeManager manager, Consumer<InNetworkObject> messageListener,
+            PerSessionSerializationService serializationService) {
         this.manager = manager;
-        this.newHandlerCreator = newHandlerCreator;
+        this.messageListener = messageListener;
+        this.serializationService = serializationService;
     }
 
     /** {@inheritDoc} */
     @Override
     public void handlerAdded(ChannelHandlerContext ctx) {
-        HandshakeResult handshakeResult = manager.init(ctx.channel());
-
-        handleHandshakeAction(handshakeResult, ctx);
+        manager.onInit(ctx);
     }
 
     /** {@inheritDoc} */
     @Override
     public void channelActive(ChannelHandlerContext ctx) {
-        HandshakeResult handshakeResult = manager.onConnectionOpen(ctx.channel());
+        manager.onConnectionOpen();
 
         manager.handshakeFuture().whenComplete((unused, throwable) -> {
             if (throwable != null) {
@@ -71,18 +76,13 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
             }
         });
 
-        handleHandshakeAction(handshakeResult, ctx);
-
         ctx.fireChannelActive();
     }
 
     /** {@inheritDoc} */
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        HandshakeResult handshakeResult = manager.onMessage(ctx.channel(), (NetworkMessage) msg);
-
-        handleHandshakeAction(handshakeResult, ctx);
-        // No need to forward the message to the next handler as this message only matters for a handshake.
+        manager.onMessage((NetworkMessage) msg);
     }
 
     /** {@inheritDoc} */
@@ -104,29 +104,12 @@ public class HandshakeHandler extends ChannelInboundHandlerAdapter {
     }
 
     /**
-     * Handle {@link HandshakeAction}.
+     * Creates a {@link MessageHandler} for the current pipeline.
      *
-     * @param result Handshake result.
-     * @param ctx    Netty channel context.
+     * @param remoteConsistentId Remote node's consistent id.
+     * @return Message handler.
      */
-    private void handleHandshakeAction(HandshakeResult result, ChannelHandlerContext ctx) {
-        HandshakeAction action = result.action();
-        switch (action) {
-            case REMOVE_HANDLER:
-                ChannelInboundHandlerAdapter newHandler = newHandlerCreator.apply(result.consistentId());
-                String handlerName = newHandler.getClass().getName();
-                ctx.pipeline().replace(this, handlerName, newHandler);
-                break;
-
-            case FAIL:
-                ctx.channel().close();
-                break;
-
-            case NOOP:
-                break;
-
-            default:
-                break;
-        }
+    public MessageHandler createMessageHandler(String remoteConsistentId) {
+        return new MessageHandler(messageListener, remoteConsistentId, serializationService);
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
index cd1bc2b5d..b609f1fcb 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
@@ -37,6 +37,9 @@ import org.apache.ignite.network.serialization.MessageReader;
  * Decodes {@link ByteBuf}s into {@link NetworkMessage}s.
  */
 public class InboundDecoder extends ByteToMessageDecoder {
+    /** Handler name. */
+    public static final String NAME = "inbound-decoder";
+
     /** Logger. */
     private static final IgniteLogger LOG = IgniteLogger.forClass(InboundDecoder.class);
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
new file mode 100644
index 000000000..43b6685bb
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that applies incoming acknowledgement messages to the recovery descriptor and acknowledges messages that need it.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "inbound-recovery-handler";
+
+    /** Recovery descriptor updated on received acknowledgements and on received messages that need an acknowledgement. */
+    private final RecoveryDescriptor descriptor;
+
+    /** Messages factory used to build outgoing acknowledgement messages. */
+    private final NetworkMessagesFactory factory;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     * @param factory Message factory.
+     */
+    public InboundRecoveryHandler(RecoveryDescriptor descriptor, NetworkMessagesFactory factory) {
+        this.descriptor = descriptor;
+        this.factory = factory;
+    }
+
+    /** {@inheritDoc} Every message, acknowledgements included, is passed on to the next inbound handler. */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        NetworkMessage message = (NetworkMessage) msg;
+
+        if (message instanceof AcknowledgementMessage) {
+            AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+            long receivedMessages = ackMessage.receivedMessages();
+
+            descriptor.acknowledge(receivedMessages);
+        } else if (message.needAck()) {
+            AcknowledgementMessage ackMsg = factory.acknowledgementMessage()
+                    .receivedMessages(descriptor.onReceive()).build();
+
+            ctx.channel().writeAndFlush(new OutNetworkObject(ackMsg, Collections.emptyList(), false));
+        }
+
+        super.channelRead(ctx, message);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/IoExceptionSuppressingHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/IoExceptionSuppressingHandler.java
index 5cbe57e6a..0ebd8ea50 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/IoExceptionSuppressingHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/IoExceptionSuppressingHandler.java
@@ -26,6 +26,9 @@ import org.apache.ignite.lang.IgniteLogger;
  * Netty handler for suppressing IO exceptions that can happen if a remote peer abruptly closes the connection.
  */
 class IoExceptionSuppressingHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "io-exception-suppressing-handler";
+
     /** Logger. */
     private static final IgniteLogger LOG = IgniteLogger.forClass(IoExceptionSuppressingHandler.class);
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
index 0cc13f7a1..164c0b475 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.network.netty;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.util.function.Consumer;
+import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.network.NetworkMessage;
 
@@ -27,6 +28,9 @@ import org.apache.ignite.network.NetworkMessage;
  * Network message handler that delegates handling to {@link #messageListener}.
  */
 public class MessageHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "message-handler";
+
     /** Message listener. */
     private final Consumer<InNetworkObject> messageListener;
 
@@ -54,6 +58,10 @@ public class MessageHandler extends ChannelInboundHandlerAdapter {
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
         NetworkMessage message = (NetworkMessage) msg;
 
+        if (message instanceof AcknowledgementMessage) {
+            return; // Acknowledgements are protocol-internal and are never delivered to the message listener.
+        }
+
         messageListener.accept(new InNetworkObject(message, consistentId, serializationService.compositeDescriptorRegistry()));
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
index 7983e4001..fb2698de8 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
@@ -21,7 +21,6 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.stream.ChunkedWriteHandler;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -110,15 +109,7 @@ public class NettyClient {
                 public void initChannel(SocketChannel ch) {
                     var sessionSerializationService = new PerSessionSerializationService(serializationService);
 
-                    ch.pipeline().addLast(
-                            new InboundDecoder(sessionSerializationService),
-                            new HandshakeHandler(handshakeManager,
-                                    (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
-                            ),
-                            new ChunkedWriteHandler(),
-                            new OutboundEncoder(sessionSerializationService),
-                            new IoExceptionSuppressingHandler()
-                    );
+                    PipelineUtils.setup(ch.pipeline(), sessionSerializationService, handshakeManager, messageListener);
                 }
             });
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 7050d8e7b..1a2b8a0cd 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -23,7 +23,6 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ServerChannel;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.stream.ChunkedWriteHandler;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -130,25 +129,7 @@ public class NettyServer {
                             // Get handshake manager for the new channel.
                             HandshakeManager manager = handshakeManager.get();
 
-                            ch.pipeline().addLast(
-                                    /*
-                                     * Decoder that uses the MessageReader
-                                     * to read chunked data.
-                                     */
-                                    new InboundDecoder(sessionSerializationService),
-                                    // Handshake handler.
-                                    new HandshakeHandler(manager,
-                                            (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
-                                    ),
-                                    /*
-                                     * Encoder that uses the MessageWriter
-                                     * to write chunked data.
-                                     */
-                                    new ChunkedWriteHandler(),
-                                    // Converts NetworkMessage to a ChunkedNetworkMessageInput
-                                    new OutboundEncoder(sessionSerializationService),
-                                    new IoExceptionSuppressingHandler()
-                            );
+                            PipelineUtils.setup(ch.pipeline(), sessionSerializationService, manager, messageListener);
 
                             manager.handshakeFuture().thenAccept(newConnectionListener);
                         }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index f31b4d754..137d6dc6d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -38,6 +38,9 @@ import org.apache.ignite.network.serialization.MessageSerializer;
  * An encoder for the outbound messages that uses {@link DirectMessageWriter}.
  */
 public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
+    /** Handler name. */
+    public static final String NAME = "outbound-encoder";
+
     private static final NetworkMessagesFactory MSG_FACTORY = new NetworkMessagesFactory();
 
     /** Serialization registry. */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java
new file mode 100644
index 000000000..7a4037863
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.network.OutNetworkObject;
+
+/** Outbound handler that adds outgoing messages flagged for recovery to the recovery descriptor before passing them on. */
+public class OutboundRecoveryHandler extends ChannelOutboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "outbound-recovery-handler";
+
+    /** Recovery descriptor that collects outgoing messages which should be saved for recovery. */
+    private final RecoveryDescriptor descriptor;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     */
+    public OutboundRecoveryHandler(RecoveryDescriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /** {@inheritDoc} Expects {@code msg} to be an {@link OutNetworkObject}; the write is always propagated further. */
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        OutNetworkObject outNetworkObject = (OutNetworkObject) msg;
+
+        if (outNetworkObject.shouldBeSavedForRecovery()) {
+            descriptor.add(outNetworkObject);
+        }
+
+        super.write(ctx, msg, promise);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
new file mode 100644
index 000000000..6c2ba6799
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+
+/** Static helpers that assemble the Netty channel pipeline before and after the handshake. */
+public final class PipelineUtils {
+    /** {@link ChunkedWriteHandler}'s name. */
+    private static final String CHUNKED_WRITE_HANDLER_NAME = "chunked-write-handler";
+
+    /** Utility class: only static methods, never instantiated. */
+    private PipelineUtils() {
+    }
+
+    /**
+     * Sets up the initial pipeline by appending the decoder, handshake, chunked-write, encoder and exception-suppressing handlers.
+     *
+     * @param pipeline Channel pipeline.
+     * @param serializationService Serialization service.
+     * @param handshakeManager Handshake manager.
+     * @param messageListener Message listener.
+     */
+    public static void setup(ChannelPipeline pipeline, PerSessionSerializationService serializationService,
+                HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) {
+        pipeline.addLast(InboundDecoder.NAME, new InboundDecoder(serializationService));
+        pipeline.addLast(HandshakeHandler.NAME, new HandshakeHandler(handshakeManager, messageListener, serializationService));
+        pipeline.addLast(CHUNKED_WRITE_HANDLER_NAME, new ChunkedWriteHandler());
+        pipeline.addLast(OutboundEncoder.NAME, new OutboundEncoder(serializationService));
+        pipeline.addLast(IoExceptionSuppressingHandler.NAME, new IoExceptionSuppressingHandler());
+    }
+
+    /**
+     * Adds the recovery handlers and the message handler around the handlers installed by {@link #setup}.
+     *
+     * @param pipeline Pipeline.
+     * @param descriptor Recovery descriptor.
+     * @param messageHandler Message handler.
+     * @param factory Message factory.
+     */
+    public static void afterHandshake(ChannelPipeline pipeline, RecoveryDescriptor descriptor, MessageHandler messageHandler,
+            NetworkMessagesFactory factory) {
+        pipeline.addAfter(OutboundEncoder.NAME, OutboundRecoveryHandler.NAME, new OutboundRecoveryHandler(descriptor));
+        pipeline.addBefore(HandshakeHandler.NAME, InboundRecoveryHandler.NAME, new InboundRecoveryHandler(descriptor, factory));
+        pipeline.addAfter(HandshakeHandler.NAME, MessageHandler.NAME, messageHandler);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 2a5cf7a48..c4d030bb7 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -19,19 +19,25 @@ package org.apache.ignite.internal.network.recovery;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
 import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.internal.network.netty.NettyUtils;
+import org.apache.ignite.internal.network.netty.PipelineUtils;
+import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Recovery protocol handshake manager for a client.
@@ -43,63 +49,113 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
     /** Consistent id. */
     private final String consistentId;
 
+    /** Message factory. */
+    private final NetworkMessagesFactory messageFactory;
+
+    /** Recovery descriptor provider. */
+    private final RecoveryDescriptorProvider recoveryDescriptorProvider;
+
+    /** Connection id. */
+    private final short connectionId;
+
     /** Handshake completion future. */
     private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
 
-    /** Message factory. */
-    private final NetworkMessagesFactory messageFactory;
+    /** Remote node's launch id. */
+    private UUID remoteLaunchId;
+
+    /** Remote node's consistent id. */
+    private String remoteConsistentId;
+
+    /** Netty pipeline channel handler context. */
+    private ChannelHandlerContext ctx;
+
+    /** Channel. */
+    private Channel channel;
+
+    /** Netty pipeline handshake handler. */
+    private HandshakeHandler handler;
+
+    /** Recovery descriptor. */
+    private RecoveryDescriptor recoveryDescriptor;
 
     /**
      * Constructor.
      *
-     * @param launchId       Launch id.
-     * @param consistentId   Consistent id.
+     * @param launchId Launch id.
+     * @param consistentId Consistent id.
      * @param messageFactory Message factory.
+     * @param recoveryDescriptorProvider Recovery descriptor provider.
      */
     public RecoveryClientHandshakeManager(
-            UUID launchId, String consistentId, NetworkMessagesFactory messageFactory
-    ) {
+            UUID launchId, String consistentId, short connectionId, NetworkMessagesFactory messageFactory,
+            RecoveryDescriptorProvider recoveryDescriptorProvider) {
         this.launchId = launchId;
         this.consistentId = consistentId;
+        this.connectionId = connectionId;
         this.messageFactory = messageFactory;
+        this.recoveryDescriptorProvider = recoveryDescriptorProvider;
     }
 
     /** {@inheritDoc} */
     @Override
-    public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+    public void onInit(ChannelHandlerContext handlerContext) {
+        this.ctx = handlerContext;
+        this.channel = handlerContext.channel();
+        this.handler = (HandshakeHandler) ctx.handler();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void onMessage(NetworkMessage message) {
         if (message instanceof HandshakeStartMessage) {
             HandshakeStartMessage msg = (HandshakeStartMessage) message;
 
-            UUID remoteLaunchId = msg.launchId();
-            String remoteConsistentId = msg.consistentId();
+            this.remoteLaunchId = msg.launchId();
+            this.remoteConsistentId = msg.consistentId();
 
-            HandshakeStartResponseMessage response = messageFactory.handshakeStartResponseMessage()
-                    .launchId(launchId)
-                    .consistentId(consistentId)
-                    .receivedCount(0)
-                    .connectionsCount(0)
-                    .build();
+            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId, connectionId,
+                    false);
 
-            ChannelFuture sendFuture = channel.writeAndFlush(new OutNetworkObject(response, Collections.emptyList()));
+            handshake(recoveryDescriptor);
 
-            NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
-                if (throwable != null) {
-                    handshakeCompleteFuture.completeExceptionally(
-                        new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable)
-                    );
-                } else {
-                    handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
-                }
-            });
+            return;
+        }
+
+        assert recoveryDescriptor != null : "Wrong client handshake flow";
+
+        if (message instanceof HandshakeFinishMessage) {
+            HandshakeFinishMessage msg = (HandshakeFinishMessage) message;
+            long receivedCount = msg.receivedCount();
+
+            recoveryDescriptor.acknowledge(receivedCount);
+
+            int cnt = recoveryDescriptor.unacknowledgedCount();
+
+            if (cnt == 0) {
+                finishHandshake();
+
+                return;
+            }
 
-            return HandshakeResult.removeHandler(remoteLaunchId, remoteConsistentId);
+            List<OutNetworkObject> networkMessages = recoveryDescriptor.unacknowledgedMessages();
+
+            for (OutNetworkObject networkMessage : networkMessages) {
+                channel.write(networkMessage);
+            }
+
+            channel.flush();
+
+            return;
         }
 
-        handshakeCompleteFuture.completeExceptionally(
-                new HandshakeException("Unexpected message during handshake: " + message.toString())
-        );
+        int cnt = recoveryDescriptor.unacknowledgedCount();
 
-        return HandshakeResult.fail();
+        if (cnt == 0) {
+            finishHandshake();
+        }
+
+        ctx.fireChannelRead(message);
     }
 
     /** {@inheritDoc} */
@@ -108,15 +164,48 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
         return handshakeCompleteFuture;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public HandshakeResult init(Channel channel) {
-        return HandshakeResult.noOp();
+    private void handshake(RecoveryDescriptor descriptor) {
+        PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+        // Handlers are in place; answer the server's start message with this node's ids and the count of messages received so far.
+        HandshakeStartResponseMessage response = messageFactory.handshakeStartResponseMessage()
+                .launchId(launchId)
+                .consistentId(consistentId)
+                .receivedCount(descriptor.receivedCount())
+                .connectionId(connectionId)
+                .build();
+
+        ChannelFuture sendFuture = ctx.channel().writeAndFlush(new OutNetworkObject(response, Collections.emptyList(), false));
+        // Only a failed send is handled here; on success the handshake future is completed later, in finishHandshake().
+        NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
+            if (throwable != null) {
+                handshakeCompleteFuture.completeExceptionally(
+                        new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable)
+                );
+            }
+        });
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public HandshakeResult onConnectionOpen(Channel channel) {
-        return HandshakeResult.noOp();
+    /**
+     * Creates a message handler for the remote node, using the consistent id taken from its handshake start message.
+     *
+     * @return New message handler.
+     */
+    private MessageHandler createMessageHandler() {
+        return handler.createMessageHandler(remoteConsistentId);
+    }
+
+    /**
+     * Finishes the handshake: removes the handshake handler from the pipeline and completes the future with a {@link NettySender}.
+     */
+    private void finishHandshake() {
+        // The handshake is over, so the handshake handler is no longer needed in the pipeline.
+        this.ctx.pipeline().remove(this.handler);
+
+        handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+    }
+
+    @TestOnly
+    public RecoveryDescriptor recoveryDescriptor() {
+        return recoveryDescriptor; // Stays null until a HandshakeStartMessage has been processed.
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
new file mode 100644
index 000000000..b8a73a0a8
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Recovery protocol descriptor: counts sent, acknowledged and received messages and keeps the sent messages not acknowledged yet.
+ */
+public class RecoveryDescriptor {
+    /** Sent messages that the remote node has not acknowledged yet, oldest first. */
+    private final Queue<OutNetworkObject> unacknowledgedMessages;
+
+    /** Count of sent messages. */
+    private long sentCount;
+
+    /** Count of acknowledged sent messages. */
+    private long acknowledgedCount;
+
+    /** Count of received messages. */
+    private long receivedCount;
+
+    /**
+     * Constructor.
+     *
+     * @param queueLimit Initial capacity of the unacknowledged messages queue; the queue can grow beyond it.
+     */
+    public RecoveryDescriptor(int queueLimit) {
+        this.unacknowledgedMessages = new ArrayDeque<>(queueLimit);
+    }
+
+    /**
+     * Returns count of received messages.
+     *
+     * @return Count of received messages.
+     */
+    public long receivedCount() {
+        return receivedCount;
+    }
+
+    /**
+     * Acknowledges that sent messages were received by the remote node and drops them from the unacknowledged queue.
+     *
+     * @param messagesReceivedByRemote Number of all messages received by the remote node.
+     * @throws IllegalArgumentException If the remote node reports more received messages than were sent.
+     */
+    public void acknowledge(long messagesReceivedByRemote) {
+        if (messagesReceivedByRemote > sentCount) {
+            throw new IllegalArgumentException("Acknowledged count exceeds sent count: " + messagesReceivedByRemote + " > " + sentCount);
+        }
+
+        for (; acknowledgedCount < messagesReceivedByRemote; acknowledgedCount++) {
+            unacknowledgedMessages.remove();
+        }
+    }
+
+    /**
+     * Returns the number of the messages unacknowledged by the remote node.
+     *
+     * @return The number of the messages unacknowledged by the remote node.
+     */
+    public int unacknowledgedCount() {
+        long res = sentCount - acknowledgedCount;
+        int size = unacknowledgedMessages.size();
+
+        assert res >= 0;
+        assert res == size;
+
+        return size;
+    }
+
+    /**
+     * Returns a snapshot copy of the unacknowledged messages, oldest first.
+     *
+     * @return Unacknowledged messages.
+     */
+    public List<OutNetworkObject> unacknowledgedMessages() {
+        return new ArrayList<>(unacknowledgedMessages);
+    }
+
+    /**
+     * Adds a sent message.
+     *
+     * @param msg Message.
+     */
+    public void add(OutNetworkObject msg) {
+        // Clear the flag so that a re-sent message is not registered a second time by the outbound recovery handler.
+        msg.shouldBeSavedForRecovery(false);
+        sentCount++;
+        unacknowledgedMessages.add(msg);
+    }
+
+    /**
+     * Handles the event of receiving a new message.
+     *
+     * @return Number of received messages, including the new one.
+     */
+    public long onReceive() {
+        return ++receivedCount;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(RecoveryDescriptor.class, this);
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
similarity index 58%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
index fc429d7cc..ff3f20406 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptorProvider.java
@@ -15,29 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.network.recovery.message;
+package org.apache.ignite.internal.network.recovery;
 
 import java.util.UUID;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Handshake start message.
+ * Interface that provides recovery descriptors for channels between two nodes.
  */
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends NetworkMessage {
+public interface RecoveryDescriptorProvider {
     /**
-     * Returns launch id.
+     * Returns a {@link RecoveryDescriptor}.
      *
-     * @return Launch id.
+     * @param consistentId Remote node consistent id.
+     * @param launchId Remote node launch id.
+     * @param connectionIndex Connection id.
+     * @param inbound {@code true} if the connection is inbound, {@code false} otherwise.
+     * @return Recovery descriptor.
      */
-    UUID launchId();
-
-    /**
-     * Returns consistent id.
-     *
-     * @return Consistent id.
-     */
-    String consistentId();
+    RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionIndex, boolean inbound);
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index cfe546ecb..17c516817 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -19,19 +19,25 @@ package org.apache.ignite.internal.network.recovery;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
 import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.handshake.HandshakeException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
 import org.apache.ignite.internal.network.netty.NettySender;
 import org.apache.ignite.internal.network.netty.NettyUtils;
+import org.apache.ignite.internal.network.netty.PipelineUtils;
+import org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Recovery protocol handshake manager for a server.
@@ -43,42 +49,70 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
     /** Consistent id. */
     private final String consistentId;
 
+    /** Message factory. */
+    private final NetworkMessagesFactory messageFactory;
+
     /** Handshake completion future. */
     private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
 
-    /** Message factory. */
-    private final NetworkMessagesFactory messageFactory;
+    /** Remote node's launch id. */
+    private UUID remoteLaunchId;
+
+    /** Remote node's consistent id. */
+    private String remoteConsistentId;
+
+    /** Netty pipeline channel handler context. */
+    private ChannelHandlerContext ctx;
+
+    /** Channel. */
+    private Channel channel;
+
+    /** Netty pipeline handshake handler. */
+    private HandshakeHandler handler;
+
+    /** Count of messages received by the remote node. */
+    private long receivedCount;
+
+    /** Recovery descriptor provider. */
+    private final RecoveryDescriptorProvider recoveryDescriptorProvider;
+
+    /** Recovery descriptor. */
+    private RecoveryDescriptor recoveryDescriptor;
 
     /**
      * Constructor.
      *
-     * @param launchId       Launch id.
-     * @param consistentId   Consistent id.
+     * @param launchId Launch id.
+     * @param consistentId Consistent id.
      * @param messageFactory Message factory.
+     * @param recoveryDescriptorProvider Recovery descriptor provider.
      */
     public RecoveryServerHandshakeManager(
-            UUID launchId, String consistentId, NetworkMessagesFactory messageFactory
-    ) {
+            UUID launchId, String consistentId, NetworkMessagesFactory messageFactory,
+            RecoveryDescriptorProvider recoveryDescriptorProvider) {
         this.launchId = launchId;
         this.consistentId = consistentId;
         this.messageFactory = messageFactory;
+        this.recoveryDescriptorProvider = recoveryDescriptorProvider;
     }
 
     /** {@inheritDoc} */
     @Override
-    public HandshakeResult init(Channel channel) {
-        return HandshakeResult.noOp();
+    public void onInit(ChannelHandlerContext handlerContext) {
+        this.ctx = handlerContext;
+        this.channel = handlerContext.channel();
+        this.handler = (HandshakeHandler) ctx.handler();
     }
 
     /** {@inheritDoc} */
     @Override
-    public HandshakeResult onConnectionOpen(Channel channel) {
+    public void onConnectionOpen() {
         HandshakeStartMessage handshakeStartMessage = messageFactory.handshakeStartMessage()
                 .launchId(launchId)
                 .consistentId(consistentId)
                 .build();
 
-        ChannelFuture sendFuture = channel.writeAndFlush(new OutNetworkObject(handshakeStartMessage, Collections.emptyList()));
+        ChannelFuture sendFuture = channel.writeAndFlush(new OutNetworkObject(handshakeStartMessage, Collections.emptyList(), false));
 
         NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
             if (throwable != null) {
@@ -87,29 +121,86 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
                 );
             }
         });
-
-        return HandshakeResult.noOp();
     }
 
     /** {@inheritDoc} */
     @Override
-    public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+    public void onMessage(NetworkMessage message) {
         if (message instanceof HandshakeStartResponseMessage) {
             HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message;
 
-            UUID remoteLaunchId = msg.launchId();
-            String remoteConsistentId = msg.consistentId();
+            this.remoteLaunchId = msg.launchId();
+            this.remoteConsistentId = msg.consistentId();
+            this.receivedCount = msg.receivedCount();
 
-            handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId,
+                    msg.connectionId(), true);
 
-            return HandshakeResult.removeHandler(remoteLaunchId, remoteConsistentId);
+            handshake(recoveryDescriptor);
+
+            return;
+        }
+
+        assert recoveryDescriptor != null : "Wrong server handshake flow";
+
+        if (recoveryDescriptor.unacknowledgedCount() == 0) {
+            finishHandshake();
         }
 
-        handshakeCompleteFuture.completeExceptionally(
-                new HandshakeException("Unexpected message during handshake: " + message.toString())
+        ctx.fireChannelRead(message);
+    }
+
+    private void handshake(RecoveryDescriptor descriptor) {
+        PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+        // Handlers are in place; now report to the client how many of its messages this node has received so far.
+        HandshakeFinishMessage response = messageFactory.handshakeFinishMessage()
+                .receivedCount(descriptor.receivedCount())
+                .build();
+
+        CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(
+                channel.write(new OutNetworkObject(response, Collections.emptyList(), false))
         );
 
-        return HandshakeResult.fail();
+        descriptor.acknowledge(receivedCount);
+        // Messages not covered by the count from the client's start response stay in the descriptor and are written again below.
+        int unacknowledgedCount = descriptor.unacknowledgedCount();
+
+        if (unacknowledgedCount > 0) {
+            var futs = new CompletableFuture[unacknowledgedCount + 1];
+            futs[0] = sendFuture;
+
+            List<OutNetworkObject> networkMessages = descriptor.unacknowledgedMessages();
+
+            for (int i = 0; i < networkMessages.size(); i++) {
+                OutNetworkObject networkMessage = networkMessages.get(i);
+                futs[i + 1] = NettyUtils.toCompletableFuture(channel.write(networkMessage));
+            }
+
+            sendFuture = CompletableFuture.allOf(futs);
+        }
+        // Everything above was only queued with write(); a single flush sends it all.
+        channel.flush();
+
+        boolean hasUnacknowledgedMessages = unacknowledgedCount > 0;
+        // If messages were re-sent, finishHandshake() is called later from onMessage(), once none are unacknowledged.
+        sendFuture.whenComplete((unused, throwable) -> {
+            if (throwable != null) {
+                handshakeCompleteFuture.completeExceptionally(
+                        new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable)
+                );
+            } else if (!hasUnacknowledgedMessages) {
+                finishHandshake();
+            }
+        });
+    }
+
+    /**
+     * Creates a message handler for the remote node, using the consistent id taken from its handshake start response.
+     *
+     * @return New message handler.
+     */
+    private MessageHandler createMessageHandler() {
+        return handler.createMessageHandler(remoteConsistentId);
     }
 
     /** {@inheritDoc} */
@@ -117,4 +208,19 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
     public CompletableFuture<NettySender> handshakeFuture() {
         return handshakeCompleteFuture;
     }
+
+    /**
+     * Finishes the handshake: removes the handshake handler from the pipeline and completes the future with a {@link NettySender}.
+     */
+    private void finishHandshake() {
+        // The handshake is over, so the handshake handler is no longer needed in the pipeline.
+        this.ctx.pipeline().remove(this.handler);
+
+        handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+    }
+
+    @TestOnly
+    public RecoveryDescriptor recoveryDescriptor() {
+        return recoveryDescriptor; // Stays null until a HandshakeStartResponseMessage has been processed.
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/AcknowledgementMessage.java
similarity index 67%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/AcknowledgementMessage.java
index fc429d7cc..091ad0e1f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/AcknowledgementMessage.java
@@ -17,27 +17,19 @@
 
 package org.apache.ignite.internal.network.recovery.message;
 
-import java.util.UUID;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.network.NetworkMessage;
+import static org.apache.ignite.internal.network.NetworkMessageTypes.ACKNOWLEDGEMENT;
+
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Handshake start message.
+ * Message that holds count of messages received by node.
  */
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends NetworkMessage {
-    /**
-     * Returns launch id.
-     *
-     * @return Launch id.
-     */
-    UUID launchId();
-
+@Transferable(ACKNOWLEDGEMENT)
+public interface AcknowledgementMessage extends InternalMessage {
     /**
-     * Returns consistent id.
+     * Returns count of messages received within specific channel between two nodes.
      *
-     * @return Consistent id.
+     * @return count of messages received within specific channel between two nodes.
      */
-    String consistentId();
+    long receivedMessages();
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeFinishMessage.java
similarity index 66%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeFinishMessage.java
index fc429d7cc..f93905ed6 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeFinishMessage.java
@@ -17,27 +17,20 @@
 
 package org.apache.ignite.internal.network.recovery.message;
 
-import java.util.UUID;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.network.NetworkMessage;
+import static org.apache.ignite.internal.network.NetworkMessageTypes.HANDSHAKE_FINISH;
+
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Handshake start message.
+ * Handshake finish message, contains the quantity of the received messages.
+ * This message is sent from a server to a client as a response to the {@link HandshakeStartResponseMessage}.
  */
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends NetworkMessage {
-    /**
-     * Returns launch id.
-     *
-     * @return Launch id.
-     */
-    UUID launchId();
-
+@Transferable(HANDSHAKE_FINISH)
+public interface HandshakeFinishMessage extends InternalMessage {
     /**
-     * Returns consistent id.
+     * Returns number of received messages.
      *
-     * @return Consistent id.
+     * @return Number of received messages.
      */
-    String consistentId();
+    long receivedCount();
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
index fc429d7cc..e021516f1 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
@@ -19,14 +19,14 @@ package org.apache.ignite.internal.network.recovery.message;
 
 import java.util.UUID;
 import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Handshake start message.
+ * Handshake start message, contains info about the node.
+ * This message is sent from a server to a client when the connection is opened.
  */
 @Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends NetworkMessage {
+public interface HandshakeStartMessage extends InternalMessage {
     /**
      * Returns launch id.
      *
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
index 77e520758..833ec48d4 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartResponseMessage.java
@@ -19,14 +19,14 @@ package org.apache.ignite.internal.network.recovery.message;
 
 import java.util.UUID;
 import org.apache.ignite.internal.network.NetworkMessageTypes;
-import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Handshake start response message.
+ * Handshake start response message, contains info about the node, connection id and the quantity of the received messages.
+ * This message is sent from a client to a server as a response to the {@link HandshakeStartMessage}.
  */
 @Transferable(NetworkMessageTypes.HANDSHAKE_START_RESPONSE)
-public interface HandshakeStartResponseMessage extends NetworkMessage {
+public interface HandshakeStartResponseMessage extends InternalMessage {
     /**
      * Returns launch id.
      *
@@ -49,9 +49,9 @@ public interface HandshakeStartResponseMessage extends NetworkMessage {
     long receivedCount();
 
     /**
-     * Returns connections count.
+     * Returns connection id.
      *
-     * @return Connections count.
+     * @return Connection id.
      */
-    long connectionsCount();
+    short connectionId();
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/InternalMessage.java
similarity index 65%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/InternalMessage.java
index fc429d7cc..2d82a1e8b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeStartMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/InternalMessage.java
@@ -17,27 +17,15 @@
 
 package org.apache.ignite.internal.network.recovery.message;
 
-import java.util.UUID;
-import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
 
 /**
- * Handshake start message.
+ * Message that does not need an acknowledgement from the remote node.
  */
-@Transferable(NetworkMessageTypes.HANDSHAKE_START)
-public interface HandshakeStartMessage extends NetworkMessage {
-    /**
-     * Returns launch id.
-     *
-     * @return Launch id.
-     */
-    UUID launchId();
-
-    /**
-     * Returns consistent id.
-     *
-     * @return Consistent id.
-     */
-    String consistentId();
+public interface InternalMessage extends NetworkMessage {
+    /** {@inheritDoc} */
+    @Override
+    default boolean needAck() {
+        return false;
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/OutNetworkObject.java b/modules/network/src/main/java/org/apache/ignite/network/OutNetworkObject.java
index f2e9499b7..786a3fe7b 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/OutNetworkObject.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/OutNetworkObject.java
@@ -30,6 +30,13 @@ public class OutNetworkObject {
     /** List of class descriptor messages. */
     private final List<ClassDescriptorMessage> descriptors;
 
+    /**
+     * Flag indicating if this outgoing message should be added to the unacknowledged messages queue of the recovery descriptor.
+     * Acknowledgement message and handshake messages should not have this flag set to {@code true}. After adding the message
+     * to the queue this flag should be set to {@code false}.
+     */
+    private boolean shouldBeSavedForRecovery;
+
     /**
      * Constructor.
      *
@@ -37,8 +44,28 @@ public class OutNetworkObject {
      * @param descriptors Class descriptors.
      */
     public OutNetworkObject(NetworkMessage networkMessage, List<ClassDescriptorMessage> descriptors) {
+        this(networkMessage, descriptors, true);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param networkMessage Network message.
+     * @param descriptors Class descriptors.
+     * @param shouldBeSavedForRecovery See {@link #shouldBeSavedForRecovery}.
+     */
+    public OutNetworkObject(NetworkMessage networkMessage, List<ClassDescriptorMessage> descriptors, boolean shouldBeSavedForRecovery) {
         this.networkMessage = networkMessage;
         this.descriptors = descriptors;
+        this.shouldBeSavedForRecovery = shouldBeSavedForRecovery;
+    }
+
+    public boolean shouldBeSavedForRecovery() {
+        return shouldBeSavedForRecovery;
+    }
+
+    public void shouldBeSavedForRecovery(boolean shouldBeSavedForRecovery) {
+        this.shouldBeSavedForRecovery = shouldBeSavedForRecovery;
     }
 
     public NetworkMessage networkMessage() {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index df5ff973a..7daf90272 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -35,8 +35,6 @@ import org.apache.ignite.configuration.schemas.network.NetworkView;
 import org.apache.ignite.configuration.schemas.network.ScaleCubeView;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
-import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
-import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
 import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
 import org.apache.ignite.internal.network.serialization.SerializationService;
@@ -103,9 +101,8 @@ public class ScaleCubeClusterServiceFactory {
                 connectionMgr = new ConnectionManager(
                         configView,
                         serializationService,
+                        launchId,
                         consistentId,
-                        () -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
-                        () -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
                         nettyBootstrapFactory
                 );
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index 54ab77605..95dd327d1 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -34,7 +35,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.NetworkMessage;
 import org.junit.jupiter.api.AfterEach;
@@ -248,8 +248,8 @@ public class NettyClientTest {
 
         /** {@inheritDoc} */
         @Override
-        public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
-            return HandshakeResult.removeHandler(null, null);
+        public void onMessage(NetworkMessage message) {
+            // No-op.
         }
 
         /** {@inheritDoc} */
@@ -260,14 +260,8 @@ public class NettyClientTest {
 
         /** {@inheritDoc} */
         @Override
-        public HandshakeResult init(Channel channel) {
-            return HandshakeResult.noOp();
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public HandshakeResult onConnectionOpen(Channel channel) {
-            return HandshakeResult.noOp();
+        public void onInit(ChannelHandlerContext ctx) {
+            // No-op.
         }
     }
 }
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index 4e84a8980..f07fb5395 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -42,7 +42,6 @@ import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
 import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
-import org.apache.ignite.internal.network.handshake.HandshakeResult;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -147,10 +146,6 @@ public class NettyServerTest {
         HandshakeManager handshakeManager = mock(HandshakeManager.class);
 
         when(handshakeManager.handshakeFuture()).thenReturn(CompletableFuture.completedFuture(mock(NettySender.class)));
-        HandshakeResult noOp = HandshakeResult.noOp();
-        when(handshakeManager.init(any())).thenReturn(noOp);
-        when(handshakeManager.onConnectionOpen(any())).thenReturn(noOp);
-        when(handshakeManager.onMessage(any(), any())).thenReturn(noOp);
 
         MessageSerializationRegistry registry = mock(MessageSerializationRegistry.class);
 
@@ -220,10 +215,10 @@ public class NettyServerTest {
 
         InOrder order = Mockito.inOrder(handshakeManager);
 
-        order.verify(handshakeManager, timeout()).init(any());
+        order.verify(handshakeManager, timeout()).onInit(any());
         order.verify(handshakeManager, timeout()).handshakeFuture();
-        order.verify(handshakeManager, timeout()).onConnectionOpen(any());
-        order.verify(handshakeManager, timeout()).onMessage(any(), any());
+        order.verify(handshakeManager, timeout()).onConnectionOpen();
+        order.verify(handshakeManager, timeout()).onMessage(any());
     }
 
     /**
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
new file mode 100644
index 000000000..e83384a2f
--- /dev/null
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Recovery protocol handshake flow test.
+ */
+public class RecoveryHandshakeTest {
+    /** Connection id. */
+    private static final short CONNECTION_ID = 1337;
+
+    /** Serialization registry. */
+    private static final MessageSerializationRegistry MESSAGE_REGISTRY = new TestMessageSerializationRegistryImpl();
+
+    /** Message factory. */
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    /** Test message factory. */
+    private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new TestMessagesFactory();
+
+    @Test
+    public void testHandshake() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(clientSideChannel.readOutbound());
+        assertNull(serverSideChannel.readOutbound());
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testHandshakeWithUnacknowledgedServerMessage() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        UUID clientLaunchId = UUID.randomUUID();
+        RecoveryDescriptor serverRecoveryDescriptor = serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID, true);
+        addUnacknowledgedMessages(serverRecoveryDescriptor);
+
+        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager("client", clientLaunchId,
+                clientRecovery);
+        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+        var messageCaptor = new AtomicReference<TestMessage>();
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, (inObject) -> {
+            NetworkMessage msg = inObject.message();
+
+            assertInstanceOf(TestMessage.class, msg);
+
+            messageCaptor.set((TestMessage) msg);
+        });
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(clientSideChannel.readOutbound());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        assertNull(serverSideChannel.readOutbound());
+
+        TestMessage ackedMessage = messageCaptor.get();
+        assertNotNull(ackedMessage);
+
+        checkHandshakeNotCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testHandshakeWithUnacknowledgedClientMessage() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        UUID serverLaunchId = UUID.randomUUID();
+        RecoveryDescriptor clientRecoveryDescriptor = clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID, false);
+        addUnacknowledgedMessages(clientRecoveryDescriptor);
+
+        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager("server", serverLaunchId,
+                serverRecovery);
+
+        var messageCaptor = new AtomicReference<TestMessage>();
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, (inObject) -> {
+            NetworkMessage msg = inObject.message();
+
+            assertInstanceOf(TestMessage.class, msg);
+
+            messageCaptor.set((TestMessage) msg);
+        });
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(serverSideChannel.readOutbound());
+
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        assertNull(clientSideChannel.readOutbound());
+
+        TestMessage ackedMessage = messageCaptor.get();
+        assertNotNull(ackedMessage);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeNotCompleted(clientHandshakeManager);
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testPairedRecoveryDescriptors() throws Exception {
+        RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider();
+
+        UUID node1Uuid = UUID.randomUUID();
+        UUID node2Uuid = UUID.randomUUID();
+
+        RecoveryClientHandshakeManager chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
+        RecoveryServerHandshakeManager shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+
+        RecoveryClientHandshakeManager chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
+        RecoveryServerHandshakeManager shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+
+        EmbeddedChannel out1to2 = setupChannel(chm1, noMessageListener);
+        EmbeddedChannel in1to2 = setupChannel(shm1, noMessageListener);
+        EmbeddedChannel out2to1 = setupChannel(chm2, noMessageListener);
+        EmbeddedChannel in2to1 = setupChannel(shm2, noMessageListener);
+
+        exchangeServerToClient(in1to2, out2to1);
+        exchangeServerToClient(in2to1, out1to2);
+
+        exchangeClientToServer(in1to2, out2to1);
+        exchangeClientToServer(in2to1, out1to2);
+
+        assertNotSame(chm1.recoveryDescriptor(), shm1.recoveryDescriptor());
+        assertNotSame(chm2.recoveryDescriptor(), shm2.recoveryDescriptor());
+    }
+
+    @Test
+    public void testExactlyOnceServer() throws Exception {
+        testExactlyOnce(true);
+    }
+
+    @Test
+    public void testExactlyOnceClient() throws Exception {
+        testExactlyOnce(false);
+    }
+
+    /**
+     * Tests that a message is received exactly once in case of a network failure during acknowledgement.
+     *
+     * @param serverDidntReceiveAck {@code true} if server didn't receive the acknowledgement, {@code false} if client didn't receive
+     *                              the acknowledgement.
+     * @throws Exception If failed.
+     */
+    private void testExactlyOnce(boolean serverDidntReceiveAck) throws Exception {
+        var server = "server";
+        UUID serverLaunchId = UUID.randomUUID();
+        var client = "client";
+        UUID clientLaunchId = UUID.randomUUID();
+
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        RecoveryClientHandshakeManager clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId,
+                clientRecovery);
+        RecoveryServerHandshakeManager serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId,
+                serverRecovery);
+
+        var receivedFirst = new AtomicBoolean();
+
+        var listener1 = new MessageListener("1", receivedFirst);
+
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener1 : noMessageListener);
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ?  noMessageListener : listener1);
+
+        // Normal handshake
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        EmbeddedChannel ch = serverDidntReceiveAck ? serverSideChannel : clientSideChannel;
+
+        // Add two messages to the outbound
+        ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("1").build(), Collections.emptyList()));
+        ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("2").build(), Collections.emptyList()));
+
+        // Send one of the messages
+        if (serverDidntReceiveAck) {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        }
+
+        // Message should be received
+        assertTrue(receivedFirst.get());
+
+        // Transfer only one acknowledgement, don't transfer the second one (simulates network failure on acknowledgement)
+        if (serverDidntReceiveAck) {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        }
+
+        // Simulate reconnection
+        clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
+        serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+
+        var receivedSecond = new AtomicBoolean();
+
+        var listener2 = new MessageListener("2", receivedSecond);
+
+        clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
+        serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
+
+        // Handshake
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        // Resending message
+        if (serverDidntReceiveAck) {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        }
+
+        // Send another acknowledgement
+        if (serverDidntReceiveAck) {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        }
+
+        assertNull(serverSideChannel.readOutbound());
+        assertNull(clientSideChannel.readOutbound());
+
+        assertTrue(receivedSecond.get());
+    }
+
+    /** Message listener that accepts a specific message only once. */
+    private static class MessageListener implements Consumer<InNetworkObject> {
+        /** Expected message. */
+        private final String expectedMessage;
+
+        /** Flag indicating that the expected message was received. */
+        private final AtomicBoolean flag;
+
+        private MessageListener(String expectedMessage, AtomicBoolean flag) {
+            this.expectedMessage = expectedMessage;
+            this.flag = flag;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void accept(InNetworkObject inNetworkObject) {
+            TestMessage msg = (TestMessage) inNetworkObject.message();
+            if (expectedMessage.equals(msg.msg())) {
+                if (!flag.compareAndSet(false, true)) {
+                    fail();
+                }
+                return;
+            }
+            fail();
+        }
+    }
+
+    private void checkPipelineAfterHandshake(EmbeddedChannel channel) {
+        assertNull(channel.pipeline().get(HandshakeHandler.NAME));
+    }
+
+    private void checkHandshakeNotCompleted(HandshakeManager manager) {
+        CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+        assertFalse(handshakeFuture.isDone());
+        assertFalse(handshakeFuture.isCompletedExceptionally());
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
+    private void checkHandshakeCompleted(HandshakeManager manager) {
+        CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+        assertTrue(handshakeFuture.isDone());
+        assertFalse(handshakeFuture.isCompletedExceptionally());
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
+    private void addUnacknowledgedMessages(RecoveryDescriptor recoveryDescriptor) {
+        TestMessage msg = TEST_MESSAGES_FACTORY.testMessage().msg("test").build();
+        recoveryDescriptor.add(new OutNetworkObject(msg, Collections.emptyList()));
+    }
+
+    private void exchangeServerToClient(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+        ByteBuf handshakeStartMessage = serverSideChannel.readOutbound();
+        clientSideChannel.writeInbound(handshakeStartMessage);
+    }
+
+    private void exchangeClientToServer(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+        ByteBuf handshakeStartMessage = clientSideChannel.readOutbound();
+        serverSideChannel.writeInbound(handshakeStartMessage);
+    }
+
+    private final Consumer<InNetworkObject> noMessageListener = inNetworkObject ->
+            fail("Received message while shouldn't have, [" + inNetworkObject.message() + "]");
+
+    private EmbeddedChannel setupChannel(HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) throws Exception {
+        // The channel must not be registered until the pipeline handlers have been added;
+        // otherwise, events like "channel active" won't be propagated to the handlers.
+        var channel = new EmbeddedChannel(false, false);
+
+        var serializationService = new SerializationService(MESSAGE_REGISTRY, createUserObjectSerializationContext());
+        var sessionSerializationService = new PerSessionSerializationService(serializationService);
+
+        PipelineUtils.setup(channel.pipeline(), sessionSerializationService, handshakeManager, messageListener);
+
+        channel.register();
+
+        return channel;
+    }
+
+    private UserObjectSerializationContext createUserObjectSerializationContext() {
+        var userObjectDescriptorRegistry = new ClassDescriptorRegistry();
+        var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
+
+        var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
+
+        return new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
+                userObjectMarshaller);
+    }
+
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(RecoveryDescriptorProvider provider) {
+        return createRecoveryClientHandshakeManager("client", UUID.randomUUID(), provider);
+    }
+
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider) {
+        return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, MESSAGE_FACTORY, provider);
+    }
+
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
+        return createRecoveryServerHandshakeManager("server", UUID.randomUUID(), provider);
+    }
+
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider) {
+        return new RecoveryServerHandshakeManager(launchId, consistentId, MESSAGE_FACTORY, provider);
+    }
+
+    private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {
+        return new DefaultRecoveryDescriptorProvider();
+    }
+}