You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/02 15:11:58 UTC

[ignite-3] branch ignite-16393 created (now f0dd146)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a change to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git.


      at f0dd146  .

This branch includes the following new commits:

     new f0dd146  .

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[ignite-3] 01/01: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit f0dd14659c4260fbbb4e3507c6b1fdb4efaaaab9
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Mar 2 18:09:25 2022 +0300

    .
---
 .../messages/MessageBuilderGenerator.java          |  73 +++---
 .../processor/messages/MessageImplGenerator.java   | 259 ++++++++++++++++++++-
 .../serialization/BaseMethodNameResolver.java      |   6 +-
 .../MessageDeserializerGenerator.java              |   9 +-
 .../serialization/MessageReaderMethodResolver.java |   3 +-
 .../serialization/MessageSerializerGenerator.java  |   4 +-
 .../serialization/MessageWriterMethodResolver.java |   7 +-
 .../org/apache/ignite/network/NetworkMessage.java  |  11 +
 .../network/netty/ItConnectionManagerTest.java     |  20 +-
 .../network/recovery/ItRecoveryHandshakeTest.java  |   6 +-
 .../scalecube/ItScaleCubeNetworkMessagingTest.java |  10 +-
 .../internal/network/NetworkMessageTypes.java      |   6 +
 .../message/ClassDescriptorListMessage.java        |  30 +++
 .../internal/network/netty/ConnectionManager.java  |  12 +-
 .../{MessageHandler.java => InNetworkObject.java}  |  41 ++--
 .../internal/network/netty/InboundDecoder.java     |  13 +-
 .../internal/network/netty/MessageHandler.java     |  16 +-
 .../ignite/internal/network/netty/NettyClient.java |  11 +-
 .../ignite/internal/network/netty/NettySender.java |   6 +-
 .../ignite/internal/network/netty/NettyServer.java |  10 +-
 .../internal/network/netty/OutboundEncoder.java    |  41 +++-
 .../recovery/RecoveryClientHandshakeManager.java   |   4 +-
 .../recovery/RecoveryServerHandshakeManager.java   |   4 +-
 .../PerSessionSerializationService.java            |  16 +-
 .../ignite/network/DefaultMessagingService.java    | 165 ++++++++++++-
 .../ignite/network/NettyBootstrapFactory.java      |  16 ++
 .../org/apache/ignite/network/NetworkObject.java   |  40 ++++
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   6 +-
 .../ScaleCubeDirectMarshallerTransport.java        |  26 ++-
 .../internal/network/netty/NettyClientTest.java    |   4 +-
 .../internal/network/netty/NettyServerTest.java    |   2 +-
 modules/raft/pom.xml                               |   1 -
 modules/transactions/pom.xml                       |   5 +
 33 files changed, 747 insertions(+), 136 deletions(-)

diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
index 650296e..7f23723 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
@@ -17,17 +17,20 @@
 
 package org.apache.ignite.internal.network.processor.messages;
 
+import com.squareup.javapoet.ArrayTypeName;
 import com.squareup.javapoet.ClassName;
 import com.squareup.javapoet.MethodSpec;
 import com.squareup.javapoet.TypeName;
 import com.squareup.javapoet.TypeSpec;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.ArrayList;
 import javax.annotation.processing.ProcessingEnvironment;
 import javax.lang.model.element.Modifier;
+import javax.lang.model.type.TypeMirror;
 import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.internal.network.processor.TypeUtils;
+import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Class for generating Builder interfaces for Network Messages.
@@ -39,6 +42,8 @@ public class MessageBuilderGenerator {
     /** Message group. */
     private final MessageGroupWrapper messageGroup;
 
+    private final TypeUtils typeUtils;
+
     /**
      * Constructor.
      *
@@ -48,6 +53,7 @@ public class MessageBuilderGenerator {
     public MessageBuilderGenerator(ProcessingEnvironment processingEnvironment, MessageGroupWrapper messageGroup) {
         this.processingEnvironment = processingEnvironment;
         this.messageGroup = messageGroup;
+        this.typeUtils = new TypeUtils(processingEnvironment);
     }
 
     /**
@@ -62,30 +68,45 @@ public class MessageBuilderGenerator {
         processingEnvironment.getMessager()
                 .printMessage(Diagnostic.Kind.NOTE, "Generating " + builderName, message.element());
 
-        // generate a setter for each getter in the original interface
-        List<MethodSpec> setters = message.getters().stream()
-                .map(getter -> {
-                    String getterName = getter.getSimpleName().toString();
-
-                    return MethodSpec.methodBuilder(getterName)
-                            .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
-                            .addParameter(TypeName.get(getter.getReturnType()), getterName)
-                            .returns(builderName)
-                            .build();
-                })
-                .collect(Collectors.toList());
-
-        // generate a getter for each getter in the original interface
-        List<MethodSpec> getters = message.getters().stream()
-                .map(getter -> {
-                    String getterName = getter.getSimpleName().toString();
-
-                    return MethodSpec.methodBuilder(getterName)
-                            .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
-                            .returns(TypeName.get(getter.getReturnType()))
-                            .build();
-                })
-                .collect(Collectors.toList());
+        var setters = new ArrayList<MethodSpec>(message.getters().size());
+        var getters = new ArrayList<MethodSpec>(message.getters().size());
+        message.getters().forEach(getter -> {
+            String getterName = getter.getSimpleName().toString();
+
+            TypeMirror type = getter.getReturnType();
+
+            // generate a setter for each getter in the original interface
+            MethodSpec setterSpec = MethodSpec.methodBuilder(getterName)
+                    .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                    .addParameter(TypeName.get(type), getterName)
+                    .returns(builderName)
+                    .build();
+
+            // generate a getter for each getter in the original interface
+            MethodSpec getterSpec = MethodSpec.methodBuilder(getterName)
+                    .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                    .returns(TypeName.get(type))
+                    .build();
+
+            setters.add(setterSpec);
+            getters.add(getterSpec);
+
+            if (getter.getAnnotation(Marshallable.class) != null) {
+                MethodSpec baSetter = MethodSpec.methodBuilder(getterName + "ByteArray")
+                        .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                        .addParameter(ArrayTypeName.of(TypeName.BYTE), getterName + "ByteArray")
+                        .returns(builderName)
+                        .build();
+
+                MethodSpec baGetter = MethodSpec.methodBuilder(getterName + "ByteArray")
+                        .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                        .returns(ArrayTypeName.of(TypeName.BYTE))
+                        .build();
+
+                setters.add(baSetter);
+                getters.add(baGetter);
+            }
+        });
 
         MethodSpec buildMethod = MethodSpec.methodBuilder("build")
                 .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index 9694785..835894b 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -17,26 +17,38 @@
 
 package org.apache.ignite.internal.network.processor.messages;
 
+import com.squareup.javapoet.ArrayTypeName;
 import com.squareup.javapoet.ClassName;
 import com.squareup.javapoet.CodeBlock;
 import com.squareup.javapoet.FieldSpec;
 import com.squareup.javapoet.MethodSpec;
+import com.squareup.javapoet.MethodSpec.Builder;
+import com.squareup.javapoet.ParameterizedTypeName;
 import com.squareup.javapoet.TypeName;
 import com.squareup.javapoet.TypeSpec;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.processing.ProcessingEnvironment;
 import javax.lang.model.element.ExecutableElement;
 import javax.lang.model.element.Modifier;
+import javax.lang.model.type.ArrayType;
+import javax.lang.model.type.DeclaredType;
 import javax.lang.model.type.TypeKind;
+import javax.lang.model.type.TypeMirror;
 import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.internal.network.processor.TypeUtils;
+import org.apache.ignite.internal.network.processor.serialization.BaseMethodNameResolver;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Class for generating implementations of the {@link NetworkMessage} interfaces and their builders, generated by a {@link
@@ -49,6 +61,10 @@ public class MessageImplGenerator {
     /** Message group. */
     private final MessageGroupWrapper messageGroup;
 
+    private final TypeUtils typeUtils;
+
+    private final BaseMethodNameResolver methodNameResolver;
+
     /**
      * Constructor.
      *
@@ -58,6 +74,8 @@ public class MessageImplGenerator {
     public MessageImplGenerator(ProcessingEnvironment processingEnv, MessageGroupWrapper messageGroup) {
         this.processingEnv = processingEnv;
         this.messageGroup = messageGroup;
+        this.typeUtils = new TypeUtils(processingEnv);
+        this.methodNameResolver = new BaseMethodNameResolver(processingEnv);
     }
 
     /**
@@ -76,6 +94,7 @@ public class MessageImplGenerator {
         List<ExecutableElement> getters = message.getters();
 
         var fields = new ArrayList<FieldSpec>(getters.size());
+        var allFields = new ArrayList<FieldSpec>(getters.size());
         var getterImpls = new ArrayList<MethodSpec>(getters.size());
 
         // create a field and a getter implementation for every getter in the message interface
@@ -85,24 +104,42 @@ public class MessageImplGenerator {
             String getterName = getter.getSimpleName().toString();
 
             FieldSpec field = FieldSpec.builder(getterReturnType, getterName)
-                    .addModifiers(Modifier.PRIVATE, Modifier.FINAL)
+                    .addModifiers(Modifier.PRIVATE)
                     .build();
 
             fields.add(field);
+            allFields.add(field);
 
             MethodSpec getterImpl = MethodSpec.overriding(getter)
                     .addStatement("return $N", field)
                     .build();
 
             getterImpls.add(getterImpl);
+
+            if (getter.getAnnotation(Marshallable.class) != null) {
+                ArrayTypeName arrayTypeName = ArrayTypeName.of(TypeName.BYTE);
+                String name = getterName + "ByteArray";
+                FieldSpec marshallableFieldArray = FieldSpec.builder(arrayTypeName, name)
+                        .addModifiers(Modifier.PRIVATE)
+                        .build();
+
+                allFields.add(marshallableFieldArray);
+
+                MethodSpec baGetterImpl = MethodSpec.methodBuilder(name)
+                        .returns(arrayTypeName)
+                        .addStatement("return $N", marshallableFieldArray)
+                        .build();
+
+                getterImpls.add(baGetterImpl);
+            }
         }
 
         TypeSpec.Builder messageImpl = TypeSpec.classBuilder(messageImplClassName)
                 .addModifiers(Modifier.PUBLIC)
                 .addSuperinterface(message.className())
-                .addFields(fields)
+                .addFields(allFields)
                 .addMethods(getterImpls)
-                .addMethod(constructor(fields));
+                .addMethod(constructor(allFields));
 
         // group type constant and getter
         FieldSpec groupTypeField = FieldSpec.builder(short.class, "GROUP_TYPE")
@@ -156,6 +193,9 @@ public class MessageImplGenerator {
 
         messageImpl.addMethod(builderMethod);
 
+        generateBeforeSend(messageImpl, message);
+        generateAfterReceive(messageImpl, message);
+
         messageImpl
                 .addOriginatingElement(message.element())
                 .addOriginatingElement(messageGroup.element());
@@ -163,6 +203,183 @@ public class MessageImplGenerator {
         return messageImpl.build();
     }
 
+    private void generateBeforeSend(TypeSpec.Builder messageImplBuild, MessageClass message) {
+        ParameterizedTypeName returnType = ParameterizedTypeName.get(ClassName.get(Set.class), TypeName.INT.box());
+        ParameterizedTypeName returnTypeImpl = ParameterizedTypeName.get(ClassName.get(HashSet.class), TypeName.INT.box());
+
+        Builder beforeRead = MethodSpec.methodBuilder("beforeRead")
+                .addAnnotation(Override.class)
+                .addModifiers(Modifier.PUBLIC)
+                .addException(Exception.class)
+                .addParameter(returnType, "set")
+                .addParameter(Object.class, "marshallerObj")
+                .returns(returnType);
+
+        ClassName marshallerClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "UserObjectMarshaller");
+        ClassName marshalledObjectClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "MarshalledObject");
+
+        beforeRead.addStatement("$T marshaller = ($T) marshallerObj", marshallerClass, marshallerClass);
+
+        message.getters().forEach(executableElement -> {
+            TypeMirror type = executableElement.getReturnType();
+            String objectName = executableElement.getSimpleName().toString();
+
+            if (executableElement.getAnnotation(Marshallable.class) != null) {
+                String baName = objectName + "ByteArray";
+                String moName = baName + "mo";
+                beforeRead.addStatement("$T $N = marshaller.marshal($N)", marshalledObjectClass, moName, objectName);
+                beforeRead.addStatement("set.addAll($N.usedDescriptorIds())", moName);
+                beforeRead.addStatement("$N = $N.bytes()", baName, moName).addCode("\n");
+            } else if (typeUtils.isSubType(type, NetworkMessage.class)) {
+                beforeRead.addStatement("if ($N != null) $N.beforeRead(set, marshallerObj)", objectName, objectName);
+            } else {
+                String methodType = methodNameResolver.resolveBaseMethodName(type);
+                switch (methodType) {
+                    case "ObjectArray":
+                        ArrayType arrayType = (ArrayType) type;
+                        if (typeUtils.isSubType(arrayType.getComponentType(), NetworkMessage.class)) {
+                            beforeRead.beginControlFlow("if ($N != null)", objectName);
+                            beforeRead.beginControlFlow("for (int i = 0; i < $N.length; i++)", objectName);
+                            beforeRead.addStatement("if ($N[i] != null) $N[i].beforeRead(set, marshallerObj)", objectName, objectName);
+                            beforeRead.endControlFlow();
+                            beforeRead.endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Collection":
+                        DeclaredType declaredType = (DeclaredType) type;
+                        TypeMirror elementType = declaredType.getTypeArguments().get(0);
+                        if (typeUtils.isSubType(elementType, NetworkMessage.class)) {
+                            beforeRead.beginControlFlow("if ($N != null)", objectName);
+                            beforeRead.beginControlFlow("for ($T obj : $N)", elementType, objectName);
+                            beforeRead.addStatement("if (obj != null) obj.beforeRead(set, marshallerObj)", objectName);
+                            beforeRead.endControlFlow();
+                            beforeRead.endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Map":
+                        DeclaredType mapType = (DeclaredType) type;
+                        TypeMirror keyType = mapType.getTypeArguments().get(0);
+                        boolean keyIsMessage = typeUtils.isSubType(keyType, NetworkMessage.class);
+                        TypeMirror valueType = mapType.getTypeArguments().get(1);
+                        boolean valueIsMessage = typeUtils.isSubType(valueType, NetworkMessage.class);
+
+                        if (keyIsMessage || valueIsMessage) {
+                            ParameterizedTypeName entryType = ParameterizedTypeName.get(ClassName.get(Map.Entry.class), TypeName.get(keyType), TypeName.get(valueType));
+                            ParameterizedTypeName entrySetType = ParameterizedTypeName.get(ClassName.get(Set.class), entryType);
+                            String entrySetName = objectName + "EntrySet";
+
+                            beforeRead.beginControlFlow("if ($N != null)", objectName);
+                            beforeRead.addStatement("$T $N = $N.entrySet()", entrySetType, entrySetName, objectName);
+                            beforeRead.beginControlFlow("for ($T entry : $N)", entryType, entrySetName);
+                            beforeRead.addStatement("$T key = entry.getKey()", keyType);
+                            beforeRead.addStatement("$T value = entry.getValue()", valueType);
+                            if (keyIsMessage) {
+                                beforeRead.addStatement("if (key != null) key.beforeRead(set, marshallerObj)");
+                            }
+                            if (valueIsMessage) {
+                                beforeRead.addStatement("if (value != null) value.beforeRead(set, marshallerObj)");
+                            }
+                            beforeRead.endControlFlow().addCode("\n");
+                            beforeRead.endControlFlow();
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+        });
+
+        beforeRead.addStatement("return set");
+
+        messageImplBuild.addMethod(beforeRead.build());
+    }
+
+    private void generateAfterReceive(TypeSpec.Builder messageImplBuild, MessageClass message) {
+        ParameterizedTypeName returnType = ParameterizedTypeName.get(ClassName.get(Set.class), TypeName.INT.box());
+        ParameterizedTypeName returnTypeImpl = ParameterizedTypeName.get(ClassName.get(HashSet.class), TypeName.INT.box());
+
+        Builder afterRead = MethodSpec.methodBuilder("afterRead")
+                .addAnnotation(Override.class)
+                .addModifiers(Modifier.PUBLIC)
+                .addException(Exception.class)
+                .addParameter(Object.class, "marshallerObj")
+                .addParameter(Object.class, "descriptorsObj");
+
+        ClassName marshallerClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "UserObjectMarshaller");
+        ClassName descriptorRegistryClass = ClassName.get("org.apache.ignite.internal.network.serialization", "DescriptorRegistry");
+
+        afterRead.addStatement("$T marshaller = ($T) marshallerObj", marshallerClass, marshallerClass);
+        afterRead.addStatement("$T descriptorRegistry = ($T) descriptorsObj", descriptorRegistryClass, descriptorRegistryClass);
+
+        message.getters().forEach(executableElement -> {
+            TypeMirror type = executableElement.getReturnType();
+            String objectName = executableElement.getSimpleName().toString();
+
+            if (executableElement.getAnnotation(Marshallable.class) != null) {
+                String baName = objectName + "ByteArray";
+                afterRead.addStatement("$N = marshaller.unmarshal($N, descriptorRegistry)", objectName, baName);
+            } else if (typeUtils.isSubType(type, NetworkMessage.class)) {
+                afterRead.addStatement("if ($N != null) $N.afterRead(marshallerObj, descriptorsObj)", objectName, objectName);
+            } else {
+                String methodType = methodNameResolver.resolveBaseMethodName(type);
+                switch (methodType) {
+                    case "ObjectArray":
+                        ArrayType arrayType = (ArrayType) type;
+                        if (typeUtils.isSubType(arrayType.getComponentType(), NetworkMessage.class)) {
+                            afterRead.beginControlFlow("if ($N != null)", objectName);
+                            afterRead.beginControlFlow("for (int i = 0; i < $N.length; i++)", objectName);
+                            afterRead.addStatement("if ($N[i] != null) $N[i].afterRead(marshallerObj, descriptorsObj)", objectName, objectName);
+                            afterRead.endControlFlow();
+                            afterRead.endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Collection":
+                        DeclaredType declaredType = (DeclaredType) type;
+                        TypeMirror elementType = declaredType.getTypeArguments().get(0);
+                        if (typeUtils.isSubType(elementType, NetworkMessage.class)) {
+                            afterRead.beginControlFlow("if ($N != null)", objectName);
+                            afterRead.beginControlFlow("for ($T obj : $N)", elementType, objectName);
+                            afterRead.addStatement("if (obj != null) obj.afterRead(marshallerObj, descriptorsObj)", objectName);
+                            afterRead.endControlFlow();
+                            afterRead.endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Map":
+                        DeclaredType mapType = (DeclaredType) type;
+                        TypeMirror keyType = mapType.getTypeArguments().get(0);
+                        boolean keyIsMessage = typeUtils.isSubType(keyType, NetworkMessage.class);
+                        TypeMirror valueType = mapType.getTypeArguments().get(1);
+                        boolean valueIsMessage = typeUtils.isSubType(valueType, NetworkMessage.class);
+
+                        if (keyIsMessage || valueIsMessage) {
+                            ParameterizedTypeName entryType = ParameterizedTypeName.get(ClassName.get(Map.Entry.class), TypeName.get(keyType), TypeName.get(valueType));
+                            ParameterizedTypeName entrySetType = ParameterizedTypeName.get(ClassName.get(Set.class), entryType);
+                            String entrySetName = objectName + "EntrySet";
+
+                            afterRead.beginControlFlow("if ($N != null)", objectName);
+                            afterRead.addStatement("$T $N = $N.entrySet()", entrySetType, entrySetName, objectName);
+                            afterRead.beginControlFlow("for ($T entry : $N)", entryType, entrySetName);
+                            afterRead.addStatement("$T key = entry.getKey()", keyType);
+                            afterRead.addStatement("$T value = entry.getValue()", valueType);
+                            if (keyIsMessage) {
+                                afterRead.addStatement("if (key != null) key.afterRead(marshallerObj, descriptorsObj)");
+                            }
+                            if (valueIsMessage) {
+                                afterRead.addStatement("if (value != null) value.afterRead(marshallerObj, descriptorsObj)");
+                            }
+                            afterRead.endControlFlow();
+                            afterRead.endControlFlow().addCode("\n");
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+        });
+
+        messageImplBuild.addMethod(afterRead.build());
+    }
+
     /**
      * Generates implementations of {@link #hashCode} and {@link #equals} for the provided {@code message} and adds them to the provided
      * builder.
@@ -322,6 +539,7 @@ public class MessageImplGenerator {
         List<ExecutableElement> messageGetters = message.getters();
 
         var fields = new ArrayList<FieldSpec>(messageGetters.size());
+        var allFields = new ArrayList<FieldSpec>(messageGetters.size());
         var setters = new ArrayList<MethodSpec>(messageGetters.size());
         var getters = new ArrayList<MethodSpec>(messageGetters.size());
 
@@ -335,6 +553,7 @@ public class MessageImplGenerator {
                     .build();
 
             fields.add(field);
+            allFields.add(field);
 
             MethodSpec setter = MethodSpec.methodBuilder(getterName)
                     .addAnnotation(Override.class)
@@ -355,15 +574,45 @@ public class MessageImplGenerator {
                     .build();
 
             getters.add(getter);
+
+            if (messageGetter.getAnnotation(Marshallable.class) != null) {
+                String name = getterName + "ByteArray";
+                ArrayTypeName type = ArrayTypeName.of(TypeName.BYTE);
+                FieldSpec baField = FieldSpec.builder(type, name)
+                        .addModifiers(Modifier.PRIVATE)
+                        .build();
+
+                allFields.add(baField);
+
+                MethodSpec baSetter = MethodSpec.methodBuilder(name)
+                        .addAnnotation(Override.class)
+                        .addModifiers(Modifier.PUBLIC)
+                        .returns(builderName)
+                        .addParameter(type, name)
+                        .addStatement("this.$N = $L", baField, name)
+                        .addStatement("return this")
+                        .build();
+
+                setters.add(baSetter);
+
+                MethodSpec baGetter = MethodSpec.methodBuilder(name)
+                        .addAnnotation(Override.class)
+                        .addModifiers(Modifier.PUBLIC)
+                        .returns(type)
+                        .addStatement("return $N", baField)
+                        .build();
+
+                getters.add(baGetter);
+            }
         }
 
         return TypeSpec.classBuilder("Builder")
                 .addModifiers(Modifier.PRIVATE, Modifier.STATIC)
                 .addSuperinterface(builderName)
-                .addFields(fields)
+                .addFields(allFields)
                 .addMethods(setters)
                 .addMethods(getters)
-                .addMethod(buildMethod(message, messageImplClass, fields))
+                .addMethod(buildMethod(message, messageImplClass, allFields))
                 .build();
     }
 
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
index 00783d0..3a87c4a 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
@@ -38,7 +38,7 @@ import org.apache.ignite.network.NetworkMessage;
  * @see MessageReaderMethodResolver
  * @see MessageWriterMethodResolver
  */
-class BaseMethodNameResolver {
+public class BaseMethodNameResolver {
     /** Processing environment. */
     private final ProcessingEnvironment processingEnvironment;
 
@@ -47,7 +47,7 @@ class BaseMethodNameResolver {
      *
      * @param processingEnvironment Processing environment.
      */
-    BaseMethodNameResolver(ProcessingEnvironment processingEnvironment) {
+    public BaseMethodNameResolver(ProcessingEnvironment processingEnvironment) {
         this.processingEnvironment = processingEnvironment;
     }
 
@@ -57,7 +57,7 @@ class BaseMethodNameResolver {
      * @param parameterType parameter of the method to resolve
      * @return part of the method name, depending on the parameter type
      */
-    String resolveBaseMethodName(TypeMirror parameterType) {
+    public String resolveBaseMethodName(TypeMirror parameterType) {
         if (parameterType.getKind().isPrimitive()) {
             return resolvePrimitiveMethodName(parameterType);
         } else if (parameterType.getKind() == TypeKind.ARRAY) {
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
index 8ea3d12..93ae44b 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
@@ -30,6 +30,7 @@ import javax.lang.model.element.Modifier;
 import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.serialization.MessageDeserializer;
 import org.apache.ignite.network.serialization.MessageMappingException;
 import org.apache.ignite.network.serialization.MessageReader;
@@ -153,8 +154,14 @@ public class MessageDeserializerGenerator {
     private CodeBlock readMessageCodeBlock(ExecutableElement getter, FieldSpec msgField) {
         var methodResolver = new MessageReaderMethodResolver(processingEnv);
 
+        String name = getter.getSimpleName().toString();
+
+        if (getter.getAnnotation(Marshallable.class) != null) {
+            name += "ByteArray";
+        }
+
         return CodeBlock.builder()
-                .add("$N.$N(reader.", msgField, getter.getSimpleName())
+                .add("$N.$N(reader.", msgField, name)
                 .add(methodResolver.resolveReadMethod(getter))
                 .add(")")
                 .build();
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
index 13e9ae2..925c3f0 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
@@ -60,8 +60,9 @@ class MessageReaderMethodResolver {
         String parameterName = getter.getSimpleName().toString();
 
         if (getter.getAnnotation(Marshallable.class) != null) {
+            parameterName += "ByteArray";
             return CodeBlock.builder()
-                    .add("readMarshallable($S)", parameterName)
+                    .add("readByteArray($S)", parameterName)
                     .build();
         }
 
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
index 1926e8e..7d579e4 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
@@ -81,10 +81,12 @@ public class MessageSerializerGenerator {
                 .addAnnotation(Override.class)
                 .addModifiers(Modifier.PUBLIC)
                 .returns(boolean.class)
-                .addParameter(message.className(), "message")
+                .addParameter(message.className(), "msg")
                 .addParameter(MessageWriter.class, "writer")
                 .addException(MessageMappingException.class);
 
+        method.addStatement("$T message = ($T) msg", message.implClassName(), message.implClassName()).addCode("\n");
+
         List<ExecutableElement> getters = message.getters();
 
         method
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
index 38aeb5e..29d973c 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
@@ -35,7 +35,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
 /**
  * Class for resolving {@link MessageWriter} "write*" methods for the corresponding message field type.
  */
-class MessageWriterMethodResolver {
+public class MessageWriterMethodResolver {
     /** Method name resolver. */
     private final BaseMethodNameResolver methodNameResolver;
 
@@ -47,7 +47,7 @@ class MessageWriterMethodResolver {
      *
      * @param processingEnvironment Processing environment.
      */
-    MessageWriterMethodResolver(ProcessingEnvironment processingEnvironment) {
+    public MessageWriterMethodResolver(ProcessingEnvironment processingEnvironment) {
         methodNameResolver = new BaseMethodNameResolver(processingEnvironment);
         typeConverter = new MessageCollectionItemTypeConverter(processingEnvironment);
     }
@@ -72,8 +72,9 @@ class MessageWriterMethodResolver {
         String parameterName = getter.getSimpleName().toString();
 
         if (getter.getAnnotation(Marshallable.class) != null) {
+            parameterName += "ByteArray";
             return CodeBlock.builder()
-                    .add("writeMarshallable($S, message.$L())", parameterName, parameterName)
+                    .add("writeByteArray($S, message.$L())", parameterName, parameterName)
                     .build();
         }
 
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
index d0becd8..08c2534 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.network;
 
+import java.util.Collections;
+import java.util.Set;
+
 /**
  * Message for exchanging information in a cluster.
  */
@@ -41,4 +44,12 @@ public interface NetworkMessage {
      * @return group type.
      */
     short groupType();
+
+    default Set<Integer> beforeRead(Set<Integer> ids, Object marshaller) throws Exception {
+        return Collections.emptySet();
+    }
+
+    default void afterRead(Object marshaller, Object descriptors) throws Exception {
+        // No-op.
+    }
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 447089b..9d92908 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -34,6 +34,7 @@ import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -53,6 +54,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 import org.apache.ignite.network.TestMessage;
 import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
 import org.apache.ignite.network.TestMessagesFactory;
@@ -108,13 +110,13 @@ public class ItConnectionManagerTest {
 
         var fut = new CompletableFuture<NetworkMessage>();
 
-        manager2.addListener((consistentId, message) -> fut.complete(message));
+        manager2.addListener((obj) -> fut.complete(obj.getMessage()));
 
         NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
         TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
 
-        sender.send(testMessage).get(3, TimeUnit.SECONDS);
+        sender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
@@ -140,7 +142,7 @@ public class ItConnectionManagerTest {
 
         var fut = new CompletableFuture<NetworkMessage>();
 
-        manager1.addListener((consistentId, message) -> fut.complete(message));
+        manager1.addListener((obj) -> fut.complete(obj.getMessage()));
 
         NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
@@ -150,9 +152,9 @@ public class ItConnectionManagerTest {
         var messageReceivedOn2 = new CompletableFuture<Void>();
 
         // If the message is received, that means that the handshake was successfully performed.
-        manager2.addListener((consistentId, message) -> messageReceivedOn2.complete(null));
+        manager2.addListener((message) -> messageReceivedOn2.complete(null));
 
-        senderFrom1to2.send(testMessage);
+        senderFrom1to2.send(new NetworkObject(testMessage, Collections.emptyList()));
 
         messageReceivedOn2.get(3, TimeUnit.SECONDS);
 
@@ -164,7 +166,7 @@ public class ItConnectionManagerTest {
 
         assertEquals(clientLocalAddress, clientRemoteAddress);
 
-        senderFrom2to1.send(testMessage).get(3, TimeUnit.SECONDS);
+        senderFrom2to1.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
@@ -231,7 +233,7 @@ public class ItConnectionManagerTest {
 
         assertThrows(ClosedChannelException.class, () -> {
             try {
-                finalSender.send(testMessage).get(3, TimeUnit.SECONDS);
+                finalSender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
             } catch (Exception e) {
                 throw e.getCause();
             }
@@ -241,11 +243,11 @@ public class ItConnectionManagerTest {
 
         var fut = new CompletableFuture<NetworkMessage>();
 
-        manager2.get1().addListener((consistentId, message) -> fut.complete(message));
+        manager2.get1().addListener((obj) -> fut.complete(obj.getMessage()));
 
         sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
-        sender.send(testMessage).get(3, TimeUnit.SECONDS);
+        sender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
index 6ad6231..53b2463 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.mock;
 
 import io.netty.channel.Channel;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -52,6 +53,8 @@ import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
+import org.apache.ignite.network.TestMessage;
 import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
 import org.apache.ignite.network.TestMessagesFactory;
 import org.junit.jupiter.api.AfterEach;
@@ -130,7 +133,8 @@ public class ItRecoveryHandshakeTest {
         assertNotNull(from2to1);
 
         // Ensure the handshake has finished on both sides.
-        from2to1.send(messageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
+        TestMessage msg = messageFactory.testMessage().msg("test").build();
+        from2to1.send(new NetworkObject(msg, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
 
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 87c215f..fd52931 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -294,12 +294,16 @@ class ItScaleCubeNetworkMessagingTest {
         // register a different handle for the second group
         node1.messagingService().addMessageHandler(
                 NetworkMessageTypes.class,
-                (message, senderAddr, correlationId) -> assertTrue(networkMessageFuture.complete(message))
+                (message, senderAddr, correlationId) -> {
+                    if (message instanceof FieldDescriptorMessage) {
+                        assertTrue(networkMessageFuture.complete(message));
+                    }
+                }
         );
 
         var testMessage = messageFactory.testMessage().msg("foo").build();
 
-        ClassDescriptorMessage networkMessage = new NetworkMessagesFactory().classDescriptorMessage().build();
+        FieldDescriptorMessage networkMessage = new NetworkMessagesFactory().fieldDescriptorMessage().build();
 
         // test that a message gets delivered to both handlers
         node2.messagingService()
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
index bae7c7d..ee0df8b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.network;
 
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
@@ -65,4 +66,9 @@ public class NetworkMessageTypes {
      * Type for {@link FieldDescriptorMessage}.
      */
     public static final short FIELD_DESCRIPTOR_MESSAGE = 6;
+
+    /**
+     * Type for {@link ClassDescriptorListMessage}.
+     */
+    public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 7;
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java
new file mode 100644
index 0000000..bfc7b97
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.message;
+
+import java.util.Collection;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+@Transferable(NetworkMessageTypes.CLASS_DESCRIPTOR_LIST_MESSAGE)
+public interface ClassDescriptorListMessage extends NetworkMessage {
+
+    Collection<ClassDescriptorMessage> messages();
+
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index d8f8967..a95d285 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -71,7 +70,7 @@ public class ConnectionManager {
     private final SerializationService serializationService;
 
     /** Message listeners. */
-    private final List<BiConsumer<String, NetworkMessage>> listeners = new CopyOnWriteArrayList<>();
+    private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();
 
     /** Node consistent id. */
     private final String consistentId;
@@ -197,11 +196,10 @@ public class ConnectionManager {
     /**
      * Callback that is called upon receiving a new message.
      *
-     * @param consistentId Consistent id of the message's sender.
      * @param message New message.
      */
-    private void onMessage(String consistentId, NetworkMessage message) {
-        listeners.forEach(consumer -> consumer.accept(consistentId, message));
+    private void onMessage(InNetworkObject message) {
+        listeners.forEach(consumer -> consumer.accept(message));
     }
 
     /**
@@ -243,7 +241,7 @@ public class ConnectionManager {
      *
      * @param listener Message listener.
      */
-    public void addListener(BiConsumer<String, NetworkMessage> listener) {
+    public void addListener(Consumer<InNetworkObject> listener) {
         listeners.add(listener);
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
similarity index 51%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
index cc72427..85fb883 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
@@ -17,37 +17,32 @@
 
 package org.apache.ignite.internal.network.netty;
 
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.function.BiConsumer;
+import org.apache.ignite.internal.network.serialization.CompositeDescriptorRegistry;
 import org.apache.ignite.network.NetworkMessage;
 
-/**
- * Network message handler that delegates handling to {@link #messageListener}.
- */
-public class MessageHandler extends ChannelInboundHandlerAdapter {
-    /** Message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+public class InNetworkObject {
+
+    private final NetworkMessage message;
 
-    /** Consistent id of the remote node. */
     private final String consistentId;
 
-    /**
-     * Constructor.
-     *
-     * @param messageListener Message listener.
-     * @param consistentId Consistent id of the remote node.
-     */
-    public MessageHandler(BiConsumer<String, NetworkMessage> messageListener, String consistentId) {
-        this.messageListener = messageListener;
+    private final CompositeDescriptorRegistry registry;
+    
+    public InNetworkObject(NetworkMessage message, String consistentId, CompositeDescriptorRegistry registry) {
+        this.message = message;
         this.consistentId = consistentId;
+        this.registry = registry;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        NetworkMessage message = (NetworkMessage) msg;
+    public NetworkMessage getMessage() {
+        return message;
+    }
+
+    public String getConsistentId() {
+        return consistentId;
+    }
 
-        messageListener.accept(consistentId, message);
+    public CompositeDescriptorRegistry getRegistry() {
+        return registry;
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
index 49c2ed0..cd1bc2b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.ignite.internal.network.direct.DirectMarshallingUtils;
 import org.apache.ignite.internal.network.direct.DirectMessageReader;
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NetworkMessage;
@@ -107,7 +108,13 @@ public class InboundDecoder extends ByteToMessageDecoder {
                     reader.reset();
                     messageAttr.set(null);
 
-                    out.add(msg.getMessage());
+                    NetworkMessage message = msg.getMessage();
+
+                    if (message instanceof ClassDescriptorListMessage) {
+                        onClassDescriptorMessage((ClassDescriptorListMessage) message);
+                    } else {
+                        out.add(message);
+                    }
                 } else {
                     messageAttr.set(msg);
                 }
@@ -133,4 +140,8 @@ public class InboundDecoder extends ByteToMessageDecoder {
             }
         }
     }
+
+    private void onClassDescriptorMessage(ClassDescriptorListMessage msg) {
+        serializationService.mergeDescriptors(msg.messages());
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
index cc72427..c587071 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
@@ -19,7 +19,8 @@ package org.apache.ignite.internal.network.netty;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.network.NetworkMessage;
 
 /**
@@ -27,20 +28,23 @@ import org.apache.ignite.network.NetworkMessage;
  */
 public class MessageHandler extends ChannelInboundHandlerAdapter {
     /** Message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+    private final Consumer<InNetworkObject> messageListener;
 
     /** Consistent id of the remote node. */
     private final String consistentId;
 
+    private final PerSessionSerializationService serializationService;
+
     /**
      * Constructor.
-     *
-     * @param messageListener Message listener.
+     *  @param messageListener Message listener.
      * @param consistentId Consistent id of the remote node.
+     * @param serializationService
      */
-    public MessageHandler(BiConsumer<String, NetworkMessage> messageListener, String consistentId) {
+    public MessageHandler(Consumer<InNetworkObject> messageListener, String consistentId, PerSessionSerializationService serializationService) {
         this.messageListener = messageListener;
         this.consistentId = consistentId;
+        this.serializationService = serializationService;
     }
 
     /** {@inheritDoc} */
@@ -48,6 +52,6 @@ public class MessageHandler extends ChannelInboundHandlerAdapter {
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
         NetworkMessage message = (NetworkMessage) msg;
 
-        messageListener.accept(consistentId, message);
+        messageListener.accept(new InNetworkObject(message, consistentId, serializationService.compositeDescriptorRegistry()));
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
index 1b1e4c7..7983e40 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
@@ -25,13 +25,12 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -59,7 +58,7 @@ public class NettyClient {
     private volatile Channel channel = null;
 
     /** Message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+    private final Consumer<InNetworkObject> messageListener;
 
     /** Handshake manager. */
     private final HandshakeManager handshakeManager;
@@ -79,7 +78,7 @@ public class NettyClient {
             SocketAddress address,
             SerializationService serializationService,
             HandshakeManager manager,
-            BiConsumer<String, NetworkMessage> messageListener
+            Consumer<InNetworkObject> messageListener
     ) {
         this.address = address;
         this.serializationService = serializationService;
@@ -113,7 +112,9 @@ public class NettyClient {
 
                     ch.pipeline().addLast(
                             new InboundDecoder(sessionSerializationService),
-                            new HandshakeHandler(handshakeManager, (consistentId) -> new MessageHandler(messageListener, consistentId)),
+                            new HandshakeHandler(handshakeManager,
+                                    (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
+                            ),
                             new ChunkedWriteHandler(),
                             new OutboundEncoder(sessionSerializationService),
                             new IoExceptionSuppressingHandler()
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index 6185994..46a598f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -21,7 +21,7 @@ import io.netty.channel.Channel;
 import io.netty.handler.stream.ChunkedInput;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
-import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -56,8 +56,8 @@ public class NettySender {
      * @param msg Network message.
      * @return Future of the send operation.
      */
-    public CompletableFuture<Void> send(NetworkMessage msg) {
-        return NettyUtils.toCompletableFuture(channel.writeAndFlush(msg));
+    public CompletableFuture<Void> send(NetworkObject obj) {
+        return NettyUtils.toCompletableFuture(channel.writeAndFlush(obj));
     }
 
     /**
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 89e7f8c..7050d8e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -27,7 +27,6 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -37,7 +36,6 @@ import org.apache.ignite.internal.network.serialization.PerSessionSerializationS
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -58,7 +56,7 @@ public class NettyServer {
     private final SerializationService serializationService;
 
     /** Incoming message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+    private final Consumer<InNetworkObject> messageListener;
 
     /** Handshake manager. */
     private final Supplier<HandshakeManager> handshakeManager;
@@ -94,7 +92,7 @@ public class NettyServer {
             NetworkView configuration,
             Supplier<HandshakeManager> handshakeManager,
             Consumer<NettySender> newConnectionListener,
-            BiConsumer<String, NetworkMessage> messageListener,
+            Consumer<InNetworkObject> messageListener,
             SerializationService serializationService,
             NettyBootstrapFactory bootstrapFactory
     ) {
@@ -139,7 +137,9 @@ public class NettyServer {
                                      */
                                     new InboundDecoder(sessionSerializationService),
                                     // Handshake handler.
-                                    new HandshakeHandler(manager, (consistentId) -> new MessageHandler(messageListener, consistentId)),
+                                    new HandshakeHandler(manager,
+                                            (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
+                                    ),
                                     /*
                                      * Encoder that uses the MessageWriter
                                      * to write chunked data.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index c629084..fa6f04b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -24,15 +24,20 @@ import io.netty.handler.codec.MessageToMessageEncoder;
 import io.netty.handler.stream.ChunkedInput;
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 import org.apache.ignite.network.serialization.MessageSerializer;
 
 /**
  * An encoder for the outbound messages that uses {@link DirectMessageWriter}.
  */
-public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
+public class OutboundEncoder extends MessageToMessageEncoder<NetworkObject> {
+    private static final NetworkMessagesFactory MSG_FACTORY = new NetworkMessagesFactory();
+
     /** Serialization registry. */
     private final PerSessionSerializationService serializationService;
 
@@ -47,7 +52,7 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
 
     /** {@inheritDoc} */
     @Override
-    protected void encode(ChannelHandlerContext ctx, NetworkMessage msg, List<Object> out) throws Exception {
+    protected void encode(ChannelHandlerContext ctx, NetworkObject msg, List<Object> out) throws Exception {
         out.add(new NetworkMessageChunkedInput(msg, serializationService));
     }
 
@@ -61,11 +66,17 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
         /** Message serializer. */
         private final MessageSerializer<NetworkMessage> serializer;
 
+        private final MessageSerializer<ClassDescriptorListMessage> descriptorSerializer;
+
         /** Message writer. */
         private final DirectMessageWriter writer;
 
+        private final ClassDescriptorListMessage descriptors;
+        private final PerSessionSerializationService serializationService;
+
         /** Whether the message was fully written. */
         private boolean finished = false;
+        private boolean descriptorsFinished = false;
 
         /**
          * Constructor.
@@ -74,10 +85,23 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
          * @param serializationService Serialization service.
          */
         private NetworkMessageChunkedInput(
-                NetworkMessage msg,
+                NetworkObject object,
                 PerSessionSerializationService serializationService
         ) {
-            this.msg = msg;
+            this.serializationService = serializationService;
+            this.msg = object.getNetworkMessage();
+
+            if (!object.getDescriptors().isEmpty()) {
+                descriptors = MSG_FACTORY.classDescriptorListMessage().messages(object.getDescriptors()).build();
+                short groupType = descriptors.groupType();
+                short messageType = descriptors.messageType();
+                descriptorSerializer = serializationService.createMessageSerializer(groupType, messageType);
+            } else {
+                descriptors = null;
+                descriptorSerializer = null;
+                descriptorsFinished = true;
+            }
+
             this.serializer = serializationService.createMessageSerializer(msg.groupType(), msg.messageType());
             this.writer = new DirectMessageWriter(serializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
         }
@@ -113,7 +137,14 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
 
             writer.setBuffer(byteBuffer);
 
-            finished = serializer.writeMessage(msg, writer);
+            if (!descriptorsFinished) {
+                descriptorsFinished = descriptorSerializer.writeMessage(descriptors, writer);
+                if (descriptorsFinished) {
+                    writer.reset();
+                }
+            } else {
+                finished = serializer.writeMessage(msg, writer);
+            }
 
             buffer.writerIndex(byteBuffer.position() - initialPosition);
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 5f37db8..8c29293 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.recovery;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 
 /**
  * Recovery protocol handshake manager for a client.
@@ -78,7 +80,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
                     .connectionsCount(0)
                     .build();
 
-            ChannelFuture sendFuture = channel.writeAndFlush(response);
+            ChannelFuture sendFuture = channel.writeAndFlush(new NetworkObject(response, Collections.emptyList()));
 
             NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
                 if (throwable != null) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 4954981..bb0b5f3 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.recovery;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 
 /**
  * Recovery protocol handshake manager for a server.
@@ -76,7 +78,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
                 .consistentId(consistentId)
                 .build();
 
-        ChannelFuture sendFuture = channel.writeAndFlush(handshakeStartMessage);
+        ChannelFuture sendFuture = channel.writeAndFlush(new NetworkObject(handshakeStartMessage, Collections.emptyList()));
 
         NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
             if (throwable != null) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
index 288b51e..4ce1381 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -184,6 +185,14 @@ public class PerSessionSerializationService {
         return messages;
     }
 
+    public boolean isSent(int descriptorId) {
+        return sentDescriptors.contains(descriptorId);
+    }
+
+    public void addSent(int descriptorId) {
+        sentDescriptors.add(descriptorId);
+    }
+
     private byte fieldFlags(FieldDescriptor fieldDescriptor) {
         int bits = condMask(fieldDescriptor.isUnshared(), FieldDescriptorMessage.UNSHARED_MASK)
                 | condMask(fieldDescriptor.isPrimitive(), FieldDescriptorMessage.IS_PRIMITIVE)
@@ -232,7 +241,7 @@ public class PerSessionSerializationService {
         return id;
     }
 
-    private void mergeDescriptors(List<ClassDescriptorMessage> remoteDescriptors) {
+    public void mergeDescriptors(Collection<ClassDescriptorMessage> remoteDescriptors) {
         List<ClassDescriptorMessage> leftToProcess = remoteDescriptors.stream()
                 .filter(classMessage -> !knownMergedDescriptor(classMessage.descriptorId()))
                 .collect(toCollection(LinkedList::new));
@@ -378,6 +387,11 @@ public class PerSessionSerializationService {
         }
     }
 
+    public CompositeDescriptorRegistry compositeDescriptorRegistry() {
+        return descriptors;
+    }
+
+
     @TestOnly
     Map<Integer, ClassDescriptor> getDescriptorMapView() {
         return mergedIdToDescriptorMap;
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 9d8965d..2ee7961 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -18,22 +18,40 @@
 package org.apache.ignite.network;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.network.NettyBootstrapFactory.isInNetworkThread;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
 import org.apache.ignite.internal.network.message.InvokeResponse;
-import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.InNetworkObject;
 import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.internal.network.serialization.ClassDescriptor;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.CompositeDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.FieldDescriptor;
+import org.apache.ignite.internal.network.serialization.Serialization;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.Nullable;
 
@@ -42,9 +60,15 @@ public class DefaultMessagingService extends AbstractMessagingService {
     /** Network messages factory. */
     private final NetworkMessagesFactory factory;
 
+    /** Integer value that is sent when there is no descriptor. */
+    private static final int NO_DESCRIPTOR_ID = Integer.MIN_VALUE;
+
     /** Topology service. */
     private final TopologyService topologyService;
 
+    private final UserObjectMarshaller marshaller;
+    private final ClassDescriptorRegistry classDescriptorRegistry;
+
     /** Connection manager that provides access to {@link NettySender}. */
     private volatile ConnectionManager connectionManager;
 
@@ -75,6 +99,9 @@ public class DefaultMessagingService extends AbstractMessagingService {
     public DefaultMessagingService(NetworkMessagesFactory factory, TopologyService topologyService) {
         this.factory = factory;
         this.topologyService = topologyService;
+        UserObjectSerializationContext userObjectSerializationContext = createUserObjectSerializationContext();
+        this.marshaller = userObjectSerializationContext.marshaller();
+        this.classDescriptorRegistry = userObjectSerializationContext.descriptorRegistry();
     }
 
     /**
@@ -158,7 +185,24 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         String recipientConsistentId = recipient != null ? recipient.name() : address.consistentId();
 
-        return connectionManager.channel(recipientConsistentId, addr).thenCompose(sender -> sender.send(message));
+        return this.sendMessage0(message, recipientConsistentId, addr);
+    }
+
+    private CompletableFuture<Void> sendMessage0(NetworkMessage message, String recipientConsistentId, InetSocketAddress addr) {
+        if (isInNetworkThread()) {
+            return CompletableFuture.supplyAsync(() -> sendMessage0(message, recipientConsistentId, addr), svc).thenCompose(fut -> fut);
+        }
+
+        List<ClassDescriptorMessage> descriptors;
+
+        try {
+            descriptors = beforeRead(message);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+
+        return connectionManager.channel(recipientConsistentId, addr).thenCompose(
+                sender -> sender.send(new NetworkObject(message, descriptors)));
     }
 
     /**
@@ -194,8 +238,95 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         String recipientConsistentId = recipient != null ? recipient.name() : addr.consistentId();
 
-        return connectionManager.channel(recipientConsistentId, address).thenCompose(sender -> sender.send(message))
-                .thenCompose(unused -> responseFuture);
+        return sendMessage0(message, recipientConsistentId, address).thenCompose(unused -> responseFuture);
+    }
+
+    private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws Exception {
+        Set<Integer> ids = msg.beforeRead(new HashSet<>(), marshaller);
+
+        return createClassDescriptorsMessages(ids);
+    }
+
+    public List<ClassDescriptorMessage> createClassDescriptorsMessages(Set<Integer> descriptorIds) {
+        List<ClassDescriptorMessage> messages = descriptorIds.stream()
+                .map(classDescriptorRegistry::getDescriptor)
+                .map(descriptor -> {
+                    List<FieldDescriptorMessage> fields = descriptor.fields().stream()
+                            .map(d -> {
+                                return factory.fieldDescriptorMessage()
+                                    .name(d.name())
+                                    .typeDescriptorId(d.typeDescriptorId())
+                                    .className(d.typeName())
+                                    .flags(fieldFlags(d))
+                                    .build();
+                            })
+                            .collect(toList());
+
+                    Serialization serialization = descriptor.serialization();
+
+                    return factory.classDescriptorMessage()
+                        .fields(fields)
+                        .serializationType((byte) serialization.type().value())
+                        .serializationFlags(serializationAttributeFlags(serialization))
+                        .descriptorId(descriptor.descriptorId())
+                        .className(descriptor.className())
+                        .superClassDescriptorId(superClassDescriptorIdForMessage(descriptor))
+                        .superClassName(descriptor.superClassName())
+                        .componentTypeDescriptorId(componentTypeDescriptorIdForMessage(descriptor))
+                        .componentTypeName(descriptor.componentTypeName())
+                        .attributes(classDescriptorAttributeFlags(descriptor))
+                        .build();
+                }).collect(toList());
+
+        return messages;
+    }
+
+    private byte fieldFlags(FieldDescriptor fieldDescriptor) {
+        int bits = condMask(fieldDescriptor.isUnshared(), FieldDescriptorMessage.UNSHARED_MASK)
+                | condMask(fieldDescriptor.isPrimitive(), FieldDescriptorMessage.IS_PRIMITIVE)
+                | condMask(fieldDescriptor.isRuntimeTypeKnownUpfront(), FieldDescriptorMessage.IS_RUNTIME_TYPE_KNOWN_UPFRONT);
+        return (byte) bits;
+    }
+
+    private byte serializationAttributeFlags(Serialization serialization) {
+        int bits = condMask(serialization.hasWriteObject(), ClassDescriptorMessage.HAS_WRITE_OBJECT_MASK)
+                | condMask(serialization.hasReadObject(), ClassDescriptorMessage.HAS_READ_OBJECT_MASK)
+                | condMask(serialization.hasReadObjectNoData(), ClassDescriptorMessage.HAS_READ_OBJECT_NO_DATA_MASK)
+                | condMask(serialization.hasWriteReplace(), ClassDescriptorMessage.HAS_WRITE_REPLACE_MASK)
+                | condMask(serialization.hasReadResolve(), ClassDescriptorMessage.HAS_READ_RESOLVE_MASK);
+        return (byte) bits;
+    }
+
+    private int condMask(boolean value, int mask) {
+        return value ? mask : 0;
+    }
+
+    private byte classDescriptorAttributeFlags(ClassDescriptor descriptor) {
+        int bits = condMask(descriptor.isPrimitive(), ClassDescriptorMessage.IS_PRIMITIVE_MASK)
+                | condMask(descriptor.isArray(), ClassDescriptorMessage.IS_ARRAY_MASK)
+                | condMask(descriptor.isRuntimeEnum(), ClassDescriptorMessage.IS_RUNTIME_ENUM_MASK)
+                | condMask(descriptor.isRuntimeTypeKnownUpfront(), ClassDescriptorMessage.IS_RUNTIME_TYPE_KNOWN_UPFRONT_MASK);
+        return (byte) bits;
+    }
+
+    private int superClassDescriptorIdForMessage(ClassDescriptor descriptor) {
+        Integer id = descriptor.superClassDescriptorId();
+
+        if (id == null) {
+            return NO_DESCRIPTOR_ID;
+        }
+
+        return id;
+    }
+
+    private int componentTypeDescriptorIdForMessage(ClassDescriptor descriptor) {
+        Integer id = descriptor.componentTypeDescriptorId();
+
+        if (id == null) {
+            return NO_DESCRIPTOR_ID;
+        }
+
+        return id;
     }
 
     /**
@@ -212,18 +343,28 @@ public class DefaultMessagingService extends AbstractMessagingService {
         }
     }
 
+    private final ExecutorService svc = Executors.newSingleThreadExecutor();
+
     /**
      * Handles an incoming messages.
      *
      * @param consistentId Sender's consistent id.
      * @param msg Incoming message.
      */
-    private void onMessage(String consistentId, NetworkMessage msg) {
-        if (msg instanceof ScaleCubeMessage) {
-            // ScaleCube messages are handled in the ScaleCubeTransport
+    private void onMessage(InNetworkObject obj) {
+        if (isInNetworkThread()) {
+            svc.submit(() -> onMessage(obj));
             return;
         }
 
+        NetworkMessage msg = obj.getMessage();
+        CompositeDescriptorRegistry registry = obj.getRegistry();
+        String consistentId = obj.getConsistentId();
+        try {
+            msg.afterRead(marshaller, registry);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
         if (msg instanceof InvokeResponse) {
             InvokeResponse response = (InvokeResponse) msg;
             onInvokeResponse(response.message(), response.correlationId());
@@ -345,4 +486,14 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         requestsMap.clear();
     }
+
+    private UserObjectSerializationContext createUserObjectSerializationContext() {
+        var userObjectDescriptorRegistry = new ClassDescriptorRegistry();
+        var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
+
+        var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
+
+        return new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
+            userObjectMarshaller);
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 0d11ccf..a4b7fa8 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -139,6 +139,22 @@ public class NettyBootstrapFactory implements IgniteComponent {
         clientWorkerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-client");
     }
 
+    public static boolean isInNetworkThread() {
+        String name = Thread.currentThread().getName();
+
+        if (name.contains("-srv-worker")) {
+            return true;
+        }
+        if (name.contains("-client")) {
+            return true;
+        }
+        if (name.contains("-srv-accept")) {
+            return true;
+        }
+
+        return false;
+    }
+
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java
new file mode 100644
index 0000000..9b0d7fb
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.List;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+
+public class NetworkObject {
+
+    private final NetworkMessage networkMessage;
+    private final List<ClassDescriptorMessage> descriptors;
+
+    public NetworkObject(NetworkMessage networkMessage, List<ClassDescriptorMessage> descriptors) {
+        this.networkMessage = networkMessage;
+        this.descriptors = descriptors;
+    }
+
+    public NetworkMessage getNetworkMessage() {
+        return networkMessage;
+    }
+
+    public List<ClassDescriptorMessage> getDescriptors() {
+        return descriptors;
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index e1faa56..f8f77be 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -104,7 +104,9 @@ public class ScaleCubeClusterServiceFactory {
                         nettyBootstrapFactory
                 );
 
-                var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr, topologyService, messageFactory);
+                connectionMgr.start();
+
+                var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr.getLocalAddress(), messagingService, topologyService, messageFactory);
 
                 NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
 
@@ -122,8 +124,6 @@ public class ScaleCubeClusterServiceFactory {
 
                 shutdownFuture = cluster.onShutdown().toFuture();
 
-                connectionMgr.start();
-
                 // resolve cyclic dependencies
                 topologyService.setCluster(cluster);
                 messagingService.setConnectionManager(connectionMgr);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index ad763de..e263467 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
@@ -32,6 +33,7 @@ import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
@@ -63,7 +65,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     private final MonoProcessor<Void> onStop = MonoProcessor.create();
 
     /** Connection manager. */
-    private final ConnectionManager connectionManager;
+    private final MessagingService messagingService;
 
     /** Message factory. */
     private final NetworkMessagesFactory messageFactory;
@@ -82,15 +84,17 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
      * @param messageFactory    message factory
      */
     ScaleCubeDirectMarshallerTransport(
-            ConnectionManager connectionManager,
+            SocketAddress localAddress,
+            MessagingService messagingService,
             ScaleCubeTopologyService topologyService,
             NetworkMessagesFactory messageFactory
     ) {
-        this.connectionManager = connectionManager;
+        this.address = prepareAddress(localAddress);
+        this.messagingService = messagingService;
         this.topologyService = topologyService;
         this.messageFactory = messageFactory;
 
-        this.connectionManager.addListener(this::onMessage);
+        this.messagingService.addMessageHandler(NetworkMessageTypes.class, (message, senderAddr, correlationId) -> onMessage(message));
         // Setup cleanup
         stop.then(doStop())
                 .doFinally(s -> onStop.onComplete())
@@ -145,8 +149,6 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     /** {@inheritDoc} */
     @Override
     public Mono<Transport> start() {
-        address = prepareAddress(connectionManager.getLocalAddress());
-
         return Mono.just(this);
     }
 
@@ -171,21 +173,23 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
         var addr = InetSocketAddress.createUnresolved(address.host(), address.port());
 
         return Mono.fromFuture(() -> {
-            ClusterNode node = topologyService.getByAddress(NetworkAddress.from(addr));
+            NetworkAddress networkAddress = NetworkAddress.from(addr);
+            ClusterNode node = topologyService.getByAddress(networkAddress);
 
-            String consistentId = node != null ? node.name() : null;
+            if (node == null) {
+                node = new ClusterNode(null, null, networkAddress);
+            }
 
-            return connectionManager.channel(consistentId, addr).thenCompose(client -> client.send(fromMessage(message)));
+            return messagingService.send(node, fromMessage(message));
         });
     }
 
     /**
      * Handles new network messages from {@link #connectionManager}.
      *
-     * @param senderConsistentId Sender's consistent id.
      * @param msg    Network message.
      */
-    private void onMessage(String senderConsistentId, NetworkMessage msg) {
+    private void onMessage(NetworkMessage msg) {
         Message message = fromNetworkMessage(msg);
 
         if (message != null) {
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index cc6cfb0..54ab776 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -168,7 +168,7 @@ public class NettyClientTest {
                 address,
                 null,
                 new MockClientHandshakeManager(channel),
-                (address1, message) -> {
+                (message) -> {
                 }
         );
 
@@ -190,7 +190,7 @@ public class NettyClientTest {
                 address,
                 null,
                 new MockClientHandshakeManager(future.channel()),
-                (address1, message) -> {
+                (message) -> {
                 }
         );
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index e59db1a..4e84a89 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -183,7 +183,7 @@ public class NettyServerTest {
                 () -> handshakeManager,
                 sender -> {
                 },
-                (socketAddress, message) -> {
+                (message) -> {
                 },
                 new SerializationService(registry, mock(UserObjectSerializationContext.class)),
                 bootstrapFactory
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
index 5d88ad5..2a3e2bc 100644
--- a/modules/raft/pom.xml
+++ b/modules/raft/pom.xml
@@ -118,7 +118,6 @@
         <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-network</artifactId>
-            <scope>test</scope>
         </dependency>
 
         <dependency>
diff --git a/modules/transactions/pom.xml b/modules/transactions/pom.xml
index 7ffc632..26155ec 100644
--- a/modules/transactions/pom.xml
+++ b/modules/transactions/pom.xml
@@ -50,6 +50,11 @@
 
     <dependency>
       <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-network</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
       <artifactId>ignite-raft-client</artifactId>
     </dependency>