You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/03 15:46:23 UTC

[ignite-3] 01/07: .

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 4869c137990aa0af767b8c9d87806d266494a986
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Mar 2 18:09:25 2022 +0300

    .
---
 .../messages/MessageBuilderGenerator.java          |  73 +++---
 .../processor/messages/MessageImplGenerator.java   | 259 ++++++++++++++++++++-
 .../serialization/BaseMethodNameResolver.java      |   6 +-
 .../MessageDeserializerGenerator.java              |   9 +-
 .../serialization/MessageReaderMethodResolver.java |   3 +-
 .../serialization/MessageSerializerGenerator.java  |   4 +-
 .../serialization/MessageWriterMethodResolver.java |   7 +-
 .../org/apache/ignite/network/NetworkMessage.java  |  11 +
 .../network/netty/ItConnectionManagerTest.java     |  20 +-
 .../network/recovery/ItRecoveryHandshakeTest.java  |   6 +-
 .../scalecube/ItScaleCubeNetworkMessagingTest.java |  10 +-
 .../internal/network/NetworkMessageTypes.java      |   6 +
 .../message/ClassDescriptorListMessage.java        |  30 +++
 .../internal/network/netty/ConnectionManager.java  |  12 +-
 .../{MessageHandler.java => InNetworkObject.java}  |  41 ++--
 .../internal/network/netty/InboundDecoder.java     |  13 +-
 .../internal/network/netty/MessageHandler.java     |  16 +-
 .../ignite/internal/network/netty/NettyClient.java |  11 +-
 .../ignite/internal/network/netty/NettySender.java |   6 +-
 .../ignite/internal/network/netty/NettyServer.java |  10 +-
 .../internal/network/netty/OutboundEncoder.java    |  41 +++-
 .../recovery/RecoveryClientHandshakeManager.java   |   4 +-
 .../recovery/RecoveryServerHandshakeManager.java   |   4 +-
 .../PerSessionSerializationService.java            |  16 +-
 .../ignite/network/DefaultMessagingService.java    | 165 ++++++++++++-
 .../ignite/network/NettyBootstrapFactory.java      |  16 ++
 .../org/apache/ignite/network/NetworkObject.java   |  40 ++++
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   6 +-
 .../ScaleCubeDirectMarshallerTransport.java        |  26 ++-
 .../internal/network/netty/NettyClientTest.java    |   4 +-
 .../internal/network/netty/NettyServerTest.java    |   2 +-
 modules/raft/pom.xml                               |   1 -
 modules/transactions/pom.xml                       |   5 +
 33 files changed, 747 insertions(+), 136 deletions(-)

diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
index 650296e..7f23723 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
@@ -17,17 +17,20 @@
 
 package org.apache.ignite.internal.network.processor.messages;
 
+import com.squareup.javapoet.ArrayTypeName;
 import com.squareup.javapoet.ClassName;
 import com.squareup.javapoet.MethodSpec;
 import com.squareup.javapoet.TypeName;
 import com.squareup.javapoet.TypeSpec;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.ArrayList;
 import javax.annotation.processing.ProcessingEnvironment;
 import javax.lang.model.element.Modifier;
+import javax.lang.model.type.TypeMirror;
 import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.internal.network.processor.TypeUtils;
+import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Class for generating Builder interfaces for Network Messages.
@@ -39,6 +42,8 @@ public class MessageBuilderGenerator {
     /** Message group. */
     private final MessageGroupWrapper messageGroup;
 
+    private final TypeUtils typeUtils;
+
     /**
      * Constructor.
      *
@@ -48,6 +53,7 @@ public class MessageBuilderGenerator {
     public MessageBuilderGenerator(ProcessingEnvironment processingEnvironment, MessageGroupWrapper messageGroup) {
         this.processingEnvironment = processingEnvironment;
         this.messageGroup = messageGroup;
+        this.typeUtils = new TypeUtils(processingEnvironment);
     }
 
     /**
@@ -62,30 +68,45 @@ public class MessageBuilderGenerator {
         processingEnvironment.getMessager()
                 .printMessage(Diagnostic.Kind.NOTE, "Generating " + builderName, message.element());
 
-        // generate a setter for each getter in the original interface
-        List<MethodSpec> setters = message.getters().stream()
-                .map(getter -> {
-                    String getterName = getter.getSimpleName().toString();
-
-                    return MethodSpec.methodBuilder(getterName)
-                            .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
-                            .addParameter(TypeName.get(getter.getReturnType()), getterName)
-                            .returns(builderName)
-                            .build();
-                })
-                .collect(Collectors.toList());
-
-        // generate a getter for each getter in the original interface
-        List<MethodSpec> getters = message.getters().stream()
-                .map(getter -> {
-                    String getterName = getter.getSimpleName().toString();
-
-                    return MethodSpec.methodBuilder(getterName)
-                            .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
-                            .returns(TypeName.get(getter.getReturnType()))
-                            .build();
-                })
-                .collect(Collectors.toList());
+        var setters = new ArrayList<MethodSpec>(message.getters().size());
+        var getters = new ArrayList<MethodSpec>(message.getters().size());
+        message.getters().forEach(getter -> {
+            String getterName = getter.getSimpleName().toString();
+
+            TypeMirror type = getter.getReturnType();
+
+            // generate a setter for each getter in the original interface
+            MethodSpec setterSpec = MethodSpec.methodBuilder(getterName)
+                    .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                    .addParameter(TypeName.get(type), getterName)
+                    .returns(builderName)
+                    .build();
+
+            // generate a getter for each getter in the original interface
+            MethodSpec getterSpec = MethodSpec.methodBuilder(getterName)
+                    .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                    .returns(TypeName.get(type))
+                    .build();
+
+            setters.add(setterSpec);
+            getters.add(getterSpec);
+
+            if (getter.getAnnotation(Marshallable.class) != null) {
+                MethodSpec baSetter = MethodSpec.methodBuilder(getterName + "ByteArray")
+                        .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                        .addParameter(ArrayTypeName.of(TypeName.BYTE), getterName + "ByteArray")
+                        .returns(builderName)
+                        .build();
+
+                MethodSpec baGetter = MethodSpec.methodBuilder(getterName + "ByteArray")
+                        .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                        .returns(ArrayTypeName.of(TypeName.BYTE))
+                        .build();
+
+                setters.add(baSetter);
+                getters.add(baGetter);
+            }
+        });
 
         MethodSpec buildMethod = MethodSpec.methodBuilder("build")
                 .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index 9694785..835894b 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -17,26 +17,38 @@
 
 package org.apache.ignite.internal.network.processor.messages;
 
+import com.squareup.javapoet.ArrayTypeName;
 import com.squareup.javapoet.ClassName;
 import com.squareup.javapoet.CodeBlock;
 import com.squareup.javapoet.FieldSpec;
 import com.squareup.javapoet.MethodSpec;
+import com.squareup.javapoet.MethodSpec.Builder;
+import com.squareup.javapoet.ParameterizedTypeName;
 import com.squareup.javapoet.TypeName;
 import com.squareup.javapoet.TypeSpec;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.processing.ProcessingEnvironment;
 import javax.lang.model.element.ExecutableElement;
 import javax.lang.model.element.Modifier;
+import javax.lang.model.type.ArrayType;
+import javax.lang.model.type.DeclaredType;
 import javax.lang.model.type.TypeKind;
+import javax.lang.model.type.TypeMirror;
 import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.internal.network.processor.TypeUtils;
+import org.apache.ignite.internal.network.processor.serialization.BaseMethodNameResolver;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Class for generating implementations of the {@link NetworkMessage} interfaces and their builders, generated by a {@link
@@ -49,6 +61,10 @@ public class MessageImplGenerator {
     /** Message group. */
     private final MessageGroupWrapper messageGroup;
 
+    private final TypeUtils typeUtils;
+
+    private final BaseMethodNameResolver methodNameResolver;
+
     /**
      * Constructor.
      *
@@ -58,6 +74,8 @@ public class MessageImplGenerator {
     public MessageImplGenerator(ProcessingEnvironment processingEnv, MessageGroupWrapper messageGroup) {
         this.processingEnv = processingEnv;
         this.messageGroup = messageGroup;
+        this.typeUtils = new TypeUtils(processingEnv);
+        this.methodNameResolver = new BaseMethodNameResolver(processingEnv);
     }
 
     /**
@@ -76,6 +94,7 @@ public class MessageImplGenerator {
         List<ExecutableElement> getters = message.getters();
 
         var fields = new ArrayList<FieldSpec>(getters.size());
+        var allFields = new ArrayList<FieldSpec>(getters.size());
         var getterImpls = new ArrayList<MethodSpec>(getters.size());
 
         // create a field and a getter implementation for every getter in the message interface
@@ -85,24 +104,42 @@ public class MessageImplGenerator {
             String getterName = getter.getSimpleName().toString();
 
             FieldSpec field = FieldSpec.builder(getterReturnType, getterName)
-                    .addModifiers(Modifier.PRIVATE, Modifier.FINAL)
+                    .addModifiers(Modifier.PRIVATE)
                     .build();
 
             fields.add(field);
+            allFields.add(field);
 
             MethodSpec getterImpl = MethodSpec.overriding(getter)
                     .addStatement("return $N", field)
                     .build();
 
             getterImpls.add(getterImpl);
+
+            if (getter.getAnnotation(Marshallable.class) != null) {
+                ArrayTypeName arrayTypeName = ArrayTypeName.of(TypeName.BYTE);
+                String name = getterName + "ByteArray";
+                FieldSpec marshallableFieldArray = FieldSpec.builder(arrayTypeName, name)
+                        .addModifiers(Modifier.PRIVATE)
+                        .build();
+
+                allFields.add(marshallableFieldArray);
+
+                MethodSpec baGetterImpl = MethodSpec.methodBuilder(name)
+                        .returns(arrayTypeName)
+                        .addStatement("return $N", marshallableFieldArray)
+                        .build();
+
+                getterImpls.add(baGetterImpl);
+            }
         }
 
         TypeSpec.Builder messageImpl = TypeSpec.classBuilder(messageImplClassName)
                 .addModifiers(Modifier.PUBLIC)
                 .addSuperinterface(message.className())
-                .addFields(fields)
+                .addFields(allFields)
                 .addMethods(getterImpls)
-                .addMethod(constructor(fields));
+                .addMethod(constructor(allFields));
 
         // group type constant and getter
         FieldSpec groupTypeField = FieldSpec.builder(short.class, "GROUP_TYPE")
@@ -156,6 +193,9 @@ public class MessageImplGenerator {
 
         messageImpl.addMethod(builderMethod);
 
+        generateBeforeSend(messageImpl, message);
+        generateAfterReceive(messageImpl, message);
+
         messageImpl
                 .addOriginatingElement(message.element())
                 .addOriginatingElement(messageGroup.element());
@@ -163,6 +203,183 @@ public class MessageImplGenerator {
         return messageImpl.build();
     }
 
+    /**
+     * Generates the {@code beforeRead} method: it marshals every {@link Marshallable} field into its {@code *ByteArray} field,
+     * adds the used descriptor ids to the passed set and recurses into nested {@link NetworkMessage}s (fields, arrays, collections, maps).
+     *
+     * @param messageImplBuild Builder of the message implementation that receives the generated method.
+     * @param message Message whose getters are inspected.
+     */
+    private void generateBeforeSend(TypeSpec.Builder messageImplBuild, MessageClass message) {
+        ParameterizedTypeName descriptorIdSetType = ParameterizedTypeName.get(ClassName.get(Set.class), TypeName.INT.box());
+
+        Builder beforeRead = MethodSpec.methodBuilder("beforeRead")
+                .addAnnotation(Override.class)
+                .addModifiers(Modifier.PUBLIC)
+                .addException(Exception.class)
+                .addParameter(descriptorIdSetType, "set")
+                .addParameter(Object.class, "marshallerObj")
+                .returns(descriptorIdSetType);
+
+        ClassName marshallerClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "UserObjectMarshaller");
+        ClassName marshalledObjectClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "MarshalledObject");
+
+        beforeRead.addStatement("$T marshaller = ($T) marshallerObj", marshallerClass, marshallerClass);
+        message.getters().forEach(executableElement -> {
+            TypeMirror type = executableElement.getReturnType();
+            String objectName = executableElement.getSimpleName().toString();
+
+            if (executableElement.getAnnotation(Marshallable.class) != null) {
+                String baName = objectName + "ByteArray";
+                String moName = baName + "mo";
+                beforeRead.addStatement("$T $N = marshaller.marshal($N)", marshalledObjectClass, moName, objectName);
+                beforeRead.addStatement("set.addAll($N.usedDescriptorIds())", moName);
+                beforeRead.addStatement("$N = $N.bytes()", baName, moName).addCode("\n");
+            } else if (typeUtils.isSubType(type, NetworkMessage.class)) {
+                beforeRead.addStatement("if ($N != null) $N.beforeRead(set, marshallerObj)", objectName, objectName);
+            } else {
+                String methodType = methodNameResolver.resolveBaseMethodName(type);
+                switch (methodType) {
+                    case "ObjectArray":
+                        ArrayType arrayType = (ArrayType) type;
+                        if (typeUtils.isSubType(arrayType.getComponentType(), NetworkMessage.class)) {
+                            beforeRead.beginControlFlow("if ($N != null)", objectName);
+                            beforeRead.beginControlFlow("for (int i = 0; i < $N.length; i++)", objectName);
+                            beforeRead.addStatement("if ($N[i] != null) $N[i].beforeRead(set, marshallerObj)", objectName, objectName);
+                            beforeRead.endControlFlow().endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Collection":
+                        DeclaredType declaredType = (DeclaredType) type;
+                        TypeMirror elementType = declaredType.getTypeArguments().get(0);
+                        if (typeUtils.isSubType(elementType, NetworkMessage.class)) {
+                            beforeRead.beginControlFlow("if ($N != null)", objectName);
+                            beforeRead.beginControlFlow("for ($T obj : $N)", elementType, objectName);
+                            beforeRead.addStatement("if (obj != null) obj.beforeRead(set, marshallerObj)");
+                            beforeRead.endControlFlow().endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Map":
+                        DeclaredType mapType = (DeclaredType) type;
+                        TypeMirror keyType = mapType.getTypeArguments().get(0);
+                        boolean keyIsMessage = typeUtils.isSubType(keyType, NetworkMessage.class);
+                        TypeMirror valueType = mapType.getTypeArguments().get(1);
+                        boolean valueIsMessage = typeUtils.isSubType(valueType, NetworkMessage.class);
+                        if (keyIsMessage || valueIsMessage) {
+                            ParameterizedTypeName entryType = ParameterizedTypeName.get(ClassName.get(Map.Entry.class), TypeName.get(keyType), TypeName.get(valueType));
+                            ParameterizedTypeName entrySetType = ParameterizedTypeName.get(ClassName.get(Set.class), entryType);
+                            String entrySetName = objectName + "EntrySet";
+                            beforeRead.beginControlFlow("if ($N != null)", objectName);
+                            beforeRead.addStatement("$T $N = $N.entrySet()", entrySetType, entrySetName, objectName);
+                            beforeRead.beginControlFlow("for ($T entry : $N)", entryType, entrySetName);
+                            beforeRead.addStatement("$T key = entry.getKey()", keyType);
+                            beforeRead.addStatement("$T value = entry.getValue()", valueType);
+                            if (keyIsMessage) {
+                                beforeRead.addStatement("if (key != null) key.beforeRead(set, marshallerObj)");
+                            }
+                            if (valueIsMessage) {
+                                beforeRead.addStatement("if (value != null) value.beforeRead(set, marshallerObj)");
+                            }
+                            beforeRead.endControlFlow().endControlFlow().addCode("\n");
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+        });
+
+        beforeRead.addStatement("return set");
+
+        messageImplBuild.addMethod(beforeRead.build());
+    }
+
+    /**
+     * Generates the {@code afterRead} method: it unmarshals every {@link Marshallable} field from its {@code *ByteArray} field
+     * and calls {@code afterRead} on nested {@link NetworkMessage}s held directly, in object arrays, in collections or in maps.
+     *
+     * @param messageImplBuild Builder of the message implementation that receives the generated method.
+     * @param message Message whose getters are inspected.
+     */
+    private void generateAfterReceive(TypeSpec.Builder messageImplBuild, MessageClass message) {
+        Builder afterRead = MethodSpec.methodBuilder("afterRead")
+                .addAnnotation(Override.class)
+                .addModifiers(Modifier.PUBLIC)
+                .addException(Exception.class)
+                .addParameter(Object.class, "marshallerObj")
+                .addParameter(Object.class, "descriptorsObj");
+
+        ClassName marshallerClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "UserObjectMarshaller");
+        ClassName descriptorRegistryClass = ClassName.get("org.apache.ignite.internal.network.serialization", "DescriptorRegistry");
+
+        afterRead.addStatement("$T marshaller = ($T) marshallerObj", marshallerClass, marshallerClass);
+        afterRead.addStatement("$T descriptorRegistry = ($T) descriptorsObj", descriptorRegistryClass, descriptorRegistryClass);
+        message.getters().forEach(executableElement -> {
+            TypeMirror type = executableElement.getReturnType();
+            String objectName = executableElement.getSimpleName().toString();
+
+            if (executableElement.getAnnotation(Marshallable.class) != null) {
+                String baName = objectName + "ByteArray";
+                afterRead.addStatement("$N = marshaller.unmarshal($N, descriptorRegistry)", objectName, baName);
+            } else if (typeUtils.isSubType(type, NetworkMessage.class)) {
+                afterRead.addStatement("if ($N != null) $N.afterRead(marshallerObj, descriptorsObj)", objectName, objectName);
+            } else {
+                String methodType = methodNameResolver.resolveBaseMethodName(type);
+                switch (methodType) {
+                    case "ObjectArray":
+                        ArrayType arrayType = (ArrayType) type;
+                        if (typeUtils.isSubType(arrayType.getComponentType(), NetworkMessage.class)) {
+                            afterRead.beginControlFlow("if ($N != null)", objectName);
+                            afterRead.beginControlFlow("for (int i = 0; i < $N.length; i++)", objectName);
+                            afterRead.addStatement("if ($N[i] != null) $N[i].afterRead(marshallerObj, descriptorsObj)", objectName, objectName);
+                            afterRead.endControlFlow().endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Collection":
+                        DeclaredType declaredType = (DeclaredType) type;
+                        TypeMirror elementType = declaredType.getTypeArguments().get(0);
+                        if (typeUtils.isSubType(elementType, NetworkMessage.class)) {
+                            afterRead.beginControlFlow("if ($N != null)", objectName);
+                            afterRead.beginControlFlow("for ($T obj : $N)", elementType, objectName);
+                            afterRead.addStatement("if (obj != null) obj.afterRead(marshallerObj, descriptorsObj)");
+                            afterRead.endControlFlow().endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Map":
+                        DeclaredType mapType = (DeclaredType) type;
+                        TypeMirror keyType = mapType.getTypeArguments().get(0);
+                        boolean keyIsMessage = typeUtils.isSubType(keyType, NetworkMessage.class);
+                        TypeMirror valueType = mapType.getTypeArguments().get(1);
+                        boolean valueIsMessage = typeUtils.isSubType(valueType, NetworkMessage.class);
+
+                        if (keyIsMessage || valueIsMessage) {
+                            ParameterizedTypeName entryType = ParameterizedTypeName.get(ClassName.get(Map.Entry.class), TypeName.get(keyType), TypeName.get(valueType));
+                            ParameterizedTypeName entrySetType = ParameterizedTypeName.get(ClassName.get(Set.class), entryType);
+                            String entrySetName = objectName + "EntrySet";
+
+                            afterRead.beginControlFlow("if ($N != null)", objectName);
+                            afterRead.addStatement("$T $N = $N.entrySet()", entrySetType, entrySetName, objectName);
+                            afterRead.beginControlFlow("for ($T entry : $N)", entryType, entrySetName);
+                            afterRead.addStatement("$T key = entry.getKey()", keyType);
+                            afterRead.addStatement("$T value = entry.getValue()", valueType);
+                            if (keyIsMessage) {
+                                afterRead.addStatement("if (key != null) key.afterRead(marshallerObj, descriptorsObj)");
+                            }
+                            if (valueIsMessage) {
+                                afterRead.addStatement("if (value != null) value.afterRead(marshallerObj, descriptorsObj)");
+                            }
+                            afterRead.endControlFlow().endControlFlow().addCode("\n");
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+        });
+
+        messageImplBuild.addMethod(afterRead.build());
+    }
+
     /**
      * Generates implementations of {@link #hashCode} and {@link #equals} for the provided {@code message} and adds them to the provided
      * builder.
@@ -322,6 +539,7 @@ public class MessageImplGenerator {
         List<ExecutableElement> messageGetters = message.getters();
 
         var fields = new ArrayList<FieldSpec>(messageGetters.size());
+        var allFields = new ArrayList<FieldSpec>(messageGetters.size());
         var setters = new ArrayList<MethodSpec>(messageGetters.size());
         var getters = new ArrayList<MethodSpec>(messageGetters.size());
 
@@ -335,6 +553,7 @@ public class MessageImplGenerator {
                     .build();
 
             fields.add(field);
+            allFields.add(field);
 
             MethodSpec setter = MethodSpec.methodBuilder(getterName)
                     .addAnnotation(Override.class)
@@ -355,15 +574,45 @@ public class MessageImplGenerator {
                     .build();
 
             getters.add(getter);
+
+            if (messageGetter.getAnnotation(Marshallable.class) != null) {
+                String name = getterName + "ByteArray";
+                ArrayTypeName type = ArrayTypeName.of(TypeName.BYTE);
+                FieldSpec baField = FieldSpec.builder(type, name)
+                        .addModifiers(Modifier.PRIVATE)
+                        .build();
+
+                allFields.add(baField);
+
+                MethodSpec baSetter = MethodSpec.methodBuilder(name)
+                        .addAnnotation(Override.class)
+                        .addModifiers(Modifier.PUBLIC)
+                        .returns(builderName)
+                        .addParameter(type, name)
+                        .addStatement("this.$N = $L", baField, name)
+                        .addStatement("return this")
+                        .build();
+
+                setters.add(baSetter);
+
+                MethodSpec baGetter = MethodSpec.methodBuilder(name)
+                        .addAnnotation(Override.class)
+                        .addModifiers(Modifier.PUBLIC)
+                        .returns(type)
+                        .addStatement("return $N", baField)
+                        .build();
+
+                getters.add(baGetter);
+            }
         }
 
         return TypeSpec.classBuilder("Builder")
                 .addModifiers(Modifier.PRIVATE, Modifier.STATIC)
                 .addSuperinterface(builderName)
-                .addFields(fields)
+                .addFields(allFields)
                 .addMethods(setters)
                 .addMethods(getters)
-                .addMethod(buildMethod(message, messageImplClass, fields))
+                .addMethod(buildMethod(message, messageImplClass, allFields))
                 .build();
     }
 
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
index 00783d0..3a87c4a 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
@@ -38,7 +38,7 @@ import org.apache.ignite.network.NetworkMessage;
  * @see MessageReaderMethodResolver
  * @see MessageWriterMethodResolver
  */
-class BaseMethodNameResolver {
+public class BaseMethodNameResolver {
     /** Processing environment. */
     private final ProcessingEnvironment processingEnvironment;
 
@@ -47,7 +47,7 @@ class BaseMethodNameResolver {
      *
      * @param processingEnvironment Processing environment.
      */
-    BaseMethodNameResolver(ProcessingEnvironment processingEnvironment) {
+    public BaseMethodNameResolver(ProcessingEnvironment processingEnvironment) {
         this.processingEnvironment = processingEnvironment;
     }
 
@@ -57,7 +57,7 @@ class BaseMethodNameResolver {
      * @param parameterType parameter of the method to resolve
      * @return part of the method name, depending on the parameter type
      */
-    String resolveBaseMethodName(TypeMirror parameterType) {
+    public String resolveBaseMethodName(TypeMirror parameterType) {
         if (parameterType.getKind().isPrimitive()) {
             return resolvePrimitiveMethodName(parameterType);
         } else if (parameterType.getKind() == TypeKind.ARRAY) {
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
index 8ea3d12..93ae44b 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
@@ -30,6 +30,7 @@ import javax.lang.model.element.Modifier;
 import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.serialization.MessageDeserializer;
 import org.apache.ignite.network.serialization.MessageMappingException;
 import org.apache.ignite.network.serialization.MessageReader;
@@ -153,8 +154,14 @@ public class MessageDeserializerGenerator {
     private CodeBlock readMessageCodeBlock(ExecutableElement getter, FieldSpec msgField) {
         var methodResolver = new MessageReaderMethodResolver(processingEnv);
 
+        String name = getter.getSimpleName().toString();
+
+        if (getter.getAnnotation(Marshallable.class) != null) {
+            name += "ByteArray";
+        }
+
         return CodeBlock.builder()
-                .add("$N.$N(reader.", msgField, getter.getSimpleName())
+                .add("$N.$N(reader.", msgField, name)
                 .add(methodResolver.resolveReadMethod(getter))
                 .add(")")
                 .build();
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
index 13e9ae2..925c3f0 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
@@ -60,8 +60,9 @@ class MessageReaderMethodResolver {
         String parameterName = getter.getSimpleName().toString();
 
         if (getter.getAnnotation(Marshallable.class) != null) {
+            parameterName += "ByteArray";
             return CodeBlock.builder()
-                    .add("readMarshallable($S)", parameterName)
+                    .add("readByteArray($S)", parameterName)
                     .build();
         }
 
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
index 1926e8e..7d579e4 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
@@ -81,10 +81,12 @@ public class MessageSerializerGenerator {
                 .addAnnotation(Override.class)
                 .addModifiers(Modifier.PUBLIC)
                 .returns(boolean.class)
-                .addParameter(message.className(), "message")
+                .addParameter(message.className(), "msg")
                 .addParameter(MessageWriter.class, "writer")
                 .addException(MessageMappingException.class);
 
+        method.addStatement("$T message = ($T) msg", message.implClassName(), message.implClassName()).addCode("\n");
+
         List<ExecutableElement> getters = message.getters();
 
         method
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
index 38aeb5e..29d973c 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
@@ -35,7 +35,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
 /**
  * Class for resolving {@link MessageWriter} "write*" methods for the corresponding message field type.
  */
-class MessageWriterMethodResolver {
+public class MessageWriterMethodResolver {
     /** Method name resolver. */
     private final BaseMethodNameResolver methodNameResolver;
 
@@ -47,7 +47,7 @@ class MessageWriterMethodResolver {
      *
      * @param processingEnvironment Processing environment.
      */
-    MessageWriterMethodResolver(ProcessingEnvironment processingEnvironment) {
+    public MessageWriterMethodResolver(ProcessingEnvironment processingEnvironment) {
         methodNameResolver = new BaseMethodNameResolver(processingEnvironment);
         typeConverter = new MessageCollectionItemTypeConverter(processingEnvironment);
     }
@@ -72,8 +72,9 @@ class MessageWriterMethodResolver {
         String parameterName = getter.getSimpleName().toString();
 
         if (getter.getAnnotation(Marshallable.class) != null) {
+            parameterName += "ByteArray";
             return CodeBlock.builder()
-                    .add("writeMarshallable($S, message.$L())", parameterName, parameterName)
+                    .add("writeByteArray($S, message.$L())", parameterName, parameterName)
                     .build();
         }
 
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
index d0becd8..08c2534 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.network;
 
+import java.util.Collections;
+import java.util.Set;
+
 /**
  * Message for exchanging information in a cluster.
  */
@@ -41,4 +44,27 @@ public interface NetworkMessage {
      * @return group type.
      */
     short groupType();
+
+    /**
+     * Hook that receives a set of ids and a marshaller object. The default implementation ignores both arguments.
+     *
+     * @param ids Set of ids; not used by the default implementation.
+     * @param marshaller Marshaller object; not used by the default implementation.
+     * @return Empty set by default.
+     * @throws Exception Declared so that overriding implementations may throw; the default implementation does not.
+     */
+    default Set<Integer> beforeRead(Set<Integer> ids, Object marshaller) throws Exception {
+        return Collections.emptySet();
+    }
+
+    /**
+     * Hook that receives a marshaller object and a descriptors object. The default implementation does nothing.
+     *
+     * @param marshaller Marshaller object; not used by the default implementation.
+     * @param descriptors Descriptors object; not used by the default implementation.
+     * @throws Exception Declared so that overriding implementations may throw; the default implementation does not.
+     */
+    default void afterRead(Object marshaller, Object descriptors) throws Exception {
+        // No-op.
+    }
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 447089b..9d92908 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -34,6 +34,7 @@ import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -53,6 +54,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 import org.apache.ignite.network.TestMessage;
 import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
 import org.apache.ignite.network.TestMessagesFactory;
@@ -108,13 +110,13 @@ public class ItConnectionManagerTest {
 
         var fut = new CompletableFuture<NetworkMessage>();
 
-        manager2.addListener((consistentId, message) -> fut.complete(message));
+        manager2.addListener((obj) -> fut.complete(obj.getMessage()));
 
         NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
         TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
 
-        sender.send(testMessage).get(3, TimeUnit.SECONDS);
+        sender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
@@ -140,7 +142,7 @@ public class ItConnectionManagerTest {
 
         var fut = new CompletableFuture<NetworkMessage>();
 
-        manager1.addListener((consistentId, message) -> fut.complete(message));
+        manager1.addListener((obj) -> fut.complete(obj.getMessage()));
 
         NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
@@ -150,9 +152,9 @@ public class ItConnectionManagerTest {
         var messageReceivedOn2 = new CompletableFuture<Void>();
 
         // If the message is received, that means that the handshake was successfully performed.
-        manager2.addListener((consistentId, message) -> messageReceivedOn2.complete(null));
+        manager2.addListener((message) -> messageReceivedOn2.complete(null));
 
-        senderFrom1to2.send(testMessage);
+        senderFrom1to2.send(new NetworkObject(testMessage, Collections.emptyList()));
 
         messageReceivedOn2.get(3, TimeUnit.SECONDS);
 
@@ -164,7 +166,7 @@ public class ItConnectionManagerTest {
 
         assertEquals(clientLocalAddress, clientRemoteAddress);
 
-        senderFrom2to1.send(testMessage).get(3, TimeUnit.SECONDS);
+        senderFrom2to1.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
@@ -231,7 +233,7 @@ public class ItConnectionManagerTest {
 
         assertThrows(ClosedChannelException.class, () -> {
             try {
-                finalSender.send(testMessage).get(3, TimeUnit.SECONDS);
+                finalSender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
             } catch (Exception e) {
                 throw e.getCause();
             }
@@ -241,11 +243,11 @@ public class ItConnectionManagerTest {
 
         var fut = new CompletableFuture<NetworkMessage>();
 
-        manager2.get1().addListener((consistentId, message) -> fut.complete(message));
+        manager2.get1().addListener((obj) -> fut.complete(obj.getMessage()));
 
         sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
-        sender.send(testMessage).get(3, TimeUnit.SECONDS);
+        sender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
index 6ad6231..53b2463 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.mock;
 
 import io.netty.channel.Channel;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -52,6 +53,8 @@ import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
+import org.apache.ignite.network.TestMessage;
 import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
 import org.apache.ignite.network.TestMessagesFactory;
 import org.junit.jupiter.api.AfterEach;
@@ -130,7 +133,8 @@ public class ItRecoveryHandshakeTest {
         assertNotNull(from2to1);
 
         // Ensure the handshake has finished on both sides.
-        from2to1.send(messageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
+        TestMessage msg = messageFactory.testMessage().msg("test").build();
+        from2to1.send(new NetworkObject(msg, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
 
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 87c215f..fd52931 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -294,12 +294,16 @@ class ItScaleCubeNetworkMessagingTest {
         // register a different handle for the second group
         node1.messagingService().addMessageHandler(
                 NetworkMessageTypes.class,
-                (message, senderAddr, correlationId) -> assertTrue(networkMessageFuture.complete(message))
+                (message, senderAddr, correlationId) -> {
+                    if (message instanceof FieldDescriptorMessage) {
+                        assertTrue(networkMessageFuture.complete(message));
+                    }
+                }
         );
 
         var testMessage = messageFactory.testMessage().msg("foo").build();
 
-        ClassDescriptorMessage networkMessage = new NetworkMessagesFactory().classDescriptorMessage().build();
+        FieldDescriptorMessage networkMessage = new NetworkMessagesFactory().fieldDescriptorMessage().build();
 
         // test that a message gets delivered to both handlers
         node2.messagingService()
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
index bae7c7d..ee0df8b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.network;
 
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
@@ -65,4 +66,9 @@ public class NetworkMessageTypes {
      * Type for {@link FieldDescriptorMessage}.
      */
     public static final short FIELD_DESCRIPTOR_MESSAGE = 6;
+
+    /**
+     * Type for {@link ClassDescriptorListMessage}.
+     */
+    public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 7;
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java
new file mode 100644
index 0000000..bfc7b97
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.message;
+
+import java.util.Collection;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/** Network message that carries a collection of {@link ClassDescriptorMessage}s. */
+@Transferable(NetworkMessageTypes.CLASS_DESCRIPTOR_LIST_MESSAGE)
+public interface ClassDescriptorListMessage extends NetworkMessage {
+    /** Returns the class descriptor messages carried by this message. */
+    Collection<ClassDescriptorMessage> messages();
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index d8f8967..a95d285 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -71,7 +70,7 @@ public class ConnectionManager {
     private final SerializationService serializationService;
 
     /** Message listeners. */
-    private final List<BiConsumer<String, NetworkMessage>> listeners = new CopyOnWriteArrayList<>();
+    private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();
 
     /** Node consistent id. */
     private final String consistentId;
@@ -197,11 +196,10 @@ public class ConnectionManager {
     /**
      * Callback that is called upon receiving a new message.
      *
-     * @param consistentId Consistent id of the message's sender.
      * @param message New message.
      */
-    private void onMessage(String consistentId, NetworkMessage message) {
-        listeners.forEach(consumer -> consumer.accept(consistentId, message));
+    private void onMessage(InNetworkObject message) {
+        listeners.forEach(consumer -> consumer.accept(message));
     }
 
     /**
@@ -243,7 +241,7 @@ public class ConnectionManager {
      *
      * @param listener Message listener.
      */
-    public void addListener(BiConsumer<String, NetworkMessage> listener) {
+    public void addListener(Consumer<InNetworkObject> listener) {
         listeners.add(listener);
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
similarity index 51%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
index cc72427..85fb883 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
@@ -17,37 +17,48 @@
 
 package org.apache.ignite.internal.network.netty;
 
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.function.BiConsumer;
+import org.apache.ignite.internal.network.serialization.CompositeDescriptorRegistry;
 import org.apache.ignite.network.NetworkMessage;
 
-/**
- * Network message handler that delegates handling to {@link #messageListener}.
- */
-public class MessageHandler extends ChannelInboundHandlerAdapter {
-    /** Message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+/**
+ * Holder for an incoming network message, the consistent id of the remote node and a descriptor registry.
+ */
+public class InNetworkObject {
+
+    /** Network message. */
+    private final NetworkMessage message;
 
-    /** Consistent id of the remote node. */
+    /** Consistent id of the remote node. */
     private final String consistentId;
 
-    /**
-     * Constructor.
-     *
-     * @param messageListener Message listener.
-     * @param consistentId Consistent id of the remote node.
-     */
-    public MessageHandler(BiConsumer<String, NetworkMessage> messageListener, String consistentId) {
-        this.messageListener = messageListener;
+    /** Descriptor registry. */
+    private final CompositeDescriptorRegistry registry;
+    
+    /**
+     * Constructor.
+     *
+     * @param message Network message.
+     * @param consistentId Consistent id of the remote node.
+     * @param registry Descriptor registry.
+     */
+    public InNetworkObject(NetworkMessage message, String consistentId, CompositeDescriptorRegistry registry) {
+        this.message = message;
         this.consistentId = consistentId;
+        this.registry = registry;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        NetworkMessage message = (NetworkMessage) msg;
+    /** Returns the network message. */
+    public NetworkMessage getMessage() {
+        return message;
+    }
+
+    /** Returns the consistent id of the remote node. */
+    public String getConsistentId() {
+        return consistentId;
+    }
 
-        messageListener.accept(consistentId, message);
+    /** Returns the descriptor registry. */
+    public CompositeDescriptorRegistry getRegistry() {
+        return registry;
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
index 49c2ed0..cd1bc2b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.ignite.internal.network.direct.DirectMarshallingUtils;
 import org.apache.ignite.internal.network.direct.DirectMessageReader;
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NetworkMessage;
@@ -107,7 +108,13 @@ public class InboundDecoder extends ByteToMessageDecoder {
                     reader.reset();
                     messageAttr.set(null);
 
-                    out.add(msg.getMessage());
+                    NetworkMessage message = msg.getMessage();
+
+                    if (message instanceof ClassDescriptorListMessage) {
+                        onClassDescriptorMessage((ClassDescriptorListMessage) message);
+                    } else {
+                        out.add(message);
+                    }
                 } else {
                     messageAttr.set(msg);
                 }
@@ -133,4 +140,14 @@ public class InboundDecoder extends ByteToMessageDecoder {
             }
         }
     }
+
+    /**
+     * Handles a decoded {@link ClassDescriptorListMessage} by passing its descriptor messages
+     * to the serialization service's {@code mergeDescriptors} method.
+     *
+     * @param msg Message with class descriptors.
+     */
+    private void onClassDescriptorMessage(ClassDescriptorListMessage msg) {
+        serializationService.mergeDescriptors(msg.messages());
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
index cc72427..c587071 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
@@ -19,7 +19,8 @@ package org.apache.ignite.internal.network.netty;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.network.NetworkMessage;
 
 /**
@@ -27,20 +28,23 @@ import org.apache.ignite.network.NetworkMessage;
  */
 public class MessageHandler extends ChannelInboundHandlerAdapter {
     /** Message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+    private final Consumer<InNetworkObject> messageListener;
 
     /** Consistent id of the remote node. */
     private final String consistentId;
 
+    private final PerSessionSerializationService serializationService;
+
     /**
      * Constructor.
-     *
-     * @param messageListener Message listener.
+     * @param messageListener Message listener.
      * @param consistentId Consistent id of the remote node.
+     * @param serializationService Per-session serialization service; source of the descriptor registry.
      */
-    public MessageHandler(BiConsumer<String, NetworkMessage> messageListener, String consistentId) {
+    public MessageHandler(Consumer<InNetworkObject> messageListener, String consistentId, PerSessionSerializationService serializationService) {
         this.messageListener = messageListener;
         this.consistentId = consistentId;
+        this.serializationService = serializationService;
     }
 
     /** {@inheritDoc} */
@@ -48,6 +52,6 @@ public class MessageHandler extends ChannelInboundHandlerAdapter {
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
         NetworkMessage message = (NetworkMessage) msg;
 
-        messageListener.accept(consistentId, message);
+        messageListener.accept(new InNetworkObject(message, consistentId, serializationService.compositeDescriptorRegistry()));
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
index 1b1e4c7..7983e40 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
@@ -25,13 +25,12 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -59,7 +58,7 @@ public class NettyClient {
     private volatile Channel channel = null;
 
     /** Message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+    private final Consumer<InNetworkObject> messageListener;
 
     /** Handshake manager. */
     private final HandshakeManager handshakeManager;
@@ -79,7 +78,7 @@ public class NettyClient {
             SocketAddress address,
             SerializationService serializationService,
             HandshakeManager manager,
-            BiConsumer<String, NetworkMessage> messageListener
+            Consumer<InNetworkObject> messageListener
     ) {
         this.address = address;
         this.serializationService = serializationService;
@@ -113,7 +112,9 @@ public class NettyClient {
 
                     ch.pipeline().addLast(
                             new InboundDecoder(sessionSerializationService),
-                            new HandshakeHandler(handshakeManager, (consistentId) -> new MessageHandler(messageListener, consistentId)),
+                            new HandshakeHandler(handshakeManager,
+                                    (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
+                            ),
                             new ChunkedWriteHandler(),
                             new OutboundEncoder(sessionSerializationService),
                             new IoExceptionSuppressingHandler()
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index 6185994..46a598f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -21,7 +21,7 @@ import io.netty.channel.Channel;
 import io.netty.handler.stream.ChunkedInput;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
-import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -56,8 +56,8 @@ public class NettySender {
-     * @param msg Network message.
+     * @param obj Network object to send.
      * @return Future of the send operation.
      */
-    public CompletableFuture<Void> send(NetworkMessage msg) {
-        return NettyUtils.toCompletableFuture(channel.writeAndFlush(msg));
+    public CompletableFuture<Void> send(NetworkObject obj) {
+        return NettyUtils.toCompletableFuture(channel.writeAndFlush(obj));
     }
 
     /**
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 89e7f8c..7050d8e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -27,7 +27,6 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -37,7 +36,6 @@ import org.apache.ignite.internal.network.serialization.PerSessionSerializationS
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -58,7 +56,7 @@ public class NettyServer {
     private final SerializationService serializationService;
 
     /** Incoming message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+    private final Consumer<InNetworkObject> messageListener;
 
     /** Handshake manager. */
     private final Supplier<HandshakeManager> handshakeManager;
@@ -94,7 +92,7 @@ public class NettyServer {
             NetworkView configuration,
             Supplier<HandshakeManager> handshakeManager,
             Consumer<NettySender> newConnectionListener,
-            BiConsumer<String, NetworkMessage> messageListener,
+            Consumer<InNetworkObject> messageListener,
             SerializationService serializationService,
             NettyBootstrapFactory bootstrapFactory
     ) {
@@ -139,7 +137,9 @@ public class NettyServer {
                                      */
                                     new InboundDecoder(sessionSerializationService),
                                     // Handshake handler.
-                                    new HandshakeHandler(manager, (consistentId) -> new MessageHandler(messageListener, consistentId)),
+                                    new HandshakeHandler(manager,
+                                            (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
+                                    ),
                                     /*
                                      * Encoder that uses the MessageWriter
                                      * to write chunked data.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index c629084..fa6f04b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -24,15 +24,20 @@ import io.netty.handler.codec.MessageToMessageEncoder;
 import io.netty.handler.stream.ChunkedInput;
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 import org.apache.ignite.network.serialization.MessageSerializer;
 
 /**
  * An encoder for the outbound messages that uses {@link DirectMessageWriter}.
  */
-public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
+public class OutboundEncoder extends MessageToMessageEncoder<NetworkObject> {
+    private static final NetworkMessagesFactory MSG_FACTORY = new NetworkMessagesFactory();
+
     /** Serialization registry. */
     private final PerSessionSerializationService serializationService;
 
@@ -47,7 +52,7 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
 
     /** {@inheritDoc} */
     @Override
-    protected void encode(ChannelHandlerContext ctx, NetworkMessage msg, List<Object> out) throws Exception {
+    protected void encode(ChannelHandlerContext ctx, NetworkObject msg, List<Object> out) throws Exception {
+        // Nothing is written here: the descriptors and the message are serialized later, inside NetworkMessageChunkedInput.
         out.add(new NetworkMessageChunkedInput(msg, serializationService));
     }
 
@@ -61,11 +66,17 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
         /** Message serializer. */
         private final MessageSerializer<NetworkMessage> serializer;
 
+        private final MessageSerializer<ClassDescriptorListMessage> descriptorSerializer;
+
         /** Message writer. */
         private final DirectMessageWriter writer;
 
+        private final ClassDescriptorListMessage descriptors;
+        private final PerSessionSerializationService serializationService;
+
         /** Whether the message was fully written. */
         private boolean finished = false;
+        private boolean descriptorsFinished = false;
 
         /**
          * Constructor.
@@ -74,10 +85,23 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
          * @param serializationService Serialization service.
          */
         private NetworkMessageChunkedInput(
-                NetworkMessage msg,
+                NetworkObject object,
                 PerSessionSerializationService serializationService
         ) {
-            this.msg = msg;
+            this.serializationService = serializationService;
+            this.msg = object.getNetworkMessage();
+
+            // Descriptors, if any, are wrapped into a single list message that gets its own serializer;
+            // when there are none, the descriptor phase is marked as finished right away.
+            if (!object.getDescriptors().isEmpty()) {
+                descriptors = MSG_FACTORY.classDescriptorListMessage().messages(object.getDescriptors()).build();
+                short groupType = descriptors.groupType();
+                short messageType = descriptors.messageType();
+                descriptorSerializer = serializationService.createMessageSerializer(groupType, messageType);
+            } else {
+                descriptors = null;
+                descriptorSerializer = null;
+                descriptorsFinished = true;
+            }
+
             this.serializer = serializationService.createMessageSerializer(msg.groupType(), msg.messageType());
             this.writer = new DirectMessageWriter(serializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
         }
@@ -113,7 +137,14 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
 
             writer.setBuffer(byteBuffer);
 
-            finished = serializer.writeMessage(msg, writer);
+            if (!descriptorsFinished) {
+                descriptorsFinished = descriptorSerializer.writeMessage(descriptors, writer);
+                if (descriptorsFinished) {
+                    writer.reset();
+                }
+            } else {
+                finished = serializer.writeMessage(msg, writer);
+            }
 
             buffer.writerIndex(byteBuffer.position() - initialPosition);
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 5f37db8..8c29293 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.recovery;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 
 /**
  * Recovery protocol handshake manager for a client.
@@ -78,7 +80,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
                     .connectionsCount(0)
                     .build();
 
-            ChannelFuture sendFuture = channel.writeAndFlush(response);
+            ChannelFuture sendFuture = channel.writeAndFlush(new NetworkObject(response, Collections.emptyList()));
 
             NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
                 if (throwable != null) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 4954981..bb0b5f3 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.recovery;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 
 /**
  * Recovery protocol handshake manager for a server.
@@ -76,7 +78,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
                 .consistentId(consistentId)
                 .build();
 
-        ChannelFuture sendFuture = channel.writeAndFlush(handshakeStartMessage);
+        ChannelFuture sendFuture = channel.writeAndFlush(new NetworkObject(handshakeStartMessage, Collections.emptyList()));
 
         NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
             if (throwable != null) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
index 288b51e..4ce1381 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -184,6 +185,14 @@ public class PerSessionSerializationService {
         return messages;
     }
 
+    /**
+     * Checks whether the given descriptor id is present in {@code sentDescriptors}.
+     *
+     * @param descriptorId Class descriptor id.
+     * @return {@code true} if the id has been recorded as sent.
+     */
+    public boolean isSent(int descriptorId) {
+        return sentDescriptors.contains(descriptorId);
+    }
+
+    /**
+     * Records the given descriptor id as sent by adding it to {@code sentDescriptors}.
+     *
+     * @param descriptorId Class descriptor id.
+     */
+    public void addSent(int descriptorId) {
+        sentDescriptors.add(descriptorId);
+    }
+
     private byte fieldFlags(FieldDescriptor fieldDescriptor) {
         int bits = condMask(fieldDescriptor.isUnshared(), FieldDescriptorMessage.UNSHARED_MASK)
                 | condMask(fieldDescriptor.isPrimitive(), FieldDescriptorMessage.IS_PRIMITIVE)
@@ -232,7 +241,7 @@ public class PerSessionSerializationService {
         return id;
     }
 
-    private void mergeDescriptors(List<ClassDescriptorMessage> remoteDescriptors) {
+    public void mergeDescriptors(Collection<ClassDescriptorMessage> remoteDescriptors) {
         List<ClassDescriptorMessage> leftToProcess = remoteDescriptors.stream()
                 .filter(classMessage -> !knownMergedDescriptor(classMessage.descriptorId()))
                 .collect(toCollection(LinkedList::new));
@@ -378,6 +387,11 @@ public class PerSessionSerializationService {
         }
     }
 
+    /**
+     * Returns the composite descriptor registry held by this service.
+     *
+     * @return Composite descriptor registry.
+     */
+    public CompositeDescriptorRegistry compositeDescriptorRegistry() {
+        return descriptors;
+    }
+
+
     @TestOnly
     Map<Integer, ClassDescriptor> getDescriptorMapView() {
         return mergedIdToDescriptorMap;
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 9d8965d..2ee7961 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -18,22 +18,40 @@
 package org.apache.ignite.network;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.network.NettyBootstrapFactory.isInNetworkThread;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
 import org.apache.ignite.internal.network.message.InvokeResponse;
-import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.InNetworkObject;
 import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.internal.network.serialization.ClassDescriptor;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.CompositeDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.FieldDescriptor;
+import org.apache.ignite.internal.network.serialization.Serialization;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.Nullable;
 
@@ -42,9 +60,15 @@ public class DefaultMessagingService extends AbstractMessagingService {
     /** Network messages factory. */
     private final NetworkMessagesFactory factory;
 
+    /** Integer value that is sent when there is no descriptor. */
+    private static final int NO_DESCRIPTOR_ID = Integer.MIN_VALUE;
+
     /** Topology service. */
     private final TopologyService topologyService;
 
+    private final UserObjectMarshaller marshaller;
+    private final ClassDescriptorRegistry classDescriptorRegistry;
+
     /** Connection manager that provides access to {@link NettySender}. */
     private volatile ConnectionManager connectionManager;
 
@@ -75,6 +99,9 @@ public class DefaultMessagingService extends AbstractMessagingService {
     public DefaultMessagingService(NetworkMessagesFactory factory, TopologyService topologyService) {
         this.factory = factory;
         this.topologyService = topologyService;
+
+        // The marshaller and the descriptor registry both come from one freshly created user object serialization context.
+        UserObjectSerializationContext serializationContext = createUserObjectSerializationContext();
+
+        this.classDescriptorRegistry = serializationContext.descriptorRegistry();
+        this.marshaller = serializationContext.marshaller();
     }
 
     /**
@@ -158,7 +185,24 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         String recipientConsistentId = recipient != null ? recipient.name() : address.consistentId();
 
-        return connectionManager.channel(recipientConsistentId, addr).thenCompose(sender -> sender.send(message));
+        return this.sendMessage0(message, recipientConsistentId, addr);
+    }
+
+    /**
+     * Sends a message together with the class descriptor messages collected for it.
+     *
+     * <p>When invoked on a network thread (as reported by {@code isInNetworkThread()}), the whole operation is re-submitted to
+     * {@link #svc}, so the descriptors are never collected on a network thread.
+     *
+     * @param message Message to send.
+     * @param recipientConsistentId Recipient's consistent id.
+     * @param addr Recipient's address.
+     * @return Future of the send operation; completed exceptionally if collecting the descriptors fails.
+     */
+    private CompletableFuture<Void> sendMessage0(NetworkMessage message, String recipientConsistentId, InetSocketAddress addr) {
+        if (!isInNetworkThread()) {
+            List<ClassDescriptorMessage> descriptors;
+
+            try {
+                descriptors = beforeRead(message);
+            } catch (Exception e) {
+                return failedFuture(e);
+            }
+
+            return connectionManager.channel(recipientConsistentId, addr)
+                    .thenCompose(sender -> sender.send(new NetworkObject(message, descriptors)));
+        }
+
+        // Hop off the network thread: re-run this method on the dedicated executor and flatten the nested future.
+        return CompletableFuture.supplyAsync(() -> sendMessage0(message, recipientConsistentId, addr), svc)
+                .thenCompose(nested -> nested);
     }
 
     /**
@@ -194,8 +238,95 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         String recipientConsistentId = recipient != null ? recipient.name() : addr.consistentId();
 
-        return connectionManager.channel(recipientConsistentId, address).thenCompose(sender -> sender.send(message))
-                .thenCompose(unused -> responseFuture);
+        return sendMessage0(message, recipientConsistentId, address).thenCompose(unused -> responseFuture);
+    }
+
+    /**
+     * Invokes {@code beforeRead} on the message and converts the descriptor ids it returns into descriptor messages.
+     *
+     * @param msg Message that is about to be sent.
+     * @return Descriptor messages for the ids returned by the message.
+     * @throws Exception If the message's {@code beforeRead} fails.
+     */
+    private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws Exception {
+        Set<Integer> descriptorIds = msg.beforeRead(new HashSet<>(), marshaller);
+
+        return createClassDescriptorsMessages(descriptorIds);
+    }
+
+    /**
+     * Builds descriptor messages for the given descriptor ids, resolving every id through the local class descriptor registry.
+     *
+     * @param descriptorIds Ids of the class descriptors.
+     * @return One {@link ClassDescriptorMessage} per id.
+     */
+    public List<ClassDescriptorMessage> createClassDescriptorsMessages(Set<Integer> descriptorIds) {
+        return descriptorIds.stream()
+                .map(classDescriptorRegistry::getDescriptor)
+                .map(this::toClassDescriptorMessage)
+                .collect(toList());
+    }
+
+    /** Converts a class descriptor, including all of its fields, into its message form. */
+    private ClassDescriptorMessage toClassDescriptorMessage(ClassDescriptor descriptor) {
+        List<FieldDescriptorMessage> fieldMessages = descriptor.fields().stream()
+                .map(this::toFieldDescriptorMessage)
+                .collect(toList());
+
+        Serialization serialization = descriptor.serialization();
+
+        return factory.classDescriptorMessage()
+                .fields(fieldMessages)
+                .serializationType((byte) serialization.type().value())
+                .serializationFlags(serializationAttributeFlags(serialization))
+                .descriptorId(descriptor.descriptorId())
+                .className(descriptor.className())
+                .superClassDescriptorId(superClassDescriptorIdForMessage(descriptor))
+                .superClassName(descriptor.superClassName())
+                .componentTypeDescriptorId(componentTypeDescriptorIdForMessage(descriptor))
+                .componentTypeName(descriptor.componentTypeName())
+                .attributes(classDescriptorAttributeFlags(descriptor))
+                .build();
+    }
+
+    /** Converts a field descriptor into its message form. */
+    private FieldDescriptorMessage toFieldDescriptorMessage(FieldDescriptor field) {
+        return factory.fieldDescriptorMessage()
+                .name(field.name())
+                .typeDescriptorId(field.typeDescriptorId())
+                .className(field.typeName())
+                .flags(fieldFlags(field))
+                .build();
+    }
+
+    /**
+     * Packs the boolean attributes of a field descriptor into a single flags byte.
+     *
+     * @param fieldDescriptor Field descriptor.
+     * @return Bitwise OR of the {@link FieldDescriptorMessage} masks whose attribute is set.
+     */
+    private byte fieldFlags(FieldDescriptor fieldDescriptor) {
+        int flags = 0;
+
+        flags |= condMask(fieldDescriptor.isUnshared(), FieldDescriptorMessage.UNSHARED_MASK);
+        flags |= condMask(fieldDescriptor.isPrimitive(), FieldDescriptorMessage.IS_PRIMITIVE);
+        flags |= condMask(fieldDescriptor.isRuntimeTypeKnownUpfront(), FieldDescriptorMessage.IS_RUNTIME_TYPE_KNOWN_UPFRONT);
+
+        return (byte) flags;
+    }
+
+    /**
+     * Packs the serialization hook attributes into a single flags byte.
+     *
+     * @param serialization Serialization settings of a class descriptor.
+     * @return Bitwise OR of the {@link ClassDescriptorMessage} masks whose attribute is set.
+     */
+    private byte serializationAttributeFlags(Serialization serialization) {
+        int flags = 0;
+
+        flags |= condMask(serialization.hasWriteObject(), ClassDescriptorMessage.HAS_WRITE_OBJECT_MASK);
+        flags |= condMask(serialization.hasReadObject(), ClassDescriptorMessage.HAS_READ_OBJECT_MASK);
+        flags |= condMask(serialization.hasReadObjectNoData(), ClassDescriptorMessage.HAS_READ_OBJECT_NO_DATA_MASK);
+        flags |= condMask(serialization.hasWriteReplace(), ClassDescriptorMessage.HAS_WRITE_REPLACE_MASK);
+        flags |= condMask(serialization.hasReadResolve(), ClassDescriptorMessage.HAS_READ_RESOLVE_MASK);
+
+        return (byte) flags;
+    }
+
+    /** Returns {@code mask} when {@code value} is {@code true}, otherwise {@code 0}. */
+    private int condMask(boolean value, int mask) {
+        if (value) {
+            return mask;
+        }
+
+        return 0;
+    }
+
+    /**
+     * Packs the boolean attributes of a class descriptor into a single flags byte.
+     *
+     * @param descriptor Class descriptor.
+     * @return Bitwise OR of the {@link ClassDescriptorMessage} masks whose attribute is set.
+     */
+    private byte classDescriptorAttributeFlags(ClassDescriptor descriptor) {
+        int flags = 0;
+
+        flags |= condMask(descriptor.isPrimitive(), ClassDescriptorMessage.IS_PRIMITIVE_MASK);
+        flags |= condMask(descriptor.isArray(), ClassDescriptorMessage.IS_ARRAY_MASK);
+        flags |= condMask(descriptor.isRuntimeEnum(), ClassDescriptorMessage.IS_RUNTIME_ENUM_MASK);
+        flags |= condMask(descriptor.isRuntimeTypeKnownUpfront(), ClassDescriptorMessage.IS_RUNTIME_TYPE_KNOWN_UPFRONT_MASK);
+
+        return (byte) flags;
+    }
+
+    /**
+     * Returns the superclass descriptor id to put into a message.
+     *
+     * @param descriptor Class descriptor.
+     * @return The descriptor's superclass descriptor id, or {@link #NO_DESCRIPTOR_ID} when it is {@code null}.
+     */
+    private int superClassDescriptorIdForMessage(ClassDescriptor descriptor) {
+        Integer superClassId = descriptor.superClassDescriptorId();
+
+        return superClassId == null ? NO_DESCRIPTOR_ID : superClassId;
+    }
+
+    /**
+     * Returns the component type descriptor id to put into a message.
+     *
+     * @param descriptor Class descriptor.
+     * @return The descriptor's component type descriptor id, or {@link #NO_DESCRIPTOR_ID} when it is {@code null}.
+     */
+    private int componentTypeDescriptorIdForMessage(ClassDescriptor descriptor) {
+        Integer componentTypeId = descriptor.componentTypeDescriptorId();
+
+        return componentTypeId == null ? NO_DESCRIPTOR_ID : componentTypeId;
     }
 
     /**
@@ -212,18 +343,28 @@ public class DefaultMessagingService extends AbstractMessagingService {
         }
     }
 
+    /**
+     * Single-threaded executor that {@code sendMessage0} and {@code onMessage} switch to when they are invoked on a network thread.
+     *
+     * <p>NOTE(review): this change does not add a shutdown for this executor; confirm it is stopped together with the service.
+     */
+    private final ExecutorService svc = Executors.newSingleThreadExecutor();
+
     /**
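-     * Handles an incoming messages.
+     * Handles an incoming message.
      *
-     * @param consistentId Sender's consistent id.
-     * @param msg Incoming message.
+     * @param obj Incoming network object carrying the message, the sender's consistent id
+     *            and the descriptor registry used to read it.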
      */
-    private void onMessage(String consistentId, NetworkMessage msg) {
-        if (msg instanceof ScaleCubeMessage) {
-            // ScaleCube messages are handled in the ScaleCubeTransport
+    private void onMessage(InNetworkObject obj) {
+        if (isInNetworkThread()) {
+            svc.submit(() -> onMessage(obj));
             return;
         }
 
+        NetworkMessage msg = obj.getMessage();
+        CompositeDescriptorRegistry registry = obj.getRegistry();
+        String consistentId = obj.getConsistentId();
+        try {
+            msg.afterRead(marshaller, registry);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
         if (msg instanceof InvokeResponse) {
             InvokeResponse response = (InvokeResponse) msg;
             onInvokeResponse(response.message(), response.correlationId());
@@ -345,4 +486,14 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         requestsMap.clear();
     }
+
+    /**
+     * Creates a user object serialization context backed by a fresh descriptor registry, descriptor factory and marshaller.
+     *
+     * @return New serialization context.
+     */
+    private UserObjectSerializationContext createUserObjectSerializationContext() {
+        ClassDescriptorRegistry registry = new ClassDescriptorRegistry();
+        ClassDescriptorFactory descriptorFactory = new ClassDescriptorFactory(registry);
+        DefaultUserObjectMarshaller objectMarshaller = new DefaultUserObjectMarshaller(registry, descriptorFactory);
+
+        return new UserObjectSerializationContext(registry, descriptorFactory, objectMarshaller);
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 0d11ccf..a4b7fa8 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -139,6 +139,22 @@ public class NettyBootstrapFactory implements IgniteComponent {
         clientWorkerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-client");
     }
 
+    /**
+     * Checks whether the calling thread looks like one of the network threads.
+     *
+     * <p>The check is purely name-based: the current thread's name is searched for the {@code -srv-worker},
+     * {@code -client} and {@code -srv-accept} markers, so any thread whose name contains one of them matches.
+     *
+     * @return {@code true} if the current thread's name contains one of the markers.
+     */
+    public static boolean isInNetworkThread() {
+        String threadName = Thread.currentThread().getName();
+
+        return threadName.contains("-srv-worker")
+                || threadName.contains("-client")
+                || threadName.contains("-srv-accept");
+    }
+
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java
new file mode 100644
index 0000000..9b0d7fb
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.List;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+
+/**
+ * A {@link NetworkMessage} paired with the class descriptor messages that are sent along with it.
+ */
+public class NetworkObject {
+    /** Message to send. */
+    private final NetworkMessage networkMessage;
+
+    /** Class descriptor messages that accompany the message; an unmodifiable snapshot. */
+    private final List<ClassDescriptorMessage> descriptors;
+
+    /**
+     * Constructor.
+     *
+     * @param networkMessage Message to send.
+     * @param descriptors Class descriptor messages that accompany the message; must not be {@code null} nor contain {@code null}.
+     */
+    public NetworkObject(NetworkMessage networkMessage, List<ClassDescriptorMessage> descriptors) {
+        this.networkMessage = networkMessage;
+        // Snapshot the list: this object is handed over to another thread, so later changes
+        // to the caller's list must not leak into it.
+        this.descriptors = List.copyOf(descriptors);
+    }
+
+    /**
+     * Returns the message to send.
+     *
+     * @return Network message.
+     */
+    public NetworkMessage getNetworkMessage() {
+        return networkMessage;
+    }
+
+    /**
+     * Returns the class descriptor messages that accompany the message.
+     *
+     * @return Unmodifiable list of descriptor messages; empty when there are none.
+     */
+    public List<ClassDescriptorMessage> getDescriptors() {
+        return descriptors;
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index e1faa56..f8f77be 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -104,7 +104,9 @@ public class ScaleCubeClusterServiceFactory {
                         nettyBootstrapFactory
                 );
 
-                var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr, topologyService, messageFactory);
+                connectionMgr.start();
+
+                var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr.getLocalAddress(), messagingService, topologyService, messageFactory);
 
                 NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
 
@@ -122,8 +124,6 @@ public class ScaleCubeClusterServiceFactory {
 
                 shutdownFuture = cluster.onShutdown().toFuture();
 
-                connectionMgr.start();
-
                 // resolve cyclic dependencies
                 topologyService.setCluster(cluster);
                 messagingService.setConnectionManager(connectionMgr);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index ad763de..e263467 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
@@ -32,6 +33,7 @@ import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
@@ -63,7 +65,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     private final MonoProcessor<Void> onStop = MonoProcessor.create();
 
-    /** Connection manager. */
+    /** Messaging service. */
-    private final ConnectionManager connectionManager;
+    private final MessagingService messagingService;
 
     /** Message factory. */
     private final NetworkMessagesFactory messageFactory;
@@ -82,15 +84,17 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
      * @param messageFactory    message factory
      */
     ScaleCubeDirectMarshallerTransport(
-            ConnectionManager connectionManager,
+            SocketAddress localAddress,
+            MessagingService messagingService,
             ScaleCubeTopologyService topologyService,
             NetworkMessagesFactory messageFactory
     ) {
-        this.connectionManager = connectionManager;
+        this.address = prepareAddress(localAddress);
+        this.messagingService = messagingService;
         this.topologyService = topologyService;
         this.messageFactory = messageFactory;
 
-        this.connectionManager.addListener(this::onMessage);
+        this.messagingService.addMessageHandler(NetworkMessageTypes.class, (message, senderAddr, correlationId) -> onMessage(message));
         // Setup cleanup
         stop.then(doStop())
                 .doFinally(s -> onStop.onComplete())
@@ -145,8 +149,6 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     /** {@inheritDoc} */
     @Override
     public Mono<Transport> start() {
-        address = prepareAddress(connectionManager.getLocalAddress());
-
+        // The transport address is already resolved in the constructor, so there is nothing left to initialize here.
         return Mono.just(this);
     }
 
@@ -171,21 +173,23 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
         var addr = InetSocketAddress.createUnresolved(address.host(), address.port());
 
         return Mono.fromFuture(() -> {
-            ClusterNode node = topologyService.getByAddress(NetworkAddress.from(addr));
+            NetworkAddress networkAddress = NetworkAddress.from(addr);
+            ClusterNode node = topologyService.getByAddress(networkAddress);
 
-            String consistentId = node != null ? node.name() : null;
+            if (node == null) {
+                node = new ClusterNode(null, null, networkAddress);
+            }
 
-            return connectionManager.channel(consistentId, addr).thenCompose(client -> client.send(fromMessage(message)));
+            return messagingService.send(node, fromMessage(message));
         });
     }
 
     /**
-     * Handles new network messages from {@link #connectionManager}.
+     * Handles new network messages from {@link #messagingService}.
      *
-     * @param senderConsistentId Sender's consistent id.
      * @param msg    Network message.
      */
-    private void onMessage(String senderConsistentId, NetworkMessage msg) {
+    private void onMessage(NetworkMessage msg) {
         Message message = fromNetworkMessage(msg);
 
         if (message != null) {
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index cc6cfb0..54ab776 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -168,7 +168,7 @@ public class NettyClientTest {
                 address,
                 null,
                 new MockClientHandshakeManager(channel),
-                (address1, message) -> {
+                (message) -> {
                 }
         );
 
@@ -190,7 +190,7 @@ public class NettyClientTest {
                 address,
                 null,
                 new MockClientHandshakeManager(future.channel()),
-                (address1, message) -> {
+                (message) -> {
                 }
         );
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index e59db1a..4e84a89 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -183,7 +183,7 @@ public class NettyServerTest {
                 () -> handshakeManager,
                 sender -> {
                 },
-                (socketAddress, message) -> {
+                (message) -> {
                 },
                 new SerializationService(registry, mock(UserObjectSerializationContext.class)),
                 bootstrapFactory
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
index 5d88ad5..2a3e2bc 100644
--- a/modules/raft/pom.xml
+++ b/modules/raft/pom.xml
@@ -118,7 +118,6 @@
         <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-network</artifactId>
-            <scope>test</scope>
         </dependency>
 
         <dependency>
diff --git a/modules/transactions/pom.xml b/modules/transactions/pom.xml
index 7ffc632..26155ec 100644
--- a/modules/transactions/pom.xml
+++ b/modules/transactions/pom.xml
@@ -50,6 +50,11 @@
 
     <dependency>
       <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-network</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
       <artifactId>ignite-raft-client</artifactId>
     </dependency>