You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/05/28 09:26:37 UTC

[ignite-3] branch main updated: IGNITE-14082 Implemented handshake protocol in new netty-based networking implementation. (#135)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new e1d4caf  IGNITE-14082 Implemented handshake protocol in new netty-based networking implementation. (#135)
e1d4caf is described below

commit e1d4caf48ecca2b8fbb71d29e67716500ac4f910
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Fri May 28 12:26:30 2021 +0300

    IGNITE-14082 Implemented handshake protocol in new netty-based networking implementation. (#135)
---
 .../processor/internal/Processor.java              |   2 +-
 .../client/ITMetaStorageServiceTest.java           |  10 +-
 .../processor/ITAutoSerializableProcessorTest.java |   3 +-
 .../internal/AutoSerializableProcessor.java        |   7 +-
 .../ignite/network/AbstractClusterService.java     |   8 +-
 .../ignite/network/ClusterLocalConfiguration.java  |   8 +-
 .../org/apache/ignite/network/ClusterNode.java     |   6 +
 .../org/apache/ignite/network/ClusterService.java  |   6 +-
 .../ignite/network/ClusterServiceFactory.java      |   7 +-
 .../apache/ignite/network/MessagingService.java    |   4 +
 .../ignite/network/TopologyEventHandler.java       |   4 +
 .../org/apache/ignite/network/TopologyService.java |   2 +
 .../network/serialization/MessageReader.java       |   4 +
 .../MessageSerializationRegistry.java              |  26 +-
 .../network/serialization/MessageWriter.java       |   4 +
 .../org/apache/ignite/network/TestMessage.java     |   2 +-
 .../netty}/ConnectionManagerTest.java              |  86 ++++-
 .../internal/recovery/RecoveryHandshakeTest.java   | 349 +++++++++++++++++++++
 .../network/scalecube/ITNodeRestartsTest.java      |   8 +-
 .../scalecube/ITScaleCubeNetworkMessagingTest.java |   6 +
 .../direct/stream/DirectByteBufferStream.java      |   8 +
 .../stream/DirectByteBufferStreamImplV1.java       |   2 +
 .../internal/handshake/HandshakeAction.java}       |  23 +-
 .../internal/handshake/HandshakeException.java}    |  25 +-
 .../internal/handshake/HandshakeManager.java       |  59 ++++
 .../network/internal/netty/ConnectionManager.java  | 122 ++++---
 .../network/internal/netty/HandshakeHandler.java   |  98 ++++++
 .../network/internal/netty/InboundDecoder.java     |  12 +
 .../ignite/network/internal/netty/NettyClient.java |  68 ++--
 .../ignite/network/internal/netty/NettySender.java | 130 ++------
 .../ignite/network/internal/netty/NettyServer.java |  41 ++-
 .../ignite/network/internal/netty/NettyUtils.java  |   1 +
 .../{NettySender.java => OutboundEncoder.java}     |  49 +--
 .../recovery/RecoveryClientHandshakeManager.java   | 105 +++++++
 .../recovery/RecoveryServerHandshakeManager.java   | 103 ++++++
 .../recovery/message/HandshakeMessageFactory.java} |  18 +-
 .../recovery/message/HandshakeStartMessage.java}   |  38 ++-
 .../message/HandshakeStartMessageImpl.java}        |  41 ++-
 .../message/HandshakeStartResponseMessage.java     |  60 ++++
 .../message/HandshakeStartResponseMessageImpl.java |  83 +++++
 .../scalecube/ScaleCubeClusterServiceFactory.java  |  19 +-
 .../ScaleCubeDirectMarshallerTransport.java        |  15 +-
 .../scalecube/ScaleCubeMessagingService.java       |  19 +-
 .../scalecube/ScaleCubeTopologyService.java        |   4 +
 .../network/internal/netty/InboundDecoderTest.java |   4 +-
 .../network/internal/netty/NettyClientTest.java    |  74 ++++-
 .../network/internal/netty/NettyServerTest.java    | 118 ++++++-
 .../raft/server/ITRaftCounterServerTest.java       |   8 +-
 .../apache/ignite/internal/app/IgnitionImpl.java   |   2 +-
 .../ignite/distributed/ITDistributedTableTest.java |   8 +-
 50 files changed, 1592 insertions(+), 317 deletions(-)

diff --git a/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/configuration/processor/internal/Processor.java b/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/configuration/processor/internal/Processor.java
index c4bdbb7..217b3b9 100644
--- a/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/configuration/processor/internal/Processor.java
+++ b/modules/configuration-annotation-processor/src/main/java/org/apache/ignite/configuration/processor/internal/Processor.java
@@ -885,6 +885,6 @@ public class Processor extends AbstractProcessor {
 
     /** {@inheritDoc} */
     @Override public SourceVersion getSupportedSourceVersion() {
-        return SourceVersion.RELEASE_11;
+        return SourceVersion.latest();
     }
 }
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index 34df12a..36c04c0 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -50,6 +50,10 @@ import org.apache.ignite.metastorage.client.WatchListener;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
 import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
@@ -104,10 +108,10 @@ public class ITMetaStorageServiceTest {
     private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
 
     /** */
-    // TODO: IGNITE-14088 Uncomment and use real serializer provider
-    //    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
     private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry()
-            .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory());
+        .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
+        .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+        .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory());
 
     /**  Expected server result entry. */
     private static final org.apache.ignite.internal.metastorage.server.Entry EXPECTED_SRV_RESULT_ENTRY =
diff --git a/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/network/messages/internal/processor/ITAutoSerializableProcessorTest.java b/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/network/messages/internal/processor/ITAutoSerializableProcessorTest.java
index 2d6e9eb..2b1b340 100644
--- a/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/network/messages/internal/processor/ITAutoSerializableProcessorTest.java
+++ b/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/network/messages/internal/processor/ITAutoSerializableProcessorTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.network.processor.internal;
+package org.apache.ignite.network.messages.internal.processor;
 
 import java.util.Arrays;
 import java.util.List;
@@ -25,6 +25,7 @@ import com.google.testing.compile.Compilation;
 import com.google.testing.compile.Compiler;
 import com.google.testing.compile.JavaFileObjects;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.processor.internal.AutoSerializableProcessor;
 import org.junit.jupiter.api.Test;
 
 import static com.google.testing.compile.CompilationSubject.assertThat;
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/network/processor/internal/AutoSerializableProcessor.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/network/processor/internal/AutoSerializableProcessor.java
index bb053ed..202c5a3 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/network/processor/internal/AutoSerializableProcessor.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/network/processor/internal/AutoSerializableProcessor.java
@@ -29,7 +29,6 @@ import java.util.StringJoiner;
 import java.util.stream.Collectors;
 import javax.annotation.processing.AbstractProcessor;
 import javax.annotation.processing.RoundEnvironment;
-import javax.annotation.processing.SupportedSourceVersion;
 import javax.lang.model.SourceVersion;
 import javax.lang.model.element.Element;
 import javax.lang.model.element.ElementKind;
@@ -54,7 +53,6 @@ import org.apache.ignite.network.serialization.MessageSerializer;
  * Annotation processor for generating (de-)serializers for network messages marked with the {@link AutoSerializable}
  * annotation.
  */
-@SupportedSourceVersion(SourceVersion.RELEASE_11)
 public class AutoSerializableProcessor extends AbstractProcessor {
     /** {@inheritDoc} */
     @Override public Set<String> getSupportedAnnotationTypes() {
@@ -245,4 +243,9 @@ public class AutoSerializableProcessor extends AbstractProcessor {
 
         return false;
     }
+
+    /** {@inheritDoc} */
+    @Override public SourceVersion getSupportedSourceVersion() {
+        return SourceVersion.latest();
+    }
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/AbstractClusterService.java b/modules/network-api/src/main/java/org/apache/ignite/network/AbstractClusterService.java
index ec551d3..8185ab4 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/AbstractClusterService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/AbstractClusterService.java
@@ -33,7 +33,13 @@ public abstract class AbstractClusterService implements ClusterService {
     /** Messaging service. */
     private final MessagingService messagingService;
 
-    /** */
+    /**
+     * Constructor.
+     *
+     * @param context Cluster context.
+     * @param topologyService Topology service.
+     * @param messagingService Messaging service.
+     */
     public AbstractClusterService(
         ClusterLocalConfiguration context,
         TopologyService topologyService,
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
index 700faa9..e3d1a4d 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterLocalConfiguration.java
@@ -55,28 +55,28 @@ public class ClusterLocalConfiguration {
     }
 
     /**
-     * Network alias of a node.
+     * @return Network alias of a node.
      */
     public String getName() {
         return name;
     }
 
     /**
-     * Port.
+     * @return Port.
      */
     public int getPort() {
         return port;
     }
 
     /**
-     * Addresses of other nodes.
+     * @return Addresses of other nodes.
      */
     public List<String> getMemberAddresses() {
         return memberAddresses;
     }
 
     /**
-     * Message mapper providers.
+     * @return Message serialization registry.
      */
     public MessageSerializationRegistry getSerializationRegistry() {
         return serializationRegistry;
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java
index 2573a08..a37d723 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -40,7 +40,10 @@ public class ClusterNode implements Serializable {
     private String address;
 
     /**
+     * @param id Local id that changes between restarts.
      * @param name Unique name of member in cluster.
+     * @param host Node host.
+     * @param port Node port.
      */
     public ClusterNode(String id, String name, String host, int port) {
         this.id = id;
@@ -49,6 +52,9 @@ public class ClusterNode implements Serializable {
         this.port = port;
     }
 
+    /**
+     * @return Node's local id.
+     */
     public String id() {
         return id;
     }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterService.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterService.java
index d218bf7..bc5db45 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterService.java
@@ -23,17 +23,17 @@ package org.apache.ignite.network;
  */
 public interface ClusterService {
     /**
-     * Returns the {@link TopologyService} for working with the cluster topology.
+     * @return {@link TopologyService} for working with the cluster topology.
      */
     TopologyService topologyService();
 
     /**
-     * Returns the {@link TopologyService} for sending messages to the cluster members.
+     * @return {@link MessagingService} for sending messages to the cluster members.
      */
     MessagingService messagingService();
 
     /**
-     * Returns the context associated with the current node.
+     * @return Context associated with the current node.
      */
     ClusterLocalConfiguration localConfiguration();
 
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
index b28c42d..c149208 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
@@ -16,10 +16,15 @@
  */
 package org.apache.ignite.network;
 
-/** */
+/**
+ * Cluster service factory.
+ */
 public interface ClusterServiceFactory {
     /**
      * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
+     *
+     * @param context Cluster context.
+     * @return New cluster service.
      */
     ClusterService createClusterService(ClusterLocalConfiguration context);
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
index 7b55c4d..1968aba 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
@@ -43,6 +43,7 @@ public interface MessagingService {
      *
      * @param recipient Recipient of the message.
      * @param msg Message which should be delivered.
+     * @return Future of the send operation.
      */
     CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg);
 
@@ -52,6 +53,7 @@ public interface MessagingService {
      * @param recipient Recipient of the message.
      * @param msg Message which should be delivered.
      * @param correlationId Correlation id when replying to the request.
+     * @return Future of the send operation.
      */
     CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg, String correlationId);
 
@@ -68,6 +70,8 @@ public interface MessagingService {
 
     /**
      * Registers a handler for network message events.
+     *
+     * @param handler Message handler.
      */
     void addMessageHandler(NetworkMessageHandler handler);
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java b/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
index 3c4cb36..67c1065 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
@@ -22,12 +22,16 @@ package org.apache.ignite.network;
 public interface TopologyEventHandler {
     /**
      * Called when a new member has been detected joining a cluster.
+     *
+     * @param member Appeared cluster member.
      */
     void onAppeared(ClusterNode member);
 
     /**
      * Indicates that a member has left a cluster. This method is only called when a member leaves permanently (i.e.
      * it is not possible to re-establish a connection to it).
+     *
+     * @param member Disappeared cluster member.
      */
     void onDisappeared(ClusterNode member);
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java b/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
index e343bc0..b7fac7d 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
@@ -38,6 +38,8 @@ public interface TopologyService {
 
     /**
      * Registers a handler for topology change events.
+     *
+     * @param handler Topology events handler.
      */
     void addEventHandler(TopologyEventHandler handler);
 
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java
index a313c2f..07c71cd 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java
@@ -232,6 +232,7 @@ public interface MessageReader {
     /**
      * Reads a nested message.
      *
+     * @param <T> Type of a message.
      * @param name Field name.
      * @return Message.
      */
@@ -240,6 +241,7 @@ public interface MessageReader {
     /**
      * Reads an array of objects.
      *
+     * @param <T> Type of an array.
      * @param name Field name.
      * @param itemType A component type of the array.
      * @param itemCls A component class of the array.
@@ -250,6 +252,7 @@ public interface MessageReader {
     /**
      * Reads a collection.
      *
+     * @param <C> Type of a collection.
      * @param name Field name.
      * @param itemType An item type of the Collection.
      * @return Collection.
@@ -259,6 +262,7 @@ public interface MessageReader {
     /**
      * Reads a map.
      *
+     * @param <M> Type of a map.
      * @param name Field name.
      * @param keyType The type of the map's key.
      * @param valType The type of the map's value.
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageSerializationRegistry.java b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageSerializationRegistry.java
index 0c8d559..f9fa06e 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageSerializationRegistry.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageSerializationRegistry.java
@@ -23,16 +23,22 @@ import org.apache.ignite.network.NetworkMessage;
 /**
  * Container that maps message types to {@link MessageSerializationFactory} instances.
  */
-public final class MessageSerializationRegistry {
+public class MessageSerializationRegistry {
     /** message type -> MessageSerializerProvider instance */
     private final MessageSerializationFactory<?>[] factories = new MessageSerializationFactory<?>[Short.MAX_VALUE << 1];
 
     /**
      * Registers message serialization factory by message type.
+     *
+     * @param type Direct type of a message.
+     * @param factory Message's serialization factory.
+     * @return This registry.
+     * @throws NetworkConfigurationException If there is an already registered factory for the given type.
      */
-    public MessageSerializationRegistry registerFactory(short type, MessageSerializationFactory<?> factory) {
+    public MessageSerializationRegistry registerFactory(short type,
+        MessageSerializationFactory<?> factory) {
         if (this.factories[type] != null)
-            throw new NetworkConfigurationException("Message mapper for type " + type + " is already defined");
+            throw new NetworkConfigurationException("Message serialization factory for direct type " + type + " is already defined");
 
         this.factories[type] = factory;
 
@@ -40,7 +46,11 @@ public final class MessageSerializationRegistry {
     }
 
     /**
-     * Returns a {@link MessageSerializationFactory} for the given message type.
+     * Gets a {@link MessageSerializationFactory} for the given message type.
+     *
+     * @param <T> Type of a message.
+     * @param type Direct type of a message.
+     * @return Message's serialization factory.
      */
     private <T extends NetworkMessage> MessageSerializationFactory<T> getFactory(short type) {
         var provider = factories[type];
@@ -56,6 +66,10 @@ public final class MessageSerializationRegistry {
      * {@link MessageSerializationRegistry} does not track the correspondence between the message type and its Java
      * representation, so the actual generic specialization of the returned provider relies on the caller of this
      * method.
+     *
+     * @param <T> Type of a message.
+     * @param type Direct type of a message.
+     * @return Message's serializer.
      */
     public <T extends NetworkMessage> MessageSerializer<T> createSerializer(short type) {
         MessageSerializationFactory<T> factory = getFactory(type);
@@ -68,6 +82,10 @@ public final class MessageSerializationRegistry {
      * {@link MessageSerializationRegistry} does not track the correspondence between the message type and its Java
      * representation, so the actual generic specialization of the returned provider relies on the caller of this
      * method.
+     *
+     * @param <T> Type of a message.
+     * @param type Direct type of a message.
+     * @return Message's deserializer.
      */
     public <T extends NetworkMessage> MessageDeserializer<T> createDeserializer(short type) {
         MessageSerializationFactory<T> factory = getFactory(type);
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java
index 4f976de..1da4989 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java
@@ -266,6 +266,7 @@ public interface MessageWriter {
     /**
      * Writes an array of objects.
      *
+     * @param <T> Type of an array.
      * @param name Field name.
      * @param arr Array of objects.
      * @param itemType A component type of the array.
@@ -276,6 +277,7 @@ public interface MessageWriter {
     /**
      * Writes collection.
      *
+     * @param <T> Type of a collection.
      * @param name Field name.
      * @param col Collection.
      * @param itemType An item type of the collection.
@@ -286,6 +288,8 @@ public interface MessageWriter {
     /**
      * Writes a map.
      *
+     * @param <K> Type of the map's keys.
+     * @param <V> Type of the map's values.
      * @param name Field name.
      * @param map Map.
      * @param keyType The type of the map's key.
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
index dc6de9b..0ed4133 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
@@ -23,7 +23,7 @@ import org.apache.ignite.network.processor.annotations.AutoSerializable;
 @AutoSerializable(messageFactory = TestMessageFactory.class)
 public interface TestMessage extends NetworkMessage {
     /** Visible type for tests. */
-    public static final short TYPE = 3;
+    public static final short TYPE = 4;
 
     String msg();
 
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
similarity index 58%
rename from modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
rename to modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
index def8e56..4862080 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/netty/ConnectionManagerTest.java
@@ -15,21 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.network;
+package org.apache.ignite.network.internal.netty;
 
 import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
-import org.apache.ignite.network.internal.netty.ConnectionManager;
-import org.apache.ignite.network.internal.netty.NettyClient;
-import org.apache.ignite.network.internal.netty.NettySender;
-import org.apache.ignite.network.internal.netty.NettyServer;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageFactory;
+import org.apache.ignite.network.TestMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.network.internal.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -72,11 +78,52 @@ public class ConnectionManagerTest {
 
         manager2.addListener((address, message) -> fut.complete(message));
 
-        NettySender sender = manager1.channel(new InetSocketAddress(port2)).get();
+        NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
         TestMessage testMessage = TestMessageFactory.testMessage().msg(msgText).build();
 
-        sender.send(testMessage).join();
+        sender.send(testMessage).get(3, TimeUnit.SECONDS);
+
+        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+
+        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+    }
+
+    /**
+     * Tests that incoming connection is reused for sending messages.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testReuseIncomingConnection() throws Exception {
+        String msgText = "test";
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        var fut = new CompletableFuture<NetworkMessage>();
+
+        manager1.addListener((address, message) -> fut.complete(message));
+
+        NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+
+        // Ensure a handshake has finished on both sides.
+        senderFrom1to2.send(TestMessageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
+
+        NettySender senderFrom2to1 = manager2.channel(manager1.consistentId(), new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
+
+        InetSocketAddress clientLocalAddress = (InetSocketAddress) senderFrom1to2.channel().localAddress();
+
+        InetSocketAddress clientRemoteAddress = (InetSocketAddress) senderFrom2to1.channel().remoteAddress();
+
+        assertEquals(clientLocalAddress, clientRemoteAddress);
+
+        TestMessage testMessage = TestMessageFactory.testMessage().msg("test").build();
+
+        senderFrom2to1.send(testMessage).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
@@ -96,8 +143,8 @@ public class ConnectionManagerTest {
         ConnectionManager manager1 = startManager(port1);
         ConnectionManager manager2 = startManager(port2);
 
-        NettySender sender1 = manager1.channel(new InetSocketAddress(port2)).get();
-        NettySender sender2 = manager2.channel(new InetSocketAddress(port1)).get();
+        NettySender sender1 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
+        NettySender sender2 = manager2.channel(null, new InetSocketAddress(port1)).get(3, TimeUnit.SECONDS);
 
         assertNotNull(sender1);
         assertNotNull(sender2);
@@ -131,7 +178,7 @@ public class ConnectionManagerTest {
         ConnectionManager manager1 = startManager(port1);
         ConnectionManager manager2 = startManager(port2);
 
-        NettySender sender = manager1.channel(new InetSocketAddress(port2)).get();
+        NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
         TestMessage testMessage = TestMessageFactory.testMessage().msg(msgText).build();
 
@@ -141,7 +188,7 @@ public class ConnectionManagerTest {
 
         assertThrows(ClosedChannelException.class, () -> {
             try {
-                finalSender.send(testMessage).join();
+                finalSender.send(testMessage).get(3, TimeUnit.SECONDS);
             }
             catch (Exception e) {
                 throw e.getCause();
@@ -154,9 +201,9 @@ public class ConnectionManagerTest {
 
         manager2.addListener((address, message) -> fut.complete(message));
 
-        sender = manager1.channel(new InetSocketAddress(port2)).get();
+        sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
-        sender.send(testMessage).join();
+        sender.send(testMessage).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
@@ -171,9 +218,20 @@ public class ConnectionManagerTest {
      */
     private ConnectionManager startManager(int port) {
         var registry = new MessageSerializationRegistry()
+            .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+            .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory())
             .registerFactory(TestMessage.TYPE, new TestMessageSerializationFactory());
 
-        var manager = new ConnectionManager(port, registry);
+        UUID launchId = UUID.randomUUID();
+        String consistentId = UUID.randomUUID().toString();
+
+        var manager = new ConnectionManager(
+            port,
+            registry,
+            consistentId,
+            () -> new RecoveryServerHandshakeManager(launchId, consistentId),
+            () -> new RecoveryClientHandshakeManager(launchId, consistentId)
+        );
 
         manager.start();
 
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
new file mode 100644
index 0000000..e717951
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/internal/recovery/RecoveryHandshakeTest.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import io.netty.channel.Channel;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageFactory;
+import org.apache.ignite.network.TestMessageSerializationFactory;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_INIT;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryClientHandshakeManager.ClientStageFail.CLIENT_SERVER_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CLIENT_RESPONDED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_CONNECTION_OPENED;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_DOESNT_FAIL;
+import static org.apache.ignite.network.internal.recovery.RecoveryHandshakeTest.FailingRecoveryServerHandshakeManager.ServerStageFail.SERVER_INIT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Recovery protocol handshake tests.
+ */
+public class RecoveryHandshakeTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** Stops every connection manager that was started by a test. */
+    @AfterEach
+    final void tearDown() {
+        startedManagers.forEach(ConnectionManager::stop);
+    }
+
+    /**
+     * Tests handshake scenarios in which the server and/or the client handshake fails at a given stage.
+     *
+     * @param scenario Stages at which the server and the client handshakes should fail.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("handshakeScenarios")
+    public void testHandshakeScenario(HandshakeScenario scenario) throws Exception {
+        ConnectionManager manager1 = startManager(
+            4000,
+            scenario.serverFailAt,
+            CLIENT_DOESNT_FAIL
+        );
+
+        ConnectionManager manager2 = startManager(
+            4001,
+            SERVER_DOESNT_FAIL,
+            scenario.clientFailAt
+        );
+
+        NettySender from2to1;
+
+        try {
+            from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
+        }
+        catch (Exception e) {
+            if (scenario.clientFailAt == CLIENT_DOESNT_FAIL &&
+                scenario.serverFailAt == SERVER_DOESNT_FAIL)
+                Assertions.fail(e);
+
+            return;
+        }
+
+        if (scenario.clientFailAt != CLIENT_DOESNT_FAIL || scenario.serverFailAt != SERVER_DOESNT_FAIL)
+            Assertions.fail("Handshake should've failed");
+
+        assertNotNull(from2to1);
+
+        // Ensure the handshake has finished on both sides.
+        from2to1.send(TestMessageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
+
+        NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
+
+        assertNotNull(from1to2);
+
+        assertEquals(from2to1.channel().localAddress(), from1to2.channel().remoteAddress());
+    }
+
+    /**
+     * Tests a special handshake scenario: the client assumes the handshake has finished, but the server fails
+     * on the client's response. The server is then expected to close the connection, which the client observes
+     * through the channel's close future.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHandshakeFailsOnServerWhenClientResponded() throws Exception {
+        ConnectionManager manager1 = startManager(
+            4000,
+            SERVER_CLIENT_RESPONDED,
+            CLIENT_DOESNT_FAIL
+        );
+
+        ConnectionManager manager2 = startManager(
+            4001,
+            SERVER_DOESNT_FAIL,
+            CLIENT_DOESNT_FAIL
+        );
+
+        NettySender from2to1 = manager2.channel(manager1.consistentId(), manager1.getLocalAddress()).get(3, TimeUnit.SECONDS);
+
+        from2to1.channel().closeFuture().get(3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * @return Handshake scenarios: every server/client stage pair except the one covered by a separate test.
+     */
+    private static List<HandshakeScenario> handshakeScenarios() {
+        ServerStageFail[] serverOpts = ServerStageFail.values();
+
+        ClientStageFail[] clientOpts = ClientStageFail.values();
+
+        List<HandshakeScenario> res = new ArrayList<>();
+
+        for (ServerStageFail serverOpt : serverOpts)
+            for (ClientStageFail clientOpt : clientOpts)
+                // Skip only (SERVER_CLIENT_RESPONDED, CLIENT_DOESNT_FAIL): it is handled in a separate test.
+                if (serverOpt != SERVER_CLIENT_RESPONDED || clientOpt != CLIENT_DOESNT_FAIL)
+                    res.add(new HandshakeScenario(serverOpt, clientOpt));
+
+        return res;
+    }
+
+    /** Handshake scenario: a pair of stages at which the server and the client handshakes fail. */
+    private static class HandshakeScenario {
+        /** Stage to fail server handshake at. */
+        private final ServerStageFail serverFailAt;
+
+        /** Stage to fail client handshake at. */
+        private final ClientStageFail clientFailAt;
+
+        /** Creates a scenario from the server and the client failure stages. */
+        private HandshakeScenario(ServerStageFail serverFailAt, ClientStageFail clientFailAt) {
+            this.serverFailAt = serverFailAt;
+            this.clientFailAt = clientFailAt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return String.format("server=%s, client=%s", serverFailAt, clientFailAt);
+        }
+    }
+
+    /**
+     * {@link RecoveryServerHandshakeManager} that can fail at a specific stage of the handshake.
+     */
+    static class FailingRecoveryServerHandshakeManager extends RecoveryServerHandshakeManager {
+        /**
+         * At what stage to fail the handshake.
+         */
+        private final ServerStageFail failAtStage;
+
+        /** Creates a server handshake manager that fails the handshake at the given stage. */
+        private FailingRecoveryServerHandshakeManager(
+            UUID launchId,
+            String consistentId,
+            ServerStageFail failAtStage
+        ) {
+            super(launchId, consistentId);
+            this.failAtStage = failAtStage;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HandshakeAction init(Channel channel) {
+            if (failAtStage == SERVER_INIT) {
+                handshakeFuture().completeExceptionally(new RuntimeException());
+                return HandshakeAction.FAIL;
+            }
+
+            return super.init(channel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public HandshakeAction onConnectionOpen(Channel channel) {
+            if (failAtStage == SERVER_CONNECTION_OPENED) {
+                handshakeFuture().completeExceptionally(new RuntimeException());
+                return HandshakeAction.FAIL;
+            }
+
+            return super.onConnectionOpen(channel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
+            if (failAtStage == SERVER_CLIENT_RESPONDED) {
+                handshakeFuture().completeExceptionally(new RuntimeException());
+                return HandshakeAction.FAIL;
+            }
+
+            return super.onMessage(channel, message);
+        }
+
+        /** Server handshake stage to fail at. */
+        enum ServerStageFail {
+            /** Don't fail at all. */
+            SERVER_DOESNT_FAIL,
+
+            /** Fail on init. */
+            SERVER_INIT,
+
+            /** Fail on connection open. */
+            SERVER_CONNECTION_OPENED,
+
+            /** Fail on client response. */
+            SERVER_CLIENT_RESPONDED
+        }
+    }
+
+    /**
+     * {@link RecoveryClientHandshakeManager} that can fail at a specific stage of the handshake.
+     */
+    static class FailingRecoveryClientHandshakeManager extends RecoveryClientHandshakeManager {
+        /**
+         * At what stage to fail the handshake.
+         */
+        private final ClientStageFail failAtStage;
+
+        /** Creates a client handshake manager that fails the handshake at the given stage. */
+        private FailingRecoveryClientHandshakeManager(
+            UUID launchId,
+            String consistentId,
+            ClientStageFail failAtStage
+        ) {
+            super(launchId, consistentId);
+            this.failAtStage = failAtStage;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HandshakeAction init(Channel channel) {
+            if (failAtStage == CLIENT_INIT) {
+                handshakeFuture().completeExceptionally(new RuntimeException());
+                return HandshakeAction.FAIL;
+            }
+
+            return super.init(channel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public HandshakeAction onConnectionOpen(Channel channel) {
+            if (failAtStage == CLIENT_CONNECTION_OPENED) {
+                handshakeFuture().completeExceptionally(new RuntimeException());
+                return HandshakeAction.FAIL;
+            }
+
+            return super.onConnectionOpen(channel);
+        }
+
+        /** {@inheritDoc} */
+        @Override public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
+            if (failAtStage == CLIENT_SERVER_RESPONDED) {
+                handshakeFuture().completeExceptionally(new RuntimeException());
+                return HandshakeAction.FAIL;
+            }
+
+            return super.onMessage(channel, message);
+        }
+
+        /** Client handshake stage to fail at. */
+        enum ClientStageFail {
+            /** Don't fail at all. */
+            CLIENT_DOESNT_FAIL,
+
+            /** Fail on init. */
+            CLIENT_INIT,
+
+            /** Fail on connection open. */
+            CLIENT_CONNECTION_OPENED,
+
+            /** Fail on server response. */
+            CLIENT_SERVER_RESPONDED
+        }
+    }
+
+    /**
+     * Creates and starts a {@link ConnectionManager}, adding it to the {@link #startedManagers} list.
+     *
+     * @param port Port for the {@link ConnectionManager#server}.
+     * @param serverHandshakeFailAt At what stage to fail server handshake.
+     * @param clientHandshakeFailAt At what stage to fail client handshake.
+     * @return Started connection manager.
+     */
+    private ConnectionManager startManager(
+        int port,
+        ServerStageFail serverHandshakeFailAt,
+        ClientStageFail clientHandshakeFailAt
+    ) {
+        var registry = new MessageSerializationRegistry()
+            .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+            .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory())
+            .registerFactory(TestMessage.TYPE, new TestMessageSerializationFactory());
+
+        UUID launchId = UUID.randomUUID();
+        String consistentId = UUID.randomUUID().toString();
+
+        var manager = new ConnectionManager(
+            port,
+            registry,
+            consistentId,
+            () -> new FailingRecoveryServerHandshakeManager(launchId, consistentId, serverHandshakeFailAt),
+            () -> new FailingRecoveryClientHandshakeManager(launchId, consistentId, clientHandshakeFailAt)
+        );
+
+        manager.start();
+
+        startedManagers.add(manager);
+
+        return manager;
+    }
+
+}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
index e7fd486..efd4b79 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITNodeRestartsTest.java
@@ -23,6 +23,10 @@ import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
@@ -41,7 +45,9 @@ class ITNodeRestartsTest {
 
     /** */
     private final MessageSerializationRegistry serializationRegistry = new MessageSerializationRegistry()
-        .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory());
+        .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
+        .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+        .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory());
 
     /** */
     private final ClusterServiceFactory networkFactory = new TestScaleCubeClusterServiceFactory();
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
index 6a519c2..b24be1b 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
@@ -38,6 +38,10 @@ import org.apache.ignite.network.TestMessage;
 import org.apache.ignite.network.TestMessageFactory;
 import org.apache.ignite.network.TestMessageSerializationFactory;
 import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
@@ -266,6 +270,8 @@ class ITScaleCubeNetworkMessagingTest {
         /** */
         private final MessageSerializationRegistry serializationRegistry = new MessageSerializationRegistry()
             .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
+            .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+            .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory())
             .registerFactory(TestMessage.TYPE, new TestMessageSerializationFactory());
 
         /** */
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java
index 1986e4b..4dd0f36 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStream.java
@@ -167,6 +167,7 @@ public interface DirectByteBufferStream {
     public void writeMessage(NetworkMessage msg, MessageWriter writer);
 
     /**
+     * @param <T> Type of the array.
      * @param arr Array.
      * @param itemType Component type.
      * @param writer Writer.
@@ -174,6 +175,7 @@ public interface DirectByteBufferStream {
     public <T> void writeObjectArray(T[] arr, MessageCollectionItemType itemType, MessageWriter writer);
 
     /**
+     * @param <T> Type of the collection.
      * @param col Collection.
      * @param itemType Component type.
      * @param writer Writer.
@@ -181,6 +183,8 @@ public interface DirectByteBufferStream {
     public <T> void writeCollection(Collection<T> col, MessageCollectionItemType itemType, MessageWriter writer);
 
     /**
+     * @param <K> Type of the map's keys.
+     * @param <V> Type of the map's values.
      * @param map Map.
      * @param keyType Key type.
      * @param valType Value type.
@@ -290,12 +294,14 @@ public interface DirectByteBufferStream {
     public IgniteUuid readIgniteUuid();
 
     /**
+     * @param <T> Type of a message.
      * @param reader Reader.
      * @return Message.
      */
     public <T extends NetworkMessage> T readMessage(MessageReader reader);
 
     /**
+     * @param <T> Type of an array.
      * @param itemType Item type.
      * @param itemCls Item class.
      * @param reader Reader.
@@ -304,6 +310,7 @@ public interface DirectByteBufferStream {
     public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, MessageReader reader);
 
     /**
+     * @param <C> Type of a collection.
      * @param itemType Item type.
      * @param reader Reader.
      * @return Collection.
@@ -311,6 +318,7 @@ public interface DirectByteBufferStream {
     public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader);
 
     /**
+     * @param <M> Type of a map.
      * @param keyType Key type.
      * @param valType Value type.
      * @param linked Whether linked map should be created.
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
index 29a5cb3..eaaf834 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/direct/stream/DirectByteBufferStreamImplV1.java
@@ -1321,6 +1321,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     }
 
     /**
+     * @param <T> Type of an array.
      * @param creator Array creator.
      * @param lenShift Array length shift size.
      * @param off Base offset.
@@ -1383,6 +1384,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     }
 
     /**
+     * @param <T> Type of an array.
      * @param creator Array creator.
      * @param typeSize Primitive type size in bytes.
      * @param lenShift Array length shift size.
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeAction.java
similarity index 64%
copy from modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeAction.java
index b28c42d..4900223 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeAction.java
@@ -14,12 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.network;
 
-/** */
-public interface ClusterServiceFactory {
-    /**
-     * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
-     */
-    ClusterService createClusterService(ClusterLocalConfiguration context);
+package org.apache.ignite.network.internal.handshake;
+
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+
+/**
+ * Actions that a {@link HandshakeManager} returns to a {@link HandshakeHandler} to tell it how to proceed.
+ */
+public enum HandshakeAction {
+    /** Fail the handshake operation and close the channel. */
+    FAIL,
+
+    /** Remove the handshake handler. */
+    REMOVE_HANDLER,
+
+    /** Do nothing. */
+    NOOP
 }
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeException.java
similarity index 62%
copy from modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeException.java
index 3c4cb36..f5fdc59 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/TopologyEventHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeException.java
@@ -14,20 +14,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.network;
+
+package org.apache.ignite.network.internal.handshake;
 
 /**
- * Interface for handling events related to topology changes.
+ * Handshake exception.
  */
-public interface TopologyEventHandler {
+public class HandshakeException extends Exception {
     /**
-     * Called when a new member has been detected joining a cluster.
+     * Constructor.
+     *
+     * @param message Handshake error message.
      */
-    void onAppeared(ClusterNode member);
+    public HandshakeException(String message) {
+        super(message);
+    }
 
     /**
-     * Indicates that a member has left a cluster. This method is only called when a member leaves permanently (i.e.
-     * it is not possible to re-establish a connection to it).
+     * Constructor.
+     *
+     * @param message Handshake error message.
+     * @param cause Handshake error cause.
      */
-    void onDisappeared(ClusterNode member);
+    public HandshakeException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java
new file mode 100644
index 0000000..aaacc9d
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/handshake/HandshakeManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.handshake;
+
+import java.util.concurrent.CompletableFuture;
+import io.netty.channel.Channel;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.netty.HandshakeHandler;
+import org.apache.ignite.network.internal.netty.NettySender;
+
+/**
+ * Handshake operation manager.
+ */
+public interface HandshakeManager {
+    /**
+     * Initializes the handshake manager with the channel.
+     *
+     * @param channel Channel.
+     * @return Action to be performed by the {@link HandshakeHandler}.
+     */
+    HandshakeAction init(Channel channel);
+
+    /**
+     * Handles the event of the connection opening.
+     *
+     * @param channel Channel.
+     * @return Action to be performed by the {@link HandshakeHandler}.
+     */
+    HandshakeAction onConnectionOpen(Channel channel);
+
+    /**
+     * Handles an incoming message.
+     *
+     * @param channel Channel.
+     * @param message Message to handle.
+     * @return Action to be performed by the {@link HandshakeHandler}.
+     */
+    HandshakeAction onMessage(Channel channel, NetworkMessage message);
+
+    /**
+     * @return Future that represents the handshake operation and completes with a {@link NettySender}.
+     */
+    CompletableFuture<NettySender> handshakeFuture();
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
index 3b3f94c..ae5ae16 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
@@ -28,19 +28,19 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -62,8 +62,8 @@ public class ConnectionManager {
     /** Server. */
     private final NettyServer server;
 
-    /** Channels. */
-    private final Map<SocketAddress, NettySender> channels = new ConcurrentHashMap<>();
+    /** Channels map from consistentId to {@link NettySender}. */
+    private final Map<String, NettySender> channels = new ConcurrentHashMap<>();
 
     /** Clients. */
     private final Map<SocketAddress, NettyClient> clients = new ConcurrentHashMap<>();
@@ -74,16 +74,39 @@ public class ConnectionManager {
     /** Message listeners. */
     private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>();
 
+    /** Node consistent id. */
+    private final String consistentId;
+
+    /** Client handshake manager factory. */
+    private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
+
     /**
      * Constructor.
      *
      * @param port Server port.
      * @param registry Serialization registry.
+     * @param consistentId Consistent id of this node.
+     * @param serverHandshakeManagerFactory Server handshake manager factory.
+     * @param clientHandshakeManagerFactory Client handshake manager factory.
      */
-    public ConnectionManager(int port, MessageSerializationRegistry registry) {
+    public ConnectionManager(
+        int port,
+        MessageSerializationRegistry registry,
+        String consistentId,
+        Supplier<HandshakeManager> serverHandshakeManagerFactory,
+        Supplier<HandshakeManager> clientHandshakeManagerFactory
+    ) {
         this.serializationRegistry = registry;
-        this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry);
-        this.clientBootstrap = createClientBootstrap(clientWorkerGroup, serializationRegistry, this::onMessage);
+        this.consistentId = consistentId;
+        this.clientHandshakeManagerFactory = clientHandshakeManagerFactory;
+        this.server = new NettyServer(
+            port,
+            serverHandshakeManagerFactory.get(),
+            this::onNewIncomingChannel,
+            this::onMessage,
+            serializationRegistry
+        );
+        this.clientBootstrap = createClientBootstrap(clientWorkerGroup, serializationRegistry);
     }
 
     /**
@@ -110,24 +133,36 @@ public class ConnectionManager {
 
     /**
      * Gets a {@link NettySender}, that sends data from this node to another node with the specified address.
+     * @param consistentId Another node's consistent id.
      * @param address Another node's address.
      * @return Sender.
      */
-    public CompletableFuture<NettySender> channel(SocketAddress address) {
-        NettySender channel = channels.compute(
-            address,
-            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
-        );
-
-        if (channel != null)
-            return CompletableFuture.completedFuture(channel);
+    public CompletableFuture<NettySender> channel(@Nullable String consistentId, SocketAddress address) {
+        if (consistentId != null) {
+            // If consistent id is known, try looking up a channel by consistent id. There can be an outbound connection
+            // or an inbound connection associated with that consistent id.
+            NettySender channel = channels.compute(
+                consistentId,
+                (addr, sender) -> (sender == null || !sender.isOpen()) ? null : sender
+            );
+
+            if (channel != null)
+                return CompletableFuture.completedFuture(channel);
+        }
 
+        // Get an existing client or create a new one. NettyClient provides a CompletableFuture that resolves
+        // when the client is ready for write operations, so previously started client, that didn't establish connection
+        // or didn't perform the handshake operation, can be reused.
         NettyClient client = clients.compute(address, (addr, existingClient) ->
             existingClient != null && !existingClient.failedToConnect() && !existingClient.isDisconnected() ?
                 existingClient : connect(addr)
         );
 
-        return client.sender();
+        CompletableFuture<NettySender> sender = client.sender();
+
+        assert sender != null;
+
+        return sender;
     }
 
     /**
@@ -146,8 +181,7 @@ public class ConnectionManager {
      * @param channel Channel from client to this {@link #server}.
      */
     private void onNewIncomingChannel(NettySender channel) {
-        SocketAddress remoteAddress = channel.remoteAddress();
-        channels.put(remoteAddress, channel);
+        channels.put(channel.consistentId(), channel);
     }
 
     /**
@@ -156,14 +190,21 @@ public class ConnectionManager {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
-        NettyClient client = new NettyClient(address, serializationRegistry);
+    private NettyClient connect(
+        SocketAddress address
+    ) {
+        var client = new NettyClient(
+            address,
+            serializationRegistry,
+            clientHandshakeManagerFactory.get(),
+            this::onMessage
+        );
 
         client.start(clientBootstrap).whenComplete((sender, throwable) -> {
-            if (throwable != null)
-                clients.remove(address);
+            if (throwable == null)
+                channels.put(sender.consistentId(), sender);
             else
-                channels.put(address, sender);
+                clients.remove(address);
         });
 
         return client;
@@ -208,6 +249,14 @@ public class ConnectionManager {
     }
 
     /**
+     * @return This node's consistent id.
+     */
+    @TestOnly
+    public String consistentId() {
+        return consistentId;
+    }
+
+    /**
      * @return Collection of all the clients started by this connection manager.
      */
     @TestOnly
@@ -215,18 +264,25 @@ public class ConnectionManager {
         return Collections.unmodifiableCollection(clients.values());
     }
 
+
+    /**
+     * @return Map of the channels.
+     */
+    @TestOnly
+    public Map<String, NettySender> channels() {
+        return Collections.unmodifiableMap(channels);
+    }
+
     /**
      * Creates a {@link Bootstrap} for clients, providing channel handlers and options.
      *
      * @param eventLoopGroup Event loop group for channel handling.
      * @param serializationRegistry Serialization registry.
-     * @param messageListener Message listener.
      * @return Bootstrap for clients.
      */
     public static Bootstrap createClientBootstrap(
         EventLoopGroup eventLoopGroup,
-        MessageSerializationRegistry serializationRegistry,
-        BiConsumer<SocketAddress, NetworkMessage> messageListener
+        MessageSerializationRegistry serializationRegistry
     ) {
         Bootstrap clientBootstrap = new Bootstrap();
 
@@ -234,16 +290,8 @@ public class ConnectionManager {
             .channel(NioSocketChannel.class)
             // See NettyServer#start for netty configuration details.
             .option(ChannelOption.SO_KEEPALIVE, true)
-            .handler(new ChannelInitializer<SocketChannel>() {
-                /** {@inheritDoc} */
-                @Override public void initChannel(SocketChannel ch) {
-                    ch.pipeline().addLast(
-                        new InboundDecoder(serializationRegistry),
-                        new MessageHandler(messageListener),
-                        new ChunkedWriteHandler()
-                    );
-                }
-            });
+            .option(ChannelOption.SO_LINGER, 0)
+            .option(ChannelOption.TCP_NODELAY, true);
 
         return clientBootstrap;
     }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
new file mode 100644
index 0000000..fdbfa86
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/HandshakeHandler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeException;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+
+/**
+ * Netty handler of the handshake operation.
+ */
+public class HandshakeHandler extends ChannelInboundHandlerAdapter {
+    /** Handshake manager. */
+    private final HandshakeManager manager;
+
+    /**
+     * Constructor.
+     *
+     * @param manager Handshake manager.
+     */
+    public HandshakeHandler(HandshakeManager manager) {
+        this.manager = manager;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        HandshakeAction handshakeAction = manager.init(ctx.channel());
+
+        handleHandshakeAction(handshakeAction, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        HandshakeAction handshakeAction = manager.onConnectionOpen(ctx.channel());
+
+        handleHandshakeAction(handshakeAction, ctx);
+
+        ctx.fireChannelActive();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        HandshakeAction handshakeAction = manager.onMessage(ctx.channel(), (NetworkMessage) msg);
+
+        handleHandshakeAction(handshakeAction, ctx);
+        // No need to forward the message to the next handler as this message only matters for a handshake.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // If this method is called, the channel was closed before the handshake finished or the handshake
+        // has failed.
+        manager.handshakeFuture().completeExceptionally(
+            new HandshakeException("Channel has been closed before handshake has finished or handshake has failed")
+        );
+
+        ctx.fireChannelInactive();
+    }
+
+    /**
+     * Handle {@link HandshakeAction}.
+     *
+     * @param action Handshake action.
+     * @param ctx Netty channel context.
+     */
+    private void handleHandshakeAction(HandshakeAction action, ChannelHandlerContext ctx) {
+        switch (action) {
+            case REMOVE_HANDLER:
+                ctx.pipeline().remove(this);
+                break;
+
+            case FAIL:
+                ctx.channel().close();
+                break;
+
+            case NOOP:
+                break;
+        }
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
index 196200f..0349056 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.network.internal.netty;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import io.netty.buffer.ByteBuf;
@@ -122,4 +123,15 @@ public class InboundDecoder extends ByteToMessageDecoder {
             }
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        if (LOG.isDebugEnabled())
+            LOG.debug("Exception caught: " + cause.getMessage(), cause);
+
+        // Ignore IOExceptions that are thrown from Netty's internals. IOExceptions that occurred during reads
+        // or writes should be handled elsewhere.
+        if (cause instanceof Exception && !(cause instanceof IOException) )
+            throw (Exception) cause;
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
index de5bf60..240e5c2 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
@@ -17,14 +17,19 @@
 
 package org.apache.ignite.network.internal.netty;
 
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.jetbrains.annotations.Nullable;
 
@@ -41,52 +46,53 @@ public class NettyClient {
     /** Destination address. */
     private final SocketAddress address;
 
-    /** Future that resolves when client channel is opened. */
+    /** Future that resolves when the client finished the handshake. */
     @Nullable
     private volatile CompletableFuture<NettySender> clientFuture = null;
 
+    /** Future that resolves when the client channel is opened. */
+    private CompletableFuture<Void> channelFuture = new CompletableFuture<>();
+
     /** Client channel. */
     @Nullable
     private volatile Channel channel = null;
 
+    /** Message listener. */
+    private final BiConsumer<SocketAddress, NetworkMessage> messageListener;
+
+    /** Handshake manager. */
+    private final HandshakeManager handshakeManager;
+
     /** Flag indicating if {@link #stop()} has been called. */
     private boolean stopped = false;
 
     /**
      * Constructor.
      *
-     * @param host Host.
-     * @param port Port.
-     * @param serializationRegistry Serialization registry.
-     */
-    public NettyClient(
-        String host,
-        int port,
-        MessageSerializationRegistry serializationRegistry
-    ) {
-        this(new InetSocketAddress(host, port), serializationRegistry);
-    }
-
-    /**
-     * Constructor.
-     *
      * @param address Destination address.
      * @param serializationRegistry Serialization registry.
+     * @param manager Client handshake manager.
+     * @param messageListener Message listener.
      */
     public NettyClient(
         SocketAddress address,
-        MessageSerializationRegistry serializationRegistry
+        MessageSerializationRegistry serializationRegistry,
+        HandshakeManager manager,
+        BiConsumer<SocketAddress, NetworkMessage> messageListener
     ) {
         this.address = address;
         this.serializationRegistry = serializationRegistry;
+        this.handshakeManager = manager;
+        this.messageListener = messageListener;
     }
 
     /**
      * Start client.
      *
+     * @param bootstrapTemplate Template client bootstrap.
      * @return Future that resolves when client channel is opened.
      */
-    public CompletableFuture<NettySender> start(Bootstrap bootstrap) {
+    public CompletableFuture<NettySender> start(Bootstrap bootstrapTemplate) {
         synchronized (startStopLock) {
             if (stopped)
                 throw new IgniteInternalException("Attempted to start an already stopped NettyClient");
@@ -94,17 +100,37 @@ public class NettyClient {
             if (clientFuture != null)
                 throw new IgniteInternalException("Attempted to start an already started NettyClient");
 
+            Bootstrap bootstrap = bootstrapTemplate.clone();
+
+            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch) {
+                    ch.pipeline().addLast(
+                        new InboundDecoder(serializationRegistry),
+                        new HandshakeHandler(handshakeManager),
+                        new MessageHandler(messageListener),
+                        new ChunkedWriteHandler(),
+                        new OutboundEncoder(serializationRegistry)
+                    );
+                }
+            });
+
             clientFuture = NettyUtils.toChannelCompletableFuture(bootstrap.connect(address))
                 .handle((channel, throwable) -> {
                     synchronized (startStopLock) {
                         this.channel = channel;
 
+                        if (throwable != null)
+                            channelFuture.completeExceptionally(throwable);
+                        else
+                            channelFuture.complete(null);
+
                         if (stopped)
                             return CompletableFuture.<NettySender>failedFuture(new CancellationException("Client was stopped"));
                         else if (throwable != null)
                             return CompletableFuture.<NettySender>failedFuture(throwable);
                         else
-                            return CompletableFuture.completedFuture(new NettySender(channel, serializationRegistry));
+                            return handshakeManager.handshakeFuture();
                     }
                 })
                 .thenCompose(Function.identity());
@@ -137,7 +163,7 @@ public class NettyClient {
             if (clientFuture == null)
                 return CompletableFuture.completedFuture(null);
 
-            return clientFuture
+            return channelFuture
                 .handle((sender, throwable) ->
                     channel == null ?
                         CompletableFuture.<Void>completedFuture(null) :
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
index 649a9e7..e8553fb 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
@@ -17,18 +17,12 @@
 
 package org.apache.ignite.network.internal.netty;
 
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.stream.ChunkedInput;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.internal.direct.DirectMessageWriter;
-import org.apache.ignite.network.serialization.MessageSerializationRegistry;
-import org.apache.ignite.network.serialization.MessageSerializer;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Wrapper for a Netty {@link Channel}, that uses {@link ChunkedInput} and {@link DirectMessageWriter} to send data.
@@ -37,130 +31,68 @@ public class NettySender {
     /** Netty channel. */
     private final Channel channel;
 
-    /** Serialization registry. */
-    private final MessageSerializationRegistry serializationRegistry;
+    /** Launch id of the remote node. */
+    private final String launchId;
+
+    /** Consistent id of the remote node. */
+    private final String consistentId;
 
     /**
      * Constructor.
      *
      * @param channel Netty channel.
-     * @param registry Serialization registry.
+     * @param launchId Launch id of the remote node.
+     * @param consistentId Consistent id of the remote node.
      */
-    public NettySender(Channel channel, MessageSerializationRegistry registry) {
+    public NettySender(Channel channel, String launchId, String consistentId) {
         this.channel = channel;
-        serializationRegistry = registry;
+        this.launchId = launchId;
+        this.consistentId = consistentId;
     }
 
     /**
-     * Send message.
+     * Sends the message.
      *
      * @param msg Network message.
+     * @return Future of the send operation.
      */
     public CompletableFuture<Void> send(NetworkMessage msg) {
-        MessageSerializer<NetworkMessage> serializer = serializationRegistry.createSerializer(msg.directType());
+        return NettyUtils.toCompletableFuture(channel.writeAndFlush(msg));
+    }
 
-        return NettyUtils.toCompletableFuture(
-            channel.writeAndFlush(new NetworkMessageChunkedInput(msg, serializer, serializationRegistry))
-        );
+    /**
+     * @return Launch id of the remote node.
+     */
+    public String launchId() {
+        return launchId;
     }
 
     /**
-     * Close channel.
+     * @return Consistent id of the remote node.
      */
-    public void close() {
-        this.channel.close().awaitUninterruptibly();
+    public String consistentId() {
+        return consistentId;
     }
 
     /**
-     * @return Gets the remote address of the channel.
+     * Closes channel.
      */
-    public SocketAddress remoteAddress() {
-        return this.channel.remoteAddress();
+    public void close() {
+        this.channel.close().awaitUninterruptibly();
     }
 
     /**
-     * @return {@code true} if channel is open, {@code false} otherwise.
+     * @return {@code true} if the channel is open, {@code false} otherwise.
      */
     public boolean isOpen() {
         return this.channel.isOpen();
     }
 
     /**
-     * Chunked input for network message.
+     * @return Channel.
      */
-    private static class NetworkMessageChunkedInput implements ChunkedInput<ByteBuf> {
-        /** Network message. */
-        private final NetworkMessage msg;
-
-        /** Message serializer. */
-        private final MessageSerializer<NetworkMessage> serializer;
-
-        /** Message writer. */
-        private final DirectMessageWriter writer;
-
-        /** Whether the message was fully written. */
-        private boolean finished = false;
-
-        /**
-         * Constructor.
-         *
-         * @param msg Network message.
-         * @param serializer Serializer.
-         */
-        private NetworkMessageChunkedInput(
-            NetworkMessage msg,
-            MessageSerializer<NetworkMessage> serializer,
-            MessageSerializationRegistry registry
-        ) {
-            this.msg = msg;
-            this.serializer = serializer;
-            this.writer = new DirectMessageWriter(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean isEndOfInput() throws Exception {
-            return finished;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws Exception {
-
-        }
-
-        /** {@inheritDoc} */
-        @Deprecated
-        @Override public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
-            return readChunk(ctx.alloc());
-        }
-
-        /** {@inheritDoc} */
-        @Override public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
-            ByteBuf buffer = allocator.ioBuffer();
-            int capacity = buffer.capacity();
-
-            ByteBuffer byteBuffer = buffer.internalNioBuffer(0, capacity);
-
-            int initialPosition = byteBuffer.position();
-
-            writer.setBuffer(byteBuffer);
-
-            finished = serializer.writeMessage(msg, writer);
-
-            buffer.writerIndex(byteBuffer.position() - initialPosition);
-
-            return buffer;
-        }
-
-        /** {@inheritDoc} */
-        @Override public long length() {
-            // Return negative values, because object's size is unknown.
-            return -1;
-        }
-
-        /** {@inheritDoc} */
-        @Override public long progress() {
-            // Not really needed, as there won't be listeners for the write operation's progress.
-            return 0;
-        }
+    @TestOnly
+    public Channel channel() {
+        return channel;
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
index feb99ed..d68a370 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
@@ -33,6 +33,7 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.jetbrains.annotations.Nullable;
@@ -63,6 +64,9 @@ public class NettyServer {
     /** Incoming message listener. */
     private final BiConsumer<SocketAddress, NetworkMessage> messageListener;
 
+    /** Handshake manager. */
+    private final HandshakeManager handshakeManager;
+
     /** Server start future. */
     private CompletableFuture<Void> serverStartFuture;
 
@@ -86,17 +90,19 @@ public class NettyServer {
      * Constructor.
      *
      * @param port Server port.
+     * @param handshakeManager Handshake manager.
      * @param newConnectionListener New connections listener.
      * @param messageListener Message listener.
      * @param serializationRegistry Serialization registry.
      */
     public NettyServer(
         int port,
+        HandshakeManager handshakeManager,
         Consumer<NettySender> newConnectionListener,
         BiConsumer<SocketAddress, NetworkMessage> messageListener,
         MessageSerializationRegistry serializationRegistry
     ) {
-        this(new ServerBootstrap(), port, newConnectionListener, messageListener, serializationRegistry);
+        this(new ServerBootstrap(), port, handshakeManager, newConnectionListener, messageListener, serializationRegistry);
     }
 
     /**
@@ -104,6 +110,7 @@ public class NettyServer {
      *
      * @param bootstrap Server bootstrap.
      * @param port Server port.
+     * @param handshakeManager Handshake manager.
      * @param newConnectionListener New connections listener.
      * @param messageListener Message listener.
      * @param serializationRegistry Serialization registry.
@@ -111,12 +118,14 @@ public class NettyServer {
     public NettyServer(
         ServerBootstrap bootstrap,
         int port,
+        HandshakeManager handshakeManager,
         Consumer<NettySender> newConnectionListener,
         BiConsumer<SocketAddress, NetworkMessage> messageListener,
         MessageSerializationRegistry serializationRegistry
     ) {
         this.bootstrap = bootstrap;
         this.port = port;
+        this.handshakeManager = handshakeManager;
         this.newConnectionListener = newConnectionListener;
         this.messageListener = messageListener;
         this.serializationRegistry = serializationRegistry;
@@ -146,16 +155,23 @@ public class NettyServer {
                              * to read chunked data.
                              */
                             new InboundDecoder(serializationRegistry),
+                            // Handshake handler.
+                            new HandshakeHandler(handshakeManager),
                             // Handles decoded NetworkMessages.
                             new MessageHandler(messageListener),
                             /*
                              * Encoder that uses org.apache.ignite.network.internal.MessageWriter
                              * to write chunked data.
                              */
-                            new ChunkedWriteHandler()
+                            new ChunkedWriteHandler(),
+                            // Converts NetworkMessage to a ChunkedNetworkMessageInput
+                            new OutboundEncoder(serializationRegistry)
                         );
 
-                        newConnectionListener.accept(new NettySender(ch, serializationRegistry));
+                        handshakeManager.handshakeFuture().whenComplete((sender, throwable) -> {
+                            if (sender != null)
+                                newConnectionListener.accept(sender);
+                        });
                     }
                 })
                 /*
@@ -169,7 +185,24 @@ public class NettyServer {
                  * in either direction for 2 hours (NOTE: the actual value is implementation dependent),
                  * TCP automatically sends a keepalive probe to the peer.
                  */
-                .childOption(ChannelOption.SO_KEEPALIVE, true);
+                .childOption(ChannelOption.SO_KEEPALIVE, true)
+                /*
+                 * Specify a linger-on-close timeout. This option disables/enables immediate return from a close()
+                 * of a TCP Socket. Enabling this option with a non-zero Integer timeout means that a close() will
+                 * block pending the transmission and acknowledgement of all data written to the peer, at which point
+                 * the socket is closed gracefully. Upon reaching the linger timeout, the socket is closed forcefully,
+                 * with a TCP RST. Enabling the option with a timeout of zero does a forceful close immediately.
+                 * If the specified timeout value exceeds 65,535 it will be reduced to 65,535.
+                 */
+                .childOption(ChannelOption.SO_LINGER, 0)
+                /*
+                 * Disable Nagle's algorithm for this connection. Written data to the network is not buffered pending
+                 * acknowledgement of previously written data. Valid for TCP only. Setting this option reduces
+                 * network latency and delivery time for small messages.
+                 * For more information, see Socket#setTcpNoDelay(boolean)
+                 * and https://en.wikipedia.org/wiki/Nagle%27s_algorithm.
+                 */
+                .childOption(ChannelOption.TCP_NODELAY, true);
 
             serverStartFuture = NettyUtils.toChannelCompletableFuture(bootstrap.bind(port))
                 .handle((channel, err) -> {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyUtils.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyUtils.java
index 5c02b1a..3d8e7a0 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyUtils.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyUtils.java
@@ -58,6 +58,7 @@ public class NettyUtils {
     /**
      * Convert a Netty {@link Future} to a {@link CompletableFuture}.
      *
+     * @param <T> Type of the future.
      * @param future Future.
      * @return CompletableFuture.
      */
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/OutboundEncoder.java
similarity index 76%
copy from modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/netty/OutboundEncoder.java
index 649a9e7..2e249e1 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/netty/OutboundEncoder.java
@@ -17,13 +17,12 @@
 
 package org.apache.ignite.network.internal.netty;
 
-import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
+import java.util.List;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
 import io.netty.handler.stream.ChunkedInput;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.internal.direct.DirectMessageWriter;
@@ -31,58 +30,26 @@ import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.network.serialization.MessageSerializer;
 
 /**
- * Wrapper for a Netty {@link Channel}, that uses {@link ChunkedInput} and {@link DirectMessageWriter} to send data.
+ * An encoder for the outbound messages that uses {@link DirectMessageWriter}.
  */
-public class NettySender {
-    /** Netty channel. */
-    private final Channel channel;
-
+public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
     /** Serialization registry. */
     private final MessageSerializationRegistry serializationRegistry;
 
     /**
      * Constructor.
      *
-     * @param channel Netty channel.
      * @param registry Serialization registry.
      */
-    public NettySender(Channel channel, MessageSerializationRegistry registry) {
-        this.channel = channel;
+    public OutboundEncoder(MessageSerializationRegistry registry) {
         serializationRegistry = registry;
     }
 
-    /**
-     * Send message.
-     *
-     * @param msg Network message.
-     */
-    public CompletableFuture<Void> send(NetworkMessage msg) {
+    /** {@inheritDoc} */
+    @Override protected void encode(ChannelHandlerContext ctx, NetworkMessage msg, List<Object> out) throws Exception {
         MessageSerializer<NetworkMessage> serializer = serializationRegistry.createSerializer(msg.directType());
 
-        return NettyUtils.toCompletableFuture(
-            channel.writeAndFlush(new NetworkMessageChunkedInput(msg, serializer, serializationRegistry))
-        );
-    }
-
-    /**
-     * Close channel.
-     */
-    public void close() {
-        this.channel.close().awaitUninterruptibly();
-    }
-
-    /**
-     * @return Gets the remote address of the channel.
-     */
-    public SocketAddress remoteAddress() {
-        return this.channel.remoteAddress();
-    }
-
-    /**
-     * @return {@code true} if channel is open, {@code false} otherwise.
-     */
-    public boolean isOpen() {
-        return this.channel.isOpen();
+        out.add(new NetworkMessageChunkedInput(msg, serializer, serializationRegistry));
     }
 
     /**
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryClientHandshakeManager.java
new file mode 100644
index 0000000..a0bf0e1
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryClientHandshakeManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeException;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyUtils;
+import org.apache.ignite.network.internal.recovery.message.HandshakeMessageFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+
+/**
+ * Recovery protocol handshake manager for a client.
+ */
+public class RecoveryClientHandshakeManager implements HandshakeManager {
+    /** Launch id. */
+    private final UUID launchId;
+
+    /** Consistent id. */
+    private final String consistentId;
+
+    /** Handshake completion future. */
+    private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+    /**
+     * Constructor.
+     *
+     * @param launchId Launch id.
+     * @param consistentId Consistent id.
+     */
+    public RecoveryClientHandshakeManager(UUID launchId, String consistentId) {
+        this.launchId = launchId;
+        this.consistentId = consistentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
+        if (message instanceof HandshakeStartMessage) {
+            HandshakeStartMessage msg = (HandshakeStartMessage) message;
+
+            HandshakeStartResponseMessage response = HandshakeMessageFactory.handshakeStartResponseMessage()
+                .launchId(launchId)
+                .consistentId(consistentId)
+                .receivedCount(0)
+                .connectionsCount(0)
+                .build();
+
+            ChannelFuture sendFuture = channel.writeAndFlush(response);
+
+            NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
+                if (throwable != null)
+                    handshakeCompleteFuture.completeExceptionally(
+                        new HandshakeException("Failed to send handshake response: " + throwable.getMessage(), throwable)
+                    );
+                else
+                    handshakeCompleteFuture.complete(new NettySender(channel, msg.launchId().toString(), msg.consistentId()));
+            });
+
+            return HandshakeAction.REMOVE_HANDLER;
+        }
+
+        handshakeCompleteFuture.completeExceptionally(
+            new HandshakeException("Unexpected message during handshake: " + message.toString())
+        );
+
+        return HandshakeAction.FAIL;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<NettySender> handshakeFuture() {
+        return handshakeCompleteFuture;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HandshakeAction init(Channel channel) {
+        return HandshakeAction.NOOP;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HandshakeAction onConnectionOpen(Channel channel) {
+        return HandshakeAction.NOOP;
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryServerHandshakeManager.java
new file mode 100644
index 0000000..61f1a6e
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/RecoveryServerHandshakeManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeException;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyUtils;
+import org.apache.ignite.network.internal.recovery.message.HandshakeMessageFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+
+/**
+ * Recovery protocol handshake manager for a server.
+ */
+public class RecoveryServerHandshakeManager implements HandshakeManager {
+    /** Launch id. */
+    private final UUID launchId;
+
+    /** Consistent id. */
+    private final String consistentId;
+
+    /** Handshake completion future. */
+    private final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+    /**
+     * Constructor.
+     *
+     * @param launchId Launch id.
+     * @param consistentId Consistent id.
+     */
+    public RecoveryServerHandshakeManager(UUID launchId, String consistentId) {
+        this.launchId = launchId;
+        this.consistentId = consistentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HandshakeAction init(Channel channel) {
+        return HandshakeAction.NOOP;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HandshakeAction onConnectionOpen(Channel channel) {
+        HandshakeStartMessage handshakeStartMessage = HandshakeMessageFactory.handshakeStartMessage()
+            .launchId(launchId)
+            .consistentId(consistentId)
+            .build();
+
+        ChannelFuture sendFuture = channel.writeAndFlush(handshakeStartMessage);
+
+        NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
+            if (throwable != null)
+                handshakeCompleteFuture.completeExceptionally(
+                    new HandshakeException("Failed to send handshake start message: " + throwable.getMessage(), throwable)
+                );
+        });
+
+        return HandshakeAction.NOOP;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
+        if (message instanceof HandshakeStartResponseMessage) {
+            HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message;
+
+            handshakeCompleteFuture.complete(new NettySender(channel, msg.launchId().toString(), msg.consistentId()));
+
+            return HandshakeAction.REMOVE_HANDLER;
+        }
+
+        handshakeCompleteFuture.completeExceptionally(
+            new HandshakeException("Unexpected message during handshake: " + message.toString())
+        );
+
+        return HandshakeAction.FAIL;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CompletableFuture<NettySender> handshakeFuture() {
+        return handshakeCompleteFuture;
+    }
+}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeMessageFactory.java
similarity index 67%
copy from modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeMessageFactory.java
index b28c42d..bde8e55 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/ClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeMessageFactory.java
@@ -14,12 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ignite.network;
 
-/** */
-public interface ClusterServiceFactory {
-    /**
-     * Creates a new {@link ClusterService} using the provided context. The created network will not be in the "started" state.
-     */
-    ClusterService createClusterService(ClusterLocalConfiguration context);
+package org.apache.ignite.network.internal.recovery.message;
+
+public class HandshakeMessageFactory {
+
+    public static HandshakeStartMessage.Builder handshakeStartMessage() {
+        return new HandshakeStartMessageImpl();
+    }
+
+    public static HandshakeStartResponseMessage.Builder handshakeStartResponseMessage() {
+        return new HandshakeStartResponseMessageImpl();
+    }
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessage.java
similarity index 59%
copy from modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessage.java
index dc6de9b..ee4bb2f 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessage.java
@@ -15,30 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.network;
+package org.apache.ignite.network.internal.recovery.message;
 
-import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.processor.annotations.AutoSerializable;
 
-@AutoSerializable(messageFactory = TestMessageFactory.class)
-public interface TestMessage extends NetworkMessage {
-    /** Visible type for tests. */
-    public static final short TYPE = 3;
-
-    String msg();
+/**
+ * Handshake start message.
+ */
+@AutoSerializable(messageFactory = HandshakeMessageFactory.class)
+public interface HandshakeStartMessage extends NetworkMessage {
+    /** Direct type of this message, returned by {@link #directType()}. */
+    public static final byte TYPE = 2;
 
-    Map<Integer, String> map();
+    /** Launch id. */
+    UUID launchId();
 
-    /** {@inheritDoc} */
-    @Override public default short directType() {
-        return TYPE;
-    }
+    /** Consistent id. */
+    String consistentId();
 
     interface Builder {
-        Builder msg(String msg);
+        HandshakeStartMessage.Builder launchId(UUID launchId);
 
-        Builder map(Map<Integer, String> map);
+        HandshakeStartMessage.Builder consistentId(String consistentId);
 
-        TestMessage build();
+        HandshakeStartMessage build();
+    }
+
+    /** {@inheritDoc} */
+    @Override default short directType() {
+        return TYPE;
     }
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessageImpl.java
similarity index 50%
copy from modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
copy to modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessageImpl.java
index dc6de9b..fdde622 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartMessageImpl.java
@@ -15,30 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.network;
+package org.apache.ignite.network.internal.recovery.message;
 
-import java.util.Map;
-import org.apache.ignite.network.processor.annotations.AutoSerializable;
+import java.util.UUID;
 
-@AutoSerializable(messageFactory = TestMessageFactory.class)
-public interface TestMessage extends NetworkMessage {
-    /** Visible type for tests. */
-    public static final short TYPE = 3;
+public class HandshakeStartMessageImpl implements HandshakeStartMessage, HandshakeStartMessage.Builder {
+    /** Launch id. */
+    private UUID launchId;
 
-    String msg();
+    /** Consistent id. */
+    private String consistentId;
 
-    Map<Integer, String> map();
+    /** {@inheritDoc} */
+    @Override public UUID launchId() {
+        return launchId;
+    }
 
     /** {@inheritDoc} */
-    @Override public default short directType() {
-        return TYPE;
+    @Override public String consistentId() {
+        return consistentId;
     }
 
-    interface Builder {
-        Builder msg(String msg);
+    /** {@inheritDoc} */
+    @Override public Builder launchId(UUID launchId) {
+        this.launchId = launchId;
+        return this;
+    }
 
-        Builder map(Map<Integer, String> map);
+    /** {@inheritDoc} */
+    @Override public Builder consistentId(String consistentId) {
+        this.consistentId = consistentId;
+        return this;
+    }
 
-        TestMessage build();
+    /** {@inheritDoc} */
+    @Override public HandshakeStartMessage build() {
+        return this;
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessage.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessage.java
new file mode 100644
index 0000000..b7b69a2
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery.message;
+
+import java.util.UUID;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.processor.annotations.AutoSerializable;
+
+/**
+ * Handshake start response message.
+ */
+@AutoSerializable(messageFactory = HandshakeMessageFactory.class)
+public interface HandshakeStartResponseMessage extends NetworkMessage {
+    /** Direct type of this message, returned by {@link #directType()}. */
+    public static final byte TYPE = 3;
+
+    /** Launch id. */
+    UUID launchId();
+
+    /** Consistent id. */
+    String consistentId();
+
+    /** Number of received messages. */
+    long receivedCount();
+
+    /** Connections count. */
+    long connectionsCount();
+
+    interface Builder {
+        HandshakeStartResponseMessage.Builder launchId(UUID launchId);
+
+        HandshakeStartResponseMessage.Builder consistentId(String consistentId);
+
+        HandshakeStartResponseMessage.Builder receivedCount(long receivedCount);
+
+        HandshakeStartResponseMessage.Builder connectionsCount(long connectionsCount);
+
+        HandshakeStartResponseMessage build();
+    }
+
+    /** {@inheritDoc} */
+    @Override default short directType() {
+        return TYPE;
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessageImpl.java b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessageImpl.java
new file mode 100644
index 0000000..c9bdf5f
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/internal/recovery/message/HandshakeStartResponseMessageImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.recovery.message;
+
+import java.util.UUID;
+
+public class HandshakeStartResponseMessageImpl implements HandshakeStartResponseMessage, HandshakeStartResponseMessage.Builder {
+    /** Launch id. */
+    private UUID launchId;
+
+    /** Consistent id. */
+    private String consistentId;
+
+    /** Number of received messages. */
+    private long rcvCnt;
+
+    /** Connections count. */
+    private long connCnt;
+
+    /** {@inheritDoc} */
+    @Override public UUID launchId() {
+        return launchId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String consistentId() {
+        return consistentId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long receivedCount() {
+        return rcvCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long connectionsCount() {
+        return connCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Builder launchId(UUID launchId) {
+        this.launchId = launchId;
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Builder consistentId(String consistentId) {
+        this.consistentId = consistentId;
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Builder receivedCount(long rcvCnt) {
+        this.rcvCnt = rcvCnt;
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Builder connectionsCount(long connectCnt) {
+        this.connCnt = connectCnt;
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HandshakeStartResponseMessage build() {
+        return this;
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 25de258..396a3d0 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -19,6 +19,7 @@ package org.apache.ignite.network.scalecube;
 
 import java.lang.management.ManagementFactory;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanRegistrationException;
@@ -37,6 +38,8 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.ClusterServiceFactory;
 import org.apache.ignite.network.NetworkConfigurationException;
 import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.network.internal.recovery.RecoveryServerHandshakeManager;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 
 /**
@@ -46,15 +49,25 @@ import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
     /** {@inheritDoc} */
     @Override public ClusterService createClusterService(ClusterLocalConfiguration context) {
+        String consistentId = context.getName();
+
         var topologyService = new ScaleCubeTopologyService();
 
         var messagingService = new ScaleCubeMessagingService(topologyService);
 
         MessageSerializationRegistry registry = context.getSerializationRegistry();
 
-        var connectionManager = new ConnectionManager(context.getPort(), registry);
+        UUID launchId = UUID.randomUUID();
+
+        var connectionManager = new ConnectionManager(
+            context.getPort(),
+            registry,
+            consistentId,
+            () -> new RecoveryServerHandshakeManager(launchId, consistentId),
+            () -> new RecoveryClientHandshakeManager(launchId, consistentId)
+        );
 
-        ScaleCubeDirectMarshallerTransport transport = new ScaleCubeDirectMarshallerTransport(connectionManager);
+        ScaleCubeDirectMarshallerTransport transport = new ScaleCubeDirectMarshallerTransport(connectionManager, topologyService);
 
         var cluster = new ClusterImpl(defaultConfig())
             .handler(cl -> new ClusterMessageHandler() {
@@ -68,7 +81,7 @@ public class ScaleCubeClusterServiceFactory implements ClusterServiceFactory {
                     topologyService.onMembershipEvent(event);
                 }
             })
-            .config(opts -> opts.memberAlias(context.getName()))
+            .config(opts -> opts.memberAlias(consistentId))
             .transport(opts -> opts.transportFactory(new DelegatingTransportFactory(messagingService, config -> transport)))
             .membership(opts -> opts.seedMembers(parseAddresses(context.getMemberAddresses())));
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index ffa075a..235ef04 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -33,6 +33,7 @@ import io.scalecube.net.Address;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.internal.netty.ConnectionManager;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessageFactory;
@@ -67,6 +68,9 @@ public class ScaleCubeDirectMarshallerTransport implements Transport {
     /** Connection manager. */
     private final ConnectionManager connectionManager;
 
+    /** Topology service, used to look up a node by its address. */
+    private final ScaleCubeTopologyService topologyService;
+
     /** Node address. */
     private Address address;
 
@@ -74,9 +78,12 @@ public class ScaleCubeDirectMarshallerTransport implements Transport {
      * Constructor.
      *
      * @param connectionManager Connection manager.
+     * @param topologyService Topology service.
      */
-    public ScaleCubeDirectMarshallerTransport(ConnectionManager connectionManager) {
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager connectionManager, ScaleCubeTopologyService topologyService) {
         this.connectionManager = connectionManager;
+        this.topologyService = topologyService;
+
         this.connectionManager.addListener(this::onMessage);
         // Setup cleanup
         stop.then(doStop())
@@ -150,7 +157,11 @@ public class ScaleCubeDirectMarshallerTransport implements Transport {
         var addr = InetSocketAddress.createUnresolved(address.host(), address.port());
 
         return Mono.fromFuture(() -> {
-            return connectionManager.channel(addr).thenCompose(client -> client.send(fromMessage(message)));
+            ClusterNode node = topologyService.getByAddress(address.toString());
+
+            String consistentId = node != null ? node.name() : null;
+
+            return connectionManager.channel(consistentId, addr).thenCompose(client -> client.send(fromMessage(message)));
         });
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
index 0dab0e1..c7db910 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessagingService.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.network.scalecube;
 
-import java.time.Duration;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import io.scalecube.cluster.Cluster;
 import io.scalecube.cluster.transport.api.Message;
 import io.scalecube.net.Address;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.network.AbstractMessagingService;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.MessagingService;
@@ -43,13 +43,19 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
      */
     private ScaleCubeTopologyService topologyService;
 
-    /** */
+    /**
+     * Constructor.
+     *
+     * @param topologyService Topology service.
+     */
     ScaleCubeMessagingService(ScaleCubeTopologyService topologyService) {
         this.topologyService = topologyService;
     }
 
     /**
      * Sets the ScaleCube's {@link Cluster}. Needed for cyclic dependency injection.
+     *
+     * @param cluster Cluster.
      */
     void setCluster(Cluster cluster) {
         this.cluster = cluster;
@@ -57,6 +63,8 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
 
     /**
      * Delegates the received message to the registered message handlers.
+     *
+     * @param message Received message.
      */
     void fireEvent(Message message) {
         NetworkMessage msg = message.data();
@@ -110,6 +118,9 @@ final class ScaleCubeMessagingService extends AbstractMessagingService {
 
     /**
      * Extracts the given node's {@link Address}.
+     *
+     * @param node Node.
+     * @return Node's address.
      */
     private static Address clusterNodeAddress(ClusterNode node) {
         return Address.create(node.host(), node.port());
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
index 4f41fd0..4886b61 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeTopologyService.java
@@ -39,6 +39,8 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
 
     /**
      * Sets the ScaleCube's local {@link Member}.
+     *
+     * @param member Local member.
      */
     void setLocalMember(Member member) {
         localMember = fromMember(member);
@@ -49,6 +51,8 @@ final class ScaleCubeTopologyService extends AbstractTopologyService {
 
     /**
      * Delegates the received topology event to the registered event handlers.
+     *
+     * @param event Membership event.
      */
     void onMembershipEvent(MembershipEvent event) {
         ClusterNode member = fromMember(event.member());
diff --git a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/InboundDecoderTest.java b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/InboundDecoderTest.java
index 664771d..135857f 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/InboundDecoderTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/InboundDecoderTest.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.TestMessage;
 import org.apache.ignite.network.TestMessageFactory;
 import org.apache.ignite.network.TestMessageSerializationFactory;
@@ -35,7 +36,6 @@ import org.apache.ignite.network.internal.AllTypesMessage;
 import org.apache.ignite.network.internal.AllTypesMessageGenerator;
 import org.apache.ignite.network.internal.AllTypesMessageSerializationFactory;
 import org.apache.ignite.network.internal.direct.DirectMessageWriter;
-import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.network.serialization.MessageSerializer;
 import org.junit.jupiter.api.Test;
@@ -194,7 +194,7 @@ public class InboundDecoderTest {
     }
 
     /**
-     * Source of parameters for the {@link #test(long)} method.
+     * Source of parameters for the {@link #testAllTypes(long)} method.
      * Creates seeds for a {@link AllTypesMessage} generation.
      * @return Random seeds.
      */
diff --git a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java
index 40436f6..e6adffa 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.network.internal.netty;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.embedded.EmbeddedChannel;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
@@ -27,7 +24,14 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
@@ -156,11 +160,16 @@ public class NettyClientTest {
     public void testStartTwice() throws Exception {
         var channel = new EmbeddedChannel();
 
-        Bootstrap bootstrap = Mockito.mock(Bootstrap.class);
+        Bootstrap bootstrap = mockBootstrap();
 
         Mockito.doReturn(channel.newSucceededFuture()).when(bootstrap).connect(Mockito.any());
 
-        client = new NettyClient(address, null);
+        client = new NettyClient(
+            address,
+            null,
+            new MockClientHandshakeManager(channel),
+            (address1, message) -> {}
+        );
 
         client.start(bootstrap);
 
@@ -176,9 +185,14 @@ public class NettyClientTest {
      * @return Client and a NettySender future.
      */
     private ClientAndSender createClientAndSenderFromChannelFuture(ChannelFuture future) {
-        var client = new NettyClient(address, null);
+        var client = new NettyClient(
+            address,
+            null,
+            new MockClientHandshakeManager(future.channel()),
+            (address1, message) -> {}
+        );
 
-        Bootstrap bootstrap = Mockito.mock(Bootstrap.class);
+        Bootstrap bootstrap = mockBootstrap();
 
         Mockito.doReturn(future).when(bootstrap).connect(Mockito.any());
 
@@ -186,6 +200,19 @@ public class NettyClientTest {
     }
 
     /**
+     * Creates a mock of a {@link Bootstrap} whose {@link Bootstrap#clone()} returns the mock itself.
+     *
+     * @return Mocked bootstrap.
+     */
+    private Bootstrap mockBootstrap() {
+        Bootstrap bootstrap = Mockito.mock(Bootstrap.class);
+
+        Mockito.doReturn(bootstrap).when(bootstrap).clone();
+
+        return bootstrap;
+    }
+
+    /**
      * Tuple for a NettyClient and a future of a NettySender.
      */
     private static class ClientAndSender {
@@ -206,4 +233,37 @@ public class NettyClientTest {
             this.sender = sender;
         }
     }
+
+    /**
+     * Client handshake manager that doesn't do any actual handshaking: its handshake future is already completed.
+     */
+    private static class MockClientHandshakeManager implements HandshakeManager {
+        /** Sender built over the channel passed to the constructor; the handshake future completes with it. */
+        private final NettySender sender;
+
+        /** Creates a manager whose sender is built over the given channel. */
+        private MockClientHandshakeManager(Channel channel) {
+            this.sender = new NettySender(channel, "", "");
+        }
+
+        /** {@inheritDoc} */
+        @Override public HandshakeAction onMessage(Channel channel, NetworkMessage message) {
+            return HandshakeAction.REMOVE_HANDLER;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CompletableFuture<NettySender> handshakeFuture() {
+            return CompletableFuture.completedFuture(sender);
+        }
+
+        /** {@inheritDoc} */
+        @Override public HandshakeAction init(Channel channel) {
+            return HandshakeAction.NOOP;
+        }
+
+        /** {@inheritDoc} */
+        @Override public HandshakeAction onConnectionOpen(Channel channel) {
+            return HandshakeAction.NOOP;
+        }
+    }
 }
diff --git a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
index 1a3af63..fc87d35 100644
--- a/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
@@ -17,18 +17,34 @@
 
 package org.apache.ignite.network.internal.netty;
 
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.ServerChannel;
 import io.netty.channel.embedded.EmbeddedChannel;
-import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.internal.handshake.HandshakeAction;
+import org.apache.ignite.network.internal.handshake.HandshakeManager;
+import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageMappingException;
+import org.apache.ignite.network.serialization.MessageReader;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
 import org.mockito.Mockito;
+import org.mockito.verification.VerificationMode;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -148,9 +164,101 @@ public class NettyServerTest {
     }
 
     /**
+     * Tests that the handshake manager's callbacks are invoked in order when a client connects to a server and sends data.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testHandshakeManagerInvoked() throws Exception {
+        HandshakeManager handshakeManager = Mockito.mock(HandshakeManager.class);
+
+        Mockito.doReturn(CompletableFuture.completedFuture(Mockito.mock(NettySender.class)))
+            .when(handshakeManager).handshakeFuture();
+
+        Mockito.doReturn(HandshakeAction.NOOP)
+            .when(handshakeManager).init(Mockito.any());
+
+        Mockito.doReturn(HandshakeAction.NOOP)
+            .when(handshakeManager).onConnectionOpen(Mockito.any());
+
+        Mockito.doReturn(HandshakeAction.NOOP)
+            .when(handshakeManager).onMessage(Mockito.any(), Mockito.any());
+
+        MessageSerializationRegistry registry = new MessageSerializationRegistry() {
+            /** {@inheritDoc} */
+            @Override public <T extends NetworkMessage> MessageDeserializer<T> createDeserializer(short type) {
+                return (MessageDeserializer<T>) new MessageDeserializer<>() {
+                    /** {@inheritDoc} */
+                    @Override public boolean readMessage(MessageReader reader) throws MessageMappingException {
+                        return true;
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public Class<NetworkMessage> klass() {
+                        return NetworkMessage.class;
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override public NetworkMessage getMessage() {
+                        return new NetworkMessage() {
+                            /** {@inheritDoc} */
+                            @Override public short directType() {
+                                return 0;
+                            }
+                        };
+                    }
+                };
+            }
+        };
+
+        server = new NettyServer(4000, handshakeManager, sender -> {}, (socketAddress, message) -> {}, registry);
+
+        server.start().get(3, TimeUnit.SECONDS);
+
+        CompletableFuture<Channel> connectFut = NettyUtils.toChannelCompletableFuture(
+            new Bootstrap()
+                .channel(NioSocketChannel.class)
+                .group(new NioEventLoopGroup())
+                .handler(new ChannelInitializer<>() {
+                    /** {@inheritDoc} */
+                    @Override protected void initChannel(Channel ch) throws Exception {
+                        // No-op.
+                    }
+                })
+                .connect(server.address())
+        );
+
+        Channel channel = connectFut.get(3, TimeUnit.SECONDS);
+
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+
+        // One message only.
+        for (int i = 0; i < (NetworkMessage.DIRECT_TYPE_SIZE + 1); i++)
+            buffer.writeByte(1);
+
+        channel.writeAndFlush(buffer).get(3, TimeUnit.SECONDS);
+
+        channel.close().get(3, TimeUnit.SECONDS);
+
+        InOrder order = Mockito.inOrder(handshakeManager);
+
+        order.verify(handshakeManager, timeout()).init(Mockito.any());
+        order.verify(handshakeManager, timeout()).handshakeFuture();
+        order.verify(handshakeManager, timeout()).onConnectionOpen(Mockito.any());
+        order.verify(handshakeManager, timeout()).onMessage(Mockito.any(), Mockito.any());
+    }
+
+    /**
+     * @return Verification mode that waits up to 3 seconds for a single invocation.
+     */
+    private static VerificationMode timeout() {
+        return Mockito.timeout(TimeUnit.SECONDS.toMillis(3));
+    }
+
+    /**
      * Creates a server from a backing {@link ChannelFuture}.
      *
-     * @param channel Server channel.
+     * @param future Server channel future.
      * @param shouldStart {@code true} if a server should start successfully
      * @return NettyServer.
      * @throws Exception If failed.
@@ -160,7 +268,7 @@ public class NettyServerTest {
 
         Mockito.doReturn(future).when(bootstrap).bind(Mockito.anyInt());
 
-        var server = new NettyServer(bootstrap, 0, null, null, null);
+        var server = new NettyServer(bootstrap, 0, Mockito.mock(HandshakeManager.class), null, null, null);
 
         try {
             server.start().get(3, TimeUnit.SECONDS);
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
index 6d04917..ae3074e 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
@@ -23,6 +23,10 @@ import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
 import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
@@ -55,7 +59,9 @@ class ITRaftCounterServerTest {
 
     /** */
     private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry()
-        .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory());
+        .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
+        .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+        .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory());
 
     /** */
     private RaftServer server;
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index be41858..8e65067 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -46,8 +46,8 @@ import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryInitializer;
 import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
-import org.apache.ignite.network.scalecube.message.MessageSerializationRegistryInitializer;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.utils.IgniteProperties;
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index c8864ec..a626ded 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -44,6 +44,10 @@ import org.apache.ignite.network.ClusterLocalConfiguration;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
+import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
 import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
 import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
@@ -91,7 +95,9 @@ public class ITDistributedTableTest {
 
     /** */
     private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry()
-        .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory());
+        .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
+        .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
+        .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory());
 
     /** Client. */
     private ClusterService client;