You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/04 11:04:00 UTC

[ignite-3] branch ignite-16393 updated (4bda819 -> 97f6c4e)

This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a change to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git.


 discard 4bda819  .
 discard 5907a5d  .
 discard 6c24e92  .
 discard b4355b7  .
 discard a5e8c16  .
 discard 38a54b1  .
 discard 6850bed  .
 discard 5818509  .
 discard 4869c13  .
     add ed519fb  IGNITE-16498 Add ability to add handlers to RestModule from other Ignite modules (#690)
     add 157a8f0  IGNITE-16633 Adoption of a bunch of calcite related tickets from Ignite-2 - Fixes #698.
     new 24c8e94  .
     new 2706e42  .
     new 448bed2  .
     new 69224dc  .
     new 5462d76  .
     new 1bdeb5e  .
     new b013634  .
     new cbc4ec8  .
     new a22ce45  .
     new 925295e  .
     new 38c4404  .
     new 97f6c4e  .

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (4bda819)
            \
             N -- N -- N   refs/heads/ignite-16393 (97f6c4e)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 modules/api/pom.xml                                |   5 -
 modules/configuration/pom.xml                      |  10 +
 .../rest/ConfigurationHttpHandlers.java}           | 168 ++-------
 .../presentation/ConfigurationPresentation.java    |   2 +-
 .../rest/presentation/hocon/HoconPresentation.java |   4 +-
 .../rest/presentation/hocon/package-info.java      |   2 +-
 .../rest/presentation/package-info.java            |   2 +-
 .../ConfigurationPresentationTest.java             |   4 +-
 .../internal/network/netty/OutboundEncoder.java    |  18 +-
 .../ignite/network/DefaultMessagingService.java    |   2 +-
 modules/page-memory/pom.xml                        |   5 +
 .../raft/server/ItJraftCounterServerTest.java      |  42 +--
 modules/{vault => rest-api}/pom.xml                |  41 ++-
 .../ignite/internal/rest/api}/ErrorResult.java     |   2 +-
 .../ignite/internal/rest/api}/RequestHandler.java  |   4 +-
 .../internal/rest/api}/RestApiHttpRequest.java     |   2 +-
 .../internal/rest/api}/RestApiHttpResponse.java    |   2 +-
 .../internal/rest/api/RestHandlersRegister.java}   |  15 +-
 .../apache/ignite/internal/rest/api}/Route.java    |   4 +-
 .../apache/ignite/internal/rest/api/Routes.java    |  83 +++++
 .../rest/api}/RestApiHttpResponseTest.java         |   3 +-
 modules/rest/pom.xml                               |   5 +
 .../apache/ignite/internal/rest/RestComponent.java | 149 ++++++++
 .../ignite/internal/rest/netty/RestApiHandler.java |   1 +
 .../apache/ignite/internal/rest/routes/Router.java | 102 +-----
 .../ignite/internal/rest/routes/SimpleRouter.java  |  71 ++++
 .../ignite/internal/rest/routes/RouteTest.java     |   2 +
 .../internal/sql/engine/ItDataTypesTest.java       |  64 +++-
 .../internal/sql/engine/ItFunctionsTest.java       |   9 +
 .../ignite/internal/sql/engine/ItIntervalTest.java | 398 +++++++++++++++++++++
 .../ignite/internal/sql/engine/ItMetadataTest.java |   9 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  15 +-
 .../CoreDistributedConfigurationModule.java        |   4 +-
 .../CoreLocalConfigurationModule.java              |   4 +-
 ...nite.internal.configuration.ConfigurationModule |   4 +-
 .../CoreDistributedConfigurationModuleTest.java    |   3 +-
 .../CoreLocalConfigurationModuleTest.java          |   3 +-
 modules/sql-engine/src/main/codegen/config.fmpp    |   1 +
 .../src/main/codegen/includes/parserImpls.ftl      |  32 +-
 .../sql/engine/exec/exp/IgniteBuiltInMethod.java   |   8 +-
 .../internal/sql/engine/exec/exp/RexImpTable.java  |  15 +-
 .../sql/engine/exec/exp/RexToLixTranslator.java    |  39 ++
 .../internal/sql/engine/externalize/RelJson.java   |  10 +-
 .../sql/engine/prepare/IgniteConvertletTable.java  | 128 +++++++
 .../sql/engine/prepare/IgniteTypeCoercion.java     |  58 +++
 .../engine/sql/IgniteSqlIntervalTypeNameSpec.java  |  62 ++++
 .../sql/engine/type/IgniteTypeFactory.java         |  59 ++-
 .../ignite/internal/sql/engine/util/Commons.java   |   6 +-
 .../ignite/internal/sql/engine/util/TypeUtils.java |  36 +-
 .../sql/engine/planner/TableSpoolPlannerTest.java  |   6 +-
 parent/pom.xml                                     |   6 +
 pom.xml                                            |   1 +
 52 files changed, 1357 insertions(+), 373 deletions(-)
 rename modules/{rest/src/main/java/org/apache/ignite/internal/rest/RestModule.java => configuration/src/main/java/org/apache/ignite/internal/configuration/rest/ConfigurationHttpHandlers.java} (51%)
 rename modules/{rest/src/main/java/org/apache/ignite/internal => configuration/src/main/java/org/apache/ignite/internal/configuration}/rest/presentation/ConfigurationPresentation.java (97%)
 rename modules/{rest/src/main/java/org/apache/ignite/internal => configuration/src/main/java/org/apache/ignite/internal/configuration}/rest/presentation/hocon/HoconPresentation.java (95%)
 rename modules/{rest/src/main/java/org/apache/ignite/internal => configuration/src/main/java/org/apache/ignite/internal/configuration}/rest/presentation/hocon/package-info.java (92%)
 rename modules/{rest/src/main/java/org/apache/ignite/internal => configuration/src/main/java/org/apache/ignite/internal/configuration}/rest/presentation/package-info.java (93%)
 rename modules/{rest/src/test/java/org/apache/ignite/internal => configuration/src/test/java/org/apache/ignite/internal/configuration}/rest/presentation/ConfigurationPresentationTest.java (97%)
 copy modules/{vault => rest-api}/pom.xml (73%)
 rename modules/{rest/src/main/java/org/apache/ignite/internal/rest => rest-api/src/main/java/org/apache/ignite/internal/rest/api}/ErrorResult.java (97%)
 rename modules/{rest/src/main/java/org/apache/ignite/internal/rest/routes => rest-api/src/main/java/org/apache/ignite/internal/rest/api}/RequestHandler.java (89%)
 rename modules/{rest/src/main/java/org/apache/ignite/internal/rest/netty => rest-api/src/main/java/org/apache/ignite/internal/rest/api}/RestApiHttpRequest.java (97%)
 rename modules/{rest/src/main/java/org/apache/ignite/internal/rest/netty => rest-api/src/main/java/org/apache/ignite/internal/rest/api}/RestApiHttpResponse.java (98%)
 copy modules/{api/src/main/java/org/apache/ignite/binary/BinaryObject.java => rest-api/src/main/java/org/apache/ignite/internal/rest/api/RestHandlersRegister.java} (72%)
 rename modules/{rest/src/main/java/org/apache/ignite/internal/rest/routes => rest-api/src/main/java/org/apache/ignite/internal/rest/api}/Route.java (96%)
 create mode 100644 modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/Routes.java
 rename modules/{rest/src/test/java/org/apache/ignite/internal/rest/netty => rest-api/src/test/java/org/apache/ignite/internal/rest/api}/RestApiHttpResponseTest.java (95%)
 create mode 100644 modules/rest/src/main/java/org/apache/ignite/internal/rest/RestComponent.java
 create mode 100644 modules/rest/src/main/java/org/apache/ignite/internal/rest/routes/SimpleRouter.java
 create mode 100644 modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIntervalTest.java
 rename modules/{api/src/main/java/org/apache/ignite => runner/src/main/java/org/apache/ignite/internal}/configuration/CoreDistributedConfigurationModule.java (96%)
 rename modules/{api/src/main/java/org/apache/ignite => runner/src/main/java/org/apache/ignite/internal}/configuration/CoreLocalConfigurationModule.java (94%)
 rename modules/{api => runner}/src/main/resources/META-INF/services/org.apache.ignite.internal.configuration.ConfigurationModule (84%)
 rename modules/{api/src/test/java/org/apache/ignite => runner/src/test/java/org/apache/ignite/internal}/configuration/CoreDistributedConfigurationModuleTest.java (97%)
 rename modules/{api/src/test/java/org/apache/ignite => runner/src/test/java/org/apache/ignite/internal}/configuration/CoreLocalConfigurationModuleTest.java (96%)
 create mode 100644 modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteConvertletTable.java
 create mode 100644 modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/IgniteTypeCoercion.java
 create mode 100644 modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlIntervalTypeNameSpec.java

[ignite-3] 03/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 448bed2447e9c658ef509c34b58271703cd20105
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Mar 3 14:27:14 2022 +0300

    .
---
 .../processor/messages/MessageImplGenerator.java   |   2 +
 modules/network/pom.xml                            |  12 +
 .../ItTransferableObjectProcessorTest.java         | 261 +++++++++++++++++++++
 .../network/processor/AllTypesMessage.java         |   0
 .../network/processor/ConflictingTypeMessage.java  |   0
 .../network/processor/ITTestMessageGroup.java      |   0
 .../network/processor/InheritedMessageClash.java   |   0
 .../processor/InvalidAnnotatedTypeMessage.java     |   0
 .../processor/InvalidParameterGetterMessage.java   |   0
 .../processor/InvalidReturnTypeGetterMessage.java  |   0
 .../internal/network/processor/SecondGroup.java    |   0
 .../network/processor/TransitiveMessage.java       |   0
 .../processor/UnmarshallableTypeMessage.java       |   0
 .../UnmarshallableTypeNonSerializableMessage.java  |   0
 .../ignite/network/DefaultMessagingService.java    |  23 +-
 .../network/serialization/MarshallableTest.java    |  51 +++-
 16 files changed, 329 insertions(+), 20 deletions(-)

diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index 95f0ba2..c76b227 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -249,6 +249,7 @@ public class MessageImplGenerator {
                 String baName = objectName + "ByteArray";
                 String moName = baName + "mo";
                 beforeRead.addStatement("$T $N = marshaller.marshal($N)", marshalledObjectClass, moName, objectName);
+                beforeRead.addStatement("$N = null", objectName);
                 beforeRead.addStatement("set.addAll($N.usedDescriptorIds())", moName);
                 beforeRead.addStatement("$N = $N.bytes()", baName, moName).addCode("\n");
             } else {
@@ -342,6 +343,7 @@ public class MessageImplGenerator {
             if (executableElement.getAnnotation(Marshallable.class) != null) {
                 String baName = objectName + "ByteArray";
                 afterRead.addStatement("$N = marshaller.unmarshal($N, descriptorRegistry)", objectName, baName);
+                afterRead.addStatement("$N = null", baName);
             } else {
                 Type objectType = resolveType(type);
 
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index 20603b8..aa6c4b3 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -150,6 +150,18 @@
             <artifactId>byte-buddy</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.google.testing.compile</groupId>
+            <artifactId>compile-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-network-annotation-processor</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/processor/ItTransferableObjectProcessorTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/processor/ItTransferableObjectProcessorTest.java
new file mode 100644
index 0000000..9d9d479
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/processor/ItTransferableObjectProcessorTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.processor;
+
+import static com.google.testing.compile.CompilationSubject.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.google.testing.compile.Compilation;
+import com.google.testing.compile.Compiler;
+import com.google.testing.compile.JavaFileObjects;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.tools.JavaFileObject;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.MessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for the {@link TransferableObjectProcessor}.
+ */
+public class ItTransferableObjectProcessorTest {
+    /**
+     * Package name of the test sources.
+     */
+    private static final String RESOURCE_PACKAGE_NAME = "org.apache.ignite.internal.network.processor";
+
+    /**
+     * Compiler instance configured with the annotation processor being tested.
+     */
+    private final Compiler compiler = Compiler.javac().withProcessors(new TransferableObjectProcessor());
+
+    /**
+     * Compiles the network message with all supported directly marshallable types and checks that the compilation completed successfully.
+     */
+    @Test
+    void testCompileAllTypesMessage() {
+        Compilation compilation = compile("AllTypesMessage");
+
+        assertThat(compilation).succeededWithoutWarnings();
+
+        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageBuilder"));
+        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageImpl"));
+        assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
+
+        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializer"));
+        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageDeserializer"));
+        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializationFactory"));
+        assertThat(compilation).generatedSourceFile(
+            fileName("NetworkMessageProcessorTestSerializationRegistryInitializer")
+        );
+    }
+
+    /**
+     * Compiles a network message that does not implement {@link NetworkMessage} directly but rather through a bunch of superinterfaces.
+     */
+    @Test
+    void testTransitiveMessage() {
+        Compilation compilation = compile("TransitiveMessage");
+
+        assertThat(compilation).succeededWithoutWarnings();
+
+        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageBuilder"));
+        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageImpl"));
+        assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
+
+        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializer"));
+        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageDeserializer"));
+        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializationFactory"));
+        assertThat(compilation).generatedSourceFile(
+            fileName("NetworkMessageProcessorTestSerializationRegistryInitializer")
+        );
+    }
+
+    /**
+     * Tests that compilation of multiple well-formed messages is successful.
+     */
+    @Test
+    void testCompileMultipleMessage() {
+        Compilation compilation = compiler.compile(
+            sources("AllTypesMessage", "TransitiveMessage", "ITTestMessageGroup")
+        );
+
+        assertThat(compilation).succeededWithoutWarnings();
+
+        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageBuilder"));
+        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageImpl"));
+
+        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageBuilder"));
+        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageImpl"));
+
+        assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
+
+        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializer"));
+        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageDeserializer"));
+        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializationFactory"));
+
+        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializer"));
+        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageDeserializer"));
+        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializationFactory"));
+
+        assertThat(compilation).generatedSourceFile(
+            fileName("NetworkMessageProcessorTestSerializationRegistryInitializer")
+        );
+    }
+
+    /**
+     * Compiles a test message that doesn't extend {@link NetworkMessage}.
+     */
+    @Test
+    void testInvalidAnnotatedTypeMessage() {
+        Compilation compilation = compile("InvalidAnnotatedTypeMessage");
+
+        assertThat(compilation).hadErrorContaining("annotation must only be present on interfaces that extend");
+    }
+
+    /**
+     * Compiles a test message that contains an unsupported content type.
+     */
+    @Test
+    void testUnmarshallableTypeMessage() {
+        Compilation compilation = compile("UnmarshallableTypeMessage");
+
+        assertThat(compilation).hadErrorContaining(
+            "Unsupported reference type for message (de-)serialization: java.util.ArrayList"
+        );
+    }
+
+    /**
+     * Compiles a test message that violates the message contract by declaring a getter with {@code void} return type.
+     */
+    @Test
+    void testInvalidReturnTypeGetterMessage() {
+        Compilation compilation = compile("InvalidReturnTypeGetterMessage");
+
+        assertThat(compilation).hadErrorContaining("Invalid getter method a()");
+    }
+
+    /**
+     * Compiles a test message that violates the message contract by declaring a getter with a parameter.
+     */
+    @Test
+    void testInvalidParameterGetterMessage() {
+        Compilation compilation = compile("InvalidParameterGetterMessage");
+
+        assertThat(compilation).hadErrorContaining("Invalid getter method a(int)");
+    }
+
+    /**
+     * Tests that compilation fails if no {@link MessageGroup} annotated elements were found.
+     */
+    @Test
+    void testMissingMessageGroup() {
+        Compilation compilation = compiler.compile(sources("AllTypesMessage"));
+
+        assertThat(compilation).hadErrorContaining(String.format(
+            "No message groups (classes annotated with @%s) found while processing messages from the following packages: [%s]",
+            MessageGroup.class.getSimpleName(),
+            RESOURCE_PACKAGE_NAME
+        ));
+    }
+
+    /**
+     * Tests that compilation fails if multiple {@link MessageGroup} annotated elements were found.
+     */
+    @Test
+    void testMultipleMessageGroups() {
+        Compilation compilation = compiler.compile(
+            sources("AllTypesMessage", "ConflictingTypeMessage", "ITTestMessageGroup", "SecondGroup")
+        );
+
+        assertThat(compilation).hadErrorContaining(String.format(
+            "Invalid number of message groups (classes annotated with @%s), "
+                + "only one can be present in a compilation unit: [%s.ITTestMessageGroup, %s.SecondGroup]",
+            MessageGroup.class.getSimpleName(),
+            RESOURCE_PACKAGE_NAME,
+            RESOURCE_PACKAGE_NAME
+        ));
+    }
+
+    /**
+     * Tests that setting the {@link Transferable#autoSerializable()} to {@code false} does not produce any serialization-related classes
+     * and errors.
+     */
+    @Test
+    void testNonSerializableMessage() {
+        Compilation compilation = compile("UnmarshallableTypeNonSerializableMessage");
+
+        assertThat(compilation).succeededWithoutWarnings();
+
+        assertThat(compilation).generatedSourceFile(fileName("UnmarshallableTypeNonSerializableMessageBuilder"));
+        assertThat(compilation).generatedSourceFile(fileName("UnmarshallableTypeNonSerializableMessageImpl"));
+        assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
+
+        // test that no additional classes have been generated
+        assertThrows(
+            AssertionError.class,
+            () -> assertThat(compilation)
+                .generatedSourceFile(fileName("UnmarshallableTypeNonSerializableMessageSerializer"))
+        );
+    }
+
+    /**
+     * Tests that messages with the same message type fail to compile.
+     */
+    @Test
+    void testConflictingMessageTypes() {
+        Compilation compilation = compiler.compile(
+            sources("AllTypesMessage", "ConflictingTypeMessage", "ITTestMessageGroup")
+        );
+
+        assertThat(compilation).hadErrorContaining("message with type 1 already exists");
+    }
+
+    /**
+     * Tests that if a message getter clashes with a getter in a superinterface, an appropriate error is displayed.
+     */
+    @Test
+    void testInheritedMessageClash() {
+        Compilation compilation = compile("InheritedMessageClash");
+
+        assertThat(compilation).hadErrorContaining("Getter with name 'x' is already defined");
+    }
+
+    /**
+     * Compiles the given network message.
+     */
+    private Compilation compile(String messageSource) {
+        return compiler.compile(sources(messageSource, "ITTestMessageGroup"));
+    }
+
+    /**
+     * Converts given test source class names to a list of {@link JavaFileObject}s.
+     */
+    private static List<JavaFileObject> sources(String... sources) {
+        return Arrays.stream(sources)
+            .map(source -> RESOURCE_PACKAGE_NAME.replace('.', '/') + '/' + source + ".java")
+            .map(JavaFileObjects::forResource)
+            .collect(Collectors.toList());
+    }
+
+    private static String fileName(String className) {
+        return RESOURCE_PACKAGE_NAME + '.' + className;
+    }
+}
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/AllTypesMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/AllTypesMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/AllTypesMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/AllTypesMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ConflictingTypeMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ConflictingTypeMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ConflictingTypeMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ConflictingTypeMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ITTestMessageGroup.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ITTestMessageGroup.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ITTestMessageGroup.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ITTestMessageGroup.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InheritedMessageClash.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InheritedMessageClash.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InheritedMessageClash.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InheritedMessageClash.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidAnnotatedTypeMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidAnnotatedTypeMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidAnnotatedTypeMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidAnnotatedTypeMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidParameterGetterMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidParameterGetterMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidParameterGetterMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidParameterGetterMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidReturnTypeGetterMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidReturnTypeGetterMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidReturnTypeGetterMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidReturnTypeGetterMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/SecondGroup.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/SecondGroup.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/SecondGroup.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/SecondGroup.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/TransitiveMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/TransitiveMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/TransitiveMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/TransitiveMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeNonSerializableMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeNonSerializableMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeNonSerializableMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeNonSerializableMessage.java
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 2ee7961..1f922ff 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
@@ -190,7 +191,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
     private CompletableFuture<Void> sendMessage0(NetworkMessage message, String recipientConsistentId, InetSocketAddress addr) {
         if (isInNetworkThread()) {
-            return CompletableFuture.supplyAsync(() -> sendMessage0(message, recipientConsistentId, addr), svc).thenCompose(fut -> fut);
+            return CompletableFuture.supplyAsync(() -> sendMessage0(message, recipientConsistentId, addr), svc).thenCompose(Function.identity());
         }
 
         List<ClassDescriptorMessage> descriptors;
@@ -244,10 +245,14 @@ public class DefaultMessagingService extends AbstractMessagingService {
     private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws Exception {
         Set<Integer> ids = msg.beforeRead(new HashSet<>(), marshaller);
 
-        return createClassDescriptorsMessages(ids);
+        return createClassDescriptorsMessages(factory, classDescriptorRegistry, ids);
     }
 
-    public List<ClassDescriptorMessage> createClassDescriptorsMessages(Set<Integer> descriptorIds) {
+    public static List<ClassDescriptorMessage> createClassDescriptorsMessages(
+            NetworkMessagesFactory factory,
+            ClassDescriptorRegistry classDescriptorRegistry,
+            Set<Integer> descriptorIds
+    ) {
         List<ClassDescriptorMessage> messages = descriptorIds.stream()
                 .map(classDescriptorRegistry::getDescriptor)
                 .map(descriptor -> {
@@ -281,14 +286,14 @@ public class DefaultMessagingService extends AbstractMessagingService {
         return messages;
     }
 
-    private byte fieldFlags(FieldDescriptor fieldDescriptor) {
+    private static byte fieldFlags(FieldDescriptor fieldDescriptor) {
         int bits = condMask(fieldDescriptor.isUnshared(), FieldDescriptorMessage.UNSHARED_MASK)
                 | condMask(fieldDescriptor.isPrimitive(), FieldDescriptorMessage.IS_PRIMITIVE)
                 | condMask(fieldDescriptor.isRuntimeTypeKnownUpfront(), FieldDescriptorMessage.IS_RUNTIME_TYPE_KNOWN_UPFRONT);
         return (byte) bits;
     }
 
-    private byte serializationAttributeFlags(Serialization serialization) {
+    private static byte serializationAttributeFlags(Serialization serialization) {
         int bits = condMask(serialization.hasWriteObject(), ClassDescriptorMessage.HAS_WRITE_OBJECT_MASK)
                 | condMask(serialization.hasReadObject(), ClassDescriptorMessage.HAS_READ_OBJECT_MASK)
                 | condMask(serialization.hasReadObjectNoData(), ClassDescriptorMessage.HAS_READ_OBJECT_NO_DATA_MASK)
@@ -297,11 +302,11 @@ public class DefaultMessagingService extends AbstractMessagingService {
         return (byte) bits;
     }
 
-    private int condMask(boolean value, int mask) {
+    private static int condMask(boolean value, int mask) {
         return value ? mask : 0;
     }
 
-    private byte classDescriptorAttributeFlags(ClassDescriptor descriptor) {
+    private static byte classDescriptorAttributeFlags(ClassDescriptor descriptor) {
         int bits = condMask(descriptor.isPrimitive(), ClassDescriptorMessage.IS_PRIMITIVE_MASK)
                 | condMask(descriptor.isArray(), ClassDescriptorMessage.IS_ARRAY_MASK)
                 | condMask(descriptor.isRuntimeEnum(), ClassDescriptorMessage.IS_RUNTIME_ENUM_MASK)
@@ -309,7 +314,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
         return (byte) bits;
     }
 
-    private int superClassDescriptorIdForMessage(ClassDescriptor descriptor) {
+    private static int superClassDescriptorIdForMessage(ClassDescriptor descriptor) {
         Integer id = descriptor.superClassDescriptorId();
 
         if (id == null) {
@@ -319,7 +324,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
         return id;
     }
 
-    private int componentTypeDescriptorIdForMessage(ClassDescriptor descriptor) {
+    private static int componentTypeDescriptorIdForMessage(ClassDescriptor descriptor) {
         Integer id = descriptor.componentTypeDescriptorId();
 
         if (id == null) {
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index 30c64ab..de4fa63 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
 import static org.hamcrest.collection.IsEmptyCollection.empty;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
@@ -30,6 +29,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
 import it.unimi.dsi.fastutil.ints.IntSets;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -37,14 +38,21 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.internal.network.netty.InboundDecoder;
+import org.apache.ignite.internal.network.netty.OutboundEncoder;
 import org.apache.ignite.internal.network.serialization.marshal.MarshalException;
 import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
 import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+import org.apache.ignite.network.DefaultMessagingService;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
 import org.apache.ignite.network.TestMessagesFactory;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
@@ -68,7 +76,7 @@ public class MarshallableTest {
      * Tests that marshallable object can be serialized along with its descriptor.
      */
     @Test
-    public void testMarshallable() {
+    public void testMarshallable() throws Exception {
         // Test map that will be sent as a Marshallable object within the MessageWithMarshallable message
         Map<String, SimpleSerializableObject> testMap = Map.of("test", new SimpleSerializableObject(10));
 
@@ -80,29 +88,36 @@ public class MarshallableTest {
     }
 
     /** Writes a map to a buffer through the {@link MessageWithMarshallable}. */
-    private ByteBuffer write(Map<String, SimpleSerializableObject> testMap) {
+    private ByteBuffer write(Map<String, SimpleSerializableObject> testMap) throws Exception {
         var serializers = new Serialization();
 
         var writer = new DirectMessageWriter(serializers.perSessionSerializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
 
         MessageWithMarshallable msg = msgFactory.messageWithMarshallable().marshallableMap(testMap).build();
 
+        var ids = new HashSet<Integer>();
+
+        msg.beforeRead(ids, serializers.userObjectSerializer);
+
         MessageSerializer<NetworkMessage> serializer = registry.createSerializer(msg.groupType(), msg.messageType());
 
-        ByteBuffer nioBuffer = ByteBuffer.allocate(1000);
+        var catcher = new OutboundByteBufCatcher();
+        var channel = new EmbeddedChannel(catcher, new ChunkedWriteHandler(), new OutboundEncoder(serializers.perSessionSerializationService));
 
-        writer.setBuffer(nioBuffer);
+        List<ClassDescriptorMessage> classDescriptorsMessages = DefaultMessagingService.createClassDescriptorsMessages(
+            new NetworkMessagesFactory(), serializers.descriptorRegistry, ids);
 
-        // Write a message to the ByteBuffer.
-        boolean fullyWritten = serializer.writeMessage(msg, writer);
+        channel.writeAndFlush(new NetworkObject(msg, classDescriptorsMessages));
 
-        assertTrue(fullyWritten);
+        channel.flushOutbound();
+
+        ByteBuffer nioBuffer = catcher.buf;
 
         return nioBuffer;
     }
 
     /** Reads a {@link MessageWithMarshallable} from the buffer (byte by byte) and checks for the class descriptor merging. */
-    private Map<String, SimpleSerializableObject> read(ByteBuffer outBuffer) {
+    private Map<String, SimpleSerializableObject> read(ByteBuffer outBuffer) throws Exception {
         ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
 
         var channel = new EmbeddedChannel();
@@ -150,6 +165,8 @@ public class MarshallableTest {
 
         MessageWithMarshallable received = (MessageWithMarshallable) list.get(0);
 
+        received.afterRead(serializers.userObjectSerializer, serializers.descriptorRegistry);
+
         return received.marshallableMap();
     }
 
@@ -158,15 +175,17 @@ public class MarshallableTest {
         private final PerSessionSerializationService perSessionSerializationService;
 
         private final ClassDescriptor descriptor;
+        private final StubMarshaller userObjectSerializer;
+        private final ClassDescriptorRegistry descriptorRegistry;
 
         Serialization() {
-            var descriptorRegistry = new ClassDescriptorRegistry();
+            this.descriptorRegistry = new ClassDescriptorRegistry();
             var factory = new ClassDescriptorFactory(descriptorRegistry);
 
             // Create descriptor for SimpleSerializableObject
             this.descriptor = factory.create(SimpleSerializableObject.class);
 
-            var userObjectSerializer = new StubMarshaller(descriptor);
+            this.userObjectSerializer = new StubMarshaller(descriptor);
 
             var ser = new UserObjectSerializationContext(descriptorRegistry, factory, userObjectSerializer);
 
@@ -175,6 +194,16 @@ public class MarshallableTest {
         }
     }
 
+    private static class OutboundByteBufCatcher extends MessageToMessageEncoder<ByteBuf> {
+        private ByteBuffer buf = ByteBuffer.allocateDirect(1000);
+        @Override
+        protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
+            ByteBuffer nioBuffer = byteBuf.nioBuffer();
+            buf.put(nioBuffer);
+            list.add(1);
+        }
+    }
+
     /**
      *  Stub implementation of the {@link UserObjectMarshaller}, which uses the JDK's serializable
      *  serialization to actually marshall an object.

[ignite-3] 09/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a22ce4520c81f87f2fef7dfe09175decfe229d32
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Fri Mar 4 11:41:53 2022 +0300

    .
---
 .../main/java/org/apache/ignite/network/DefaultMessagingService.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 8577233..cdb5222 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -349,7 +349,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
     }
 
     private final ExecutorService outSvc = Executors.newSingleThreadExecutor();
-    private final ExecutorService inSvc = Executors.newSingleThreadExecutor();
+    private final ExecutorService inSvc = Executors.newFixedThreadPool(5);
 
     /**
      * Handles an incoming messages.

[ignite-3] 12/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 97f6c4e2e024686754fea95cd5a96a9ae1c8b613
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Fri Mar 4 13:59:19 2022 +0300

    .
---
 .../raft/server/ItJraftCounterServerTest.java      | 42 ++++++----------------
 1 file changed, 11 insertions(+), 31 deletions(-)

diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index c329f76..80b81b4 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -569,44 +569,24 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
         listenerFactory = () -> new CounterListener() {
             @Override
             public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
-                Iterator<CommandClosure<WriteCommand>> wrapper = new Iterator<>() {
-                    @Override
-                    public boolean hasNext() {
-                        return iterator.hasNext();
-                    }
+                while (iterator.hasNext()) {
+                    CommandClosure<WriteCommand> clo = iterator.next();
 
-                    @Override
-                    public CommandClosure<WriteCommand> next() {
-                        CommandClosure<WriteCommand> cmd = iterator.next();
-
-                        cmd.result(new RuntimeException("Expected message"));
-
-                        return cmd;
-                    }
-                };
+                    IncrementAndGetCommand cmd0 = (IncrementAndGetCommand) clo.command();
 
-                super.onWrite(wrapper);
+                    clo.result(new RuntimeException("Expected message"));
+                }
             }
 
             @Override
             public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
-                Iterator<CommandClosure<ReadCommand>> wrapper = new Iterator<>() {
-                    @Override
-                    public boolean hasNext() {
-                        return iterator.hasNext();
-                    }
+                while (iterator.hasNext()) {
+                    CommandClosure<ReadCommand> clo = iterator.next();
 
-                    @Override
-                    public CommandClosure<ReadCommand> next() {
-                        CommandClosure<ReadCommand> cmd = iterator.next();
+                    assert clo.command() instanceof GetValueCommand;
 
-                        cmd.result(new RuntimeException("Another expected message"));
-
-                        return cmd;
-                    }
-                };
-
-                super.onRead(wrapper);
+                    clo.result(new RuntimeException("Another expected message"));
+                }
             }
         };
 
@@ -624,7 +604,7 @@ class ItJraftCounterServerTest extends RaftServerAbstractTest {
         assertNotNull(leader);
 
         try {
-            client1.<Long>run(new IncrementAndGetCommand(0)).get();
+            Long aLong = client1.<Long>run(new IncrementAndGetCommand(3)).get();
 
             fail();
         } catch (Exception e) {

[ignite-3] 07/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit b01363490e802e27e829ec96bb5b407d9a4c5469
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Mar 3 18:20:40 2022 +0300

    .
---
 .../org/apache/ignite/network/DefaultMessagingService.java     | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 54d9de6..8577233 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -191,7 +191,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
     private CompletableFuture<Void> sendMessage0(NetworkMessage message, String recipientConsistentId, InetSocketAddress addr) {
         if (isInNetworkThread()) {
-            return CompletableFuture.supplyAsync(() -> sendMessage0(message, recipientConsistentId, addr), svc).thenCompose(Function.identity());
+            return CompletableFuture.supplyAsync(() -> sendMessage0(message, recipientConsistentId, addr), outSvc).thenCompose(Function.identity());
         }
 
         List<ClassDescriptorMessage> descriptors;
@@ -348,7 +348,8 @@ public class DefaultMessagingService extends AbstractMessagingService {
         }
     }
 
-    private final ExecutorService svc = Executors.newSingleThreadExecutor();
+    private final ExecutorService outSvc = Executors.newSingleThreadExecutor();
+    private final ExecutorService inSvc = Executors.newSingleThreadExecutor();
 
     /**
      * Handles an incoming messages.
@@ -358,7 +359,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
      */
     private void onMessage(InNetworkObject obj) {
         if (isInNetworkThread()) {
-            svc.submit(() -> onMessage(obj));
+            inSvc.submit(() -> onMessage(obj));
             return;
         }
 
@@ -491,7 +492,8 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         requestsMap.clear();
 
-        svc.shutdown();
+        inSvc.shutdown();
+        outSvc.shutdown();
     }
 
     private UserObjectSerializationContext createUserObjectSerializationContext() {

[ignite-3] 01/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 24c8e94faed8680f4468af9ab2d52c14971cd837
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Mar 2 18:09:25 2022 +0300

    .
---
 .../messages/MessageBuilderGenerator.java          |  73 +++---
 .../processor/messages/MessageImplGenerator.java   | 259 ++++++++++++++++++++-
 .../serialization/BaseMethodNameResolver.java      |   6 +-
 .../MessageDeserializerGenerator.java              |   9 +-
 .../serialization/MessageReaderMethodResolver.java |   3 +-
 .../serialization/MessageSerializerGenerator.java  |   4 +-
 .../serialization/MessageWriterMethodResolver.java |   7 +-
 .../org/apache/ignite/network/NetworkMessage.java  |  11 +
 .../network/netty/ItConnectionManagerTest.java     |  20 +-
 .../network/recovery/ItRecoveryHandshakeTest.java  |   6 +-
 .../scalecube/ItScaleCubeNetworkMessagingTest.java |  10 +-
 .../internal/network/NetworkMessageTypes.java      |   6 +
 .../message/ClassDescriptorListMessage.java        |  30 +++
 .../internal/network/netty/ConnectionManager.java  |  12 +-
 .../{MessageHandler.java => InNetworkObject.java}  |  41 ++--
 .../internal/network/netty/InboundDecoder.java     |  13 +-
 .../internal/network/netty/MessageHandler.java     |  16 +-
 .../ignite/internal/network/netty/NettyClient.java |  11 +-
 .../ignite/internal/network/netty/NettySender.java |   6 +-
 .../ignite/internal/network/netty/NettyServer.java |  10 +-
 .../internal/network/netty/OutboundEncoder.java    |  41 +++-
 .../recovery/RecoveryClientHandshakeManager.java   |   4 +-
 .../recovery/RecoveryServerHandshakeManager.java   |   4 +-
 .../PerSessionSerializationService.java            |  16 +-
 .../ignite/network/DefaultMessagingService.java    | 165 ++++++++++++-
 .../ignite/network/NettyBootstrapFactory.java      |  16 ++
 .../org/apache/ignite/network/NetworkObject.java   |  40 ++++
 .../scalecube/ScaleCubeClusterServiceFactory.java  |   6 +-
 .../ScaleCubeDirectMarshallerTransport.java        |  26 ++-
 .../internal/network/netty/NettyClientTest.java    |   4 +-
 .../internal/network/netty/NettyServerTest.java    |   2 +-
 modules/raft/pom.xml                               |   1 -
 modules/transactions/pom.xml                       |   5 +
 33 files changed, 747 insertions(+), 136 deletions(-)

diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
index 650296e..7f23723 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
@@ -17,17 +17,20 @@
 
 package org.apache.ignite.internal.network.processor.messages;
 
+import com.squareup.javapoet.ArrayTypeName;
 import com.squareup.javapoet.ClassName;
 import com.squareup.javapoet.MethodSpec;
 import com.squareup.javapoet.TypeName;
 import com.squareup.javapoet.TypeSpec;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.ArrayList;
 import javax.annotation.processing.ProcessingEnvironment;
 import javax.lang.model.element.Modifier;
+import javax.lang.model.type.TypeMirror;
 import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.internal.network.processor.TypeUtils;
+import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Class for generating Builder interfaces for Network Messages.
@@ -39,6 +42,8 @@ public class MessageBuilderGenerator {
     /** Message group. */
     private final MessageGroupWrapper messageGroup;
 
+    private final TypeUtils typeUtils;
+
     /**
      * Constructor.
      *
@@ -48,6 +53,7 @@ public class MessageBuilderGenerator {
     public MessageBuilderGenerator(ProcessingEnvironment processingEnvironment, MessageGroupWrapper messageGroup) {
         this.processingEnvironment = processingEnvironment;
         this.messageGroup = messageGroup;
+        this.typeUtils = new TypeUtils(processingEnvironment);
     }
 
     /**
@@ -62,30 +68,45 @@ public class MessageBuilderGenerator {
         processingEnvironment.getMessager()
                 .printMessage(Diagnostic.Kind.NOTE, "Generating " + builderName, message.element());
 
-        // generate a setter for each getter in the original interface
-        List<MethodSpec> setters = message.getters().stream()
-                .map(getter -> {
-                    String getterName = getter.getSimpleName().toString();
-
-                    return MethodSpec.methodBuilder(getterName)
-                            .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
-                            .addParameter(TypeName.get(getter.getReturnType()), getterName)
-                            .returns(builderName)
-                            .build();
-                })
-                .collect(Collectors.toList());
-
-        // generate a getter for each getter in the original interface
-        List<MethodSpec> getters = message.getters().stream()
-                .map(getter -> {
-                    String getterName = getter.getSimpleName().toString();
-
-                    return MethodSpec.methodBuilder(getterName)
-                            .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
-                            .returns(TypeName.get(getter.getReturnType()))
-                            .build();
-                })
-                .collect(Collectors.toList());
+        var setters = new ArrayList<MethodSpec>(message.getters().size());
+        var getters = new ArrayList<MethodSpec>(message.getters().size());
+        message.getters().forEach(getter -> {
+            String getterName = getter.getSimpleName().toString();
+
+            TypeMirror type = getter.getReturnType();
+
+            // generate a setter for each getter in the original interface
+            MethodSpec setterSpec = MethodSpec.methodBuilder(getterName)
+                    .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                    .addParameter(TypeName.get(type), getterName)
+                    .returns(builderName)
+                    .build();
+
+            // generate a getter for each getter in the original interface
+            MethodSpec getterSpec = MethodSpec.methodBuilder(getterName)
+                    .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                    .returns(TypeName.get(type))
+                    .build();
+
+            setters.add(setterSpec);
+            getters.add(getterSpec);
+
+            if (getter.getAnnotation(Marshallable.class) != null) {
+                MethodSpec baSetter = MethodSpec.methodBuilder(getterName + "ByteArray")
+                        .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                        .addParameter(ArrayTypeName.of(TypeName.BYTE), getterName + "ByteArray")
+                        .returns(builderName)
+                        .build();
+
+                MethodSpec baGetter = MethodSpec.methodBuilder(getterName + "ByteArray")
+                        .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+                        .returns(ArrayTypeName.of(TypeName.BYTE))
+                        .build();
+
+                setters.add(baSetter);
+                getters.add(baGetter);
+            }
+        });
 
         MethodSpec buildMethod = MethodSpec.methodBuilder("build")
                 .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index 9694785..835894b 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -17,26 +17,38 @@
 
 package org.apache.ignite.internal.network.processor.messages;
 
+import com.squareup.javapoet.ArrayTypeName;
 import com.squareup.javapoet.ClassName;
 import com.squareup.javapoet.CodeBlock;
 import com.squareup.javapoet.FieldSpec;
 import com.squareup.javapoet.MethodSpec;
+import com.squareup.javapoet.MethodSpec.Builder;
+import com.squareup.javapoet.ParameterizedTypeName;
 import com.squareup.javapoet.TypeName;
 import com.squareup.javapoet.TypeSpec;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.processing.ProcessingEnvironment;
 import javax.lang.model.element.ExecutableElement;
 import javax.lang.model.element.Modifier;
+import javax.lang.model.type.ArrayType;
+import javax.lang.model.type.DeclaredType;
 import javax.lang.model.type.TypeKind;
+import javax.lang.model.type.TypeMirror;
 import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.internal.network.processor.TypeUtils;
+import org.apache.ignite.internal.network.processor.serialization.BaseMethodNameResolver;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
 
 /**
  * Class for generating implementations of the {@link NetworkMessage} interfaces and their builders, generated by a {@link
@@ -49,6 +61,10 @@ public class MessageImplGenerator {
     /** Message group. */
     private final MessageGroupWrapper messageGroup;
 
+    private final TypeUtils typeUtils;
+
+    private final BaseMethodNameResolver methodNameResolver;
+
     /**
      * Constructor.
      *
@@ -58,6 +74,8 @@ public class MessageImplGenerator {
     public MessageImplGenerator(ProcessingEnvironment processingEnv, MessageGroupWrapper messageGroup) {
         this.processingEnv = processingEnv;
         this.messageGroup = messageGroup;
+        this.typeUtils = new TypeUtils(processingEnv);
+        this.methodNameResolver = new BaseMethodNameResolver(processingEnv);
     }
 
     /**
@@ -76,6 +94,7 @@ public class MessageImplGenerator {
         List<ExecutableElement> getters = message.getters();
 
         var fields = new ArrayList<FieldSpec>(getters.size());
+        var allFields = new ArrayList<FieldSpec>(getters.size());
         var getterImpls = new ArrayList<MethodSpec>(getters.size());
 
         // create a field and a getter implementation for every getter in the message interface
@@ -85,24 +104,42 @@ public class MessageImplGenerator {
             String getterName = getter.getSimpleName().toString();
 
             FieldSpec field = FieldSpec.builder(getterReturnType, getterName)
-                    .addModifiers(Modifier.PRIVATE, Modifier.FINAL)
+                    .addModifiers(Modifier.PRIVATE)
                     .build();
 
             fields.add(field);
+            allFields.add(field);
 
             MethodSpec getterImpl = MethodSpec.overriding(getter)
                     .addStatement("return $N", field)
                     .build();
 
             getterImpls.add(getterImpl);
+
+            if (getter.getAnnotation(Marshallable.class) != null) {
+                ArrayTypeName arrayTypeName = ArrayTypeName.of(TypeName.BYTE);
+                String name = getterName + "ByteArray";
+                FieldSpec marshallableFieldArray = FieldSpec.builder(arrayTypeName, name)
+                        .addModifiers(Modifier.PRIVATE)
+                        .build();
+
+                allFields.add(marshallableFieldArray);
+
+                MethodSpec baGetterImpl = MethodSpec.methodBuilder(name)
+                        .returns(arrayTypeName)
+                        .addStatement("return $N", marshallableFieldArray)
+                        .build();
+
+                getterImpls.add(baGetterImpl);
+            }
         }
 
         TypeSpec.Builder messageImpl = TypeSpec.classBuilder(messageImplClassName)
                 .addModifiers(Modifier.PUBLIC)
                 .addSuperinterface(message.className())
-                .addFields(fields)
+                .addFields(allFields)
                 .addMethods(getterImpls)
-                .addMethod(constructor(fields));
+                .addMethod(constructor(allFields));
 
         // group type constant and getter
         FieldSpec groupTypeField = FieldSpec.builder(short.class, "GROUP_TYPE")
@@ -156,6 +193,9 @@ public class MessageImplGenerator {
 
         messageImpl.addMethod(builderMethod);
 
+        generateBeforeSend(messageImpl, message);
+        generateAfterReceive(messageImpl, message);
+
         messageImpl
                 .addOriginatingElement(message.element())
                 .addOriginatingElement(messageGroup.element());
@@ -163,6 +203,183 @@ public class MessageImplGenerator {
         return messageImpl.build();
     }
 
+    private void generateBeforeSend(TypeSpec.Builder messageImplBuild, MessageClass message) {
+        ParameterizedTypeName returnType = ParameterizedTypeName.get(ClassName.get(Set.class), TypeName.INT.box());
+        ParameterizedTypeName returnTypeImpl = ParameterizedTypeName.get(ClassName.get(HashSet.class), TypeName.INT.box());
+
+        Builder beforeRead = MethodSpec.methodBuilder("beforeRead")
+                .addAnnotation(Override.class)
+                .addModifiers(Modifier.PUBLIC)
+                .addException(Exception.class)
+                .addParameter(returnType, "set")
+                .addParameter(Object.class, "marshallerObj")
+                .returns(returnType);
+
+        ClassName marshallerClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "UserObjectMarshaller");
+        ClassName marshalledObjectClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "MarshalledObject");
+
+        beforeRead.addStatement("$T marshaller = ($T) marshallerObj", marshallerClass, marshallerClass);
+
+        message.getters().forEach(executableElement -> {
+            TypeMirror type = executableElement.getReturnType();
+            String objectName = executableElement.getSimpleName().toString();
+
+            if (executableElement.getAnnotation(Marshallable.class) != null) {
+                String baName = objectName + "ByteArray";
+                String moName = baName + "mo";
+                beforeRead.addStatement("$T $N = marshaller.marshal($N)", marshalledObjectClass, moName, objectName);
+                beforeRead.addStatement("set.addAll($N.usedDescriptorIds())", moName);
+                beforeRead.addStatement("$N = $N.bytes()", baName, moName).addCode("\n");
+            } else if (typeUtils.isSubType(type, NetworkMessage.class)) {
+                beforeRead.addStatement("if ($N != null) $N.beforeRead(set, marshallerObj)", objectName, objectName);
+            } else {
+                String methodType = methodNameResolver.resolveBaseMethodName(type);
+                switch (methodType) {
+                    case "ObjectArray":
+                        ArrayType arrayType = (ArrayType) type;
+                        if (typeUtils.isSubType(arrayType.getComponentType(), NetworkMessage.class)) {
+                            beforeRead.beginControlFlow("if ($N != null)", objectName);
+                            beforeRead.beginControlFlow("for (int i = 0; i < $N.length; i++)", objectName);
+                            beforeRead.addStatement("if ($N[i] != null) $N[i].beforeRead(set, marshallerObj)", objectName, objectName);
+                            beforeRead.endControlFlow();
+                            beforeRead.endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Collection":
+                        DeclaredType declaredType = (DeclaredType) type;
+                        TypeMirror elementType = declaredType.getTypeArguments().get(0);
+                        if (typeUtils.isSubType(elementType, NetworkMessage.class)) {
+                            beforeRead.beginControlFlow("if ($N != null)", objectName);
+                            beforeRead.beginControlFlow("for ($T obj : $N)", elementType, objectName);
+                            beforeRead.addStatement("if (obj != null) obj.beforeRead(set, marshallerObj)", objectName);
+                            beforeRead.endControlFlow();
+                            beforeRead.endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Map":
+                        DeclaredType mapType = (DeclaredType) type;
+                        TypeMirror keyType = mapType.getTypeArguments().get(0);
+                        boolean keyIsMessage = typeUtils.isSubType(keyType, NetworkMessage.class);
+                        TypeMirror valueType = mapType.getTypeArguments().get(1);
+                        boolean valueIsMessage = typeUtils.isSubType(valueType, NetworkMessage.class);
+
+                        if (keyIsMessage || valueIsMessage) {
+                            ParameterizedTypeName entryType = ParameterizedTypeName.get(ClassName.get(Map.Entry.class), TypeName.get(keyType), TypeName.get(valueType));
+                            ParameterizedTypeName entrySetType = ParameterizedTypeName.get(ClassName.get(Set.class), entryType);
+                            String entrySetName = objectName + "EntrySet";
+
+                            beforeRead.beginControlFlow("if ($N != null)", objectName);
+                            beforeRead.addStatement("$T $N = $N.entrySet()", entrySetType, entrySetName, objectName);
+                            beforeRead.beginControlFlow("for ($T entry : $N)", entryType, entrySetName);
+                            beforeRead.addStatement("$T key = entry.getKey()", keyType);
+                            beforeRead.addStatement("$T value = entry.getValue()", valueType);
+                            if (keyIsMessage) {
+                                beforeRead.addStatement("if (key != null) key.beforeRead(set, marshallerObj)");
+                            }
+                            if (valueIsMessage) {
+                                beforeRead.addStatement("if (value != null) value.beforeRead(set, marshallerObj)");
+                            }
+                            beforeRead.endControlFlow().addCode("\n");
+                            beforeRead.endControlFlow();
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+        });
+
+        beforeRead.addStatement("return set");
+
+        messageImplBuild.addMethod(beforeRead.build());
+    }
+
+    private void generateAfterReceive(TypeSpec.Builder messageImplBuild, MessageClass message) {
+        ParameterizedTypeName returnType = ParameterizedTypeName.get(ClassName.get(Set.class), TypeName.INT.box());
+        ParameterizedTypeName returnTypeImpl = ParameterizedTypeName.get(ClassName.get(HashSet.class), TypeName.INT.box());
+
+        Builder afterRead = MethodSpec.methodBuilder("afterRead")
+                .addAnnotation(Override.class)
+                .addModifiers(Modifier.PUBLIC)
+                .addException(Exception.class)
+                .addParameter(Object.class, "marshallerObj")
+                .addParameter(Object.class, "descriptorsObj");
+
+        ClassName marshallerClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "UserObjectMarshaller");
+        ClassName descriptorRegistryClass = ClassName.get("org.apache.ignite.internal.network.serialization", "DescriptorRegistry");
+
+        afterRead.addStatement("$T marshaller = ($T) marshallerObj", marshallerClass, marshallerClass);
+        afterRead.addStatement("$T descriptorRegistry = ($T) descriptorsObj", descriptorRegistryClass, descriptorRegistryClass);
+
+        message.getters().forEach(executableElement -> {
+            TypeMirror type = executableElement.getReturnType();
+            String objectName = executableElement.getSimpleName().toString();
+
+            if (executableElement.getAnnotation(Marshallable.class) != null) {
+                String baName = objectName + "ByteArray";
+                afterRead.addStatement("$N = marshaller.unmarshal($N, descriptorRegistry)", objectName, baName);
+            } else if (typeUtils.isSubType(type, NetworkMessage.class)) {
+                afterRead.addStatement("if ($N != null) $N.afterRead(marshallerObj, descriptorsObj)", objectName, objectName);
+            } else {
+                String methodType = methodNameResolver.resolveBaseMethodName(type);
+                switch (methodType) {
+                    case "ObjectArray":
+                        ArrayType arrayType = (ArrayType) type;
+                        if (typeUtils.isSubType(arrayType.getComponentType(), NetworkMessage.class)) {
+                            afterRead.beginControlFlow("if ($N != null)", objectName);
+                            afterRead.beginControlFlow("for (int i = 0; i < $N.length; i++)", objectName);
+                            afterRead.addStatement("if ($N[i] != null) $N[i].afterRead(marshallerObj, descriptorsObj)", objectName, objectName);
+                            afterRead.endControlFlow();
+                            afterRead.endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Collection":
+                        DeclaredType declaredType = (DeclaredType) type;
+                        TypeMirror elementType = declaredType.getTypeArguments().get(0);
+                        if (typeUtils.isSubType(elementType, NetworkMessage.class)) {
+                            afterRead.beginControlFlow("if ($N != null)", objectName);
+                            afterRead.beginControlFlow("for ($T obj : $N)", elementType, objectName);
+                            afterRead.addStatement("if (obj != null) obj.afterRead(marshallerObj, descriptorsObj)", objectName);
+                            afterRead.endControlFlow();
+                            afterRead.endControlFlow().addCode("\n");
+                        }
+                        break;
+                    case "Map":
+                        DeclaredType mapType = (DeclaredType) type;
+                        TypeMirror keyType = mapType.getTypeArguments().get(0);
+                        boolean keyIsMessage = typeUtils.isSubType(keyType, NetworkMessage.class);
+                        TypeMirror valueType = mapType.getTypeArguments().get(1);
+                        boolean valueIsMessage = typeUtils.isSubType(valueType, NetworkMessage.class);
+
+                        if (keyIsMessage || valueIsMessage) {
+                            ParameterizedTypeName entryType = ParameterizedTypeName.get(ClassName.get(Map.Entry.class), TypeName.get(keyType), TypeName.get(valueType));
+                            ParameterizedTypeName entrySetType = ParameterizedTypeName.get(ClassName.get(Set.class), entryType);
+                            String entrySetName = objectName + "EntrySet";
+
+                            afterRead.beginControlFlow("if ($N != null)", objectName);
+                            afterRead.addStatement("$T $N = $N.entrySet()", entrySetType, entrySetName, objectName);
+                            afterRead.beginControlFlow("for ($T entry : $N)", entryType, entrySetName);
+                            afterRead.addStatement("$T key = entry.getKey()", keyType);
+                            afterRead.addStatement("$T value = entry.getValue()", valueType);
+                            if (keyIsMessage) {
+                                afterRead.addStatement("if (key != null) key.afterRead(marshallerObj, descriptorsObj)");
+                            }
+                            if (valueIsMessage) {
+                                afterRead.addStatement("if (value != null) value.afterRead(marshallerObj, descriptorsObj)");
+                            }
+                            afterRead.endControlFlow();
+                            afterRead.endControlFlow().addCode("\n");
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+        });
+
+        messageImplBuild.addMethod(afterRead.build());
+    }
+
     /**
      * Generates implementations of {@link #hashCode} and {@link #equals} for the provided {@code message} and adds them to the provided
      * builder.
@@ -322,6 +539,7 @@ public class MessageImplGenerator {
         List<ExecutableElement> messageGetters = message.getters();
 
         var fields = new ArrayList<FieldSpec>(messageGetters.size());
+        var allFields = new ArrayList<FieldSpec>(messageGetters.size());
         var setters = new ArrayList<MethodSpec>(messageGetters.size());
         var getters = new ArrayList<MethodSpec>(messageGetters.size());
 
@@ -335,6 +553,7 @@ public class MessageImplGenerator {
                     .build();
 
             fields.add(field);
+            allFields.add(field);
 
             MethodSpec setter = MethodSpec.methodBuilder(getterName)
                     .addAnnotation(Override.class)
@@ -355,15 +574,45 @@ public class MessageImplGenerator {
                     .build();
 
             getters.add(getter);
+
+            if (messageGetter.getAnnotation(Marshallable.class) != null) {
+                String name = getterName + "ByteArray";
+                ArrayTypeName type = ArrayTypeName.of(TypeName.BYTE);
+                FieldSpec baField = FieldSpec.builder(type, name)
+                        .addModifiers(Modifier.PRIVATE)
+                        .build();
+
+                allFields.add(baField);
+
+                MethodSpec baSetter = MethodSpec.methodBuilder(name)
+                        .addAnnotation(Override.class)
+                        .addModifiers(Modifier.PUBLIC)
+                        .returns(builderName)
+                        .addParameter(type, name)
+                        .addStatement("this.$N = $L", baField, name)
+                        .addStatement("return this")
+                        .build();
+
+                setters.add(baSetter);
+
+                MethodSpec baGetter = MethodSpec.methodBuilder(name)
+                        .addAnnotation(Override.class)
+                        .addModifiers(Modifier.PUBLIC)
+                        .returns(type)
+                        .addStatement("return $N", baField)
+                        .build();
+
+                getters.add(baGetter);
+            }
         }
 
         return TypeSpec.classBuilder("Builder")
                 .addModifiers(Modifier.PRIVATE, Modifier.STATIC)
                 .addSuperinterface(builderName)
-                .addFields(fields)
+                .addFields(allFields)
                 .addMethods(setters)
                 .addMethods(getters)
-                .addMethod(buildMethod(message, messageImplClass, fields))
+                .addMethod(buildMethod(message, messageImplClass, allFields))
                 .build();
     }
 
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
index 00783d0..3a87c4a 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
@@ -38,7 +38,7 @@ import org.apache.ignite.network.NetworkMessage;
  * @see MessageReaderMethodResolver
  * @see MessageWriterMethodResolver
  */
-class BaseMethodNameResolver {
+public class BaseMethodNameResolver {
     /** Processing environment. */
     private final ProcessingEnvironment processingEnvironment;
 
@@ -47,7 +47,7 @@ class BaseMethodNameResolver {
      *
      * @param processingEnvironment Processing environment.
      */
-    BaseMethodNameResolver(ProcessingEnvironment processingEnvironment) {
+    public BaseMethodNameResolver(ProcessingEnvironment processingEnvironment) {
         this.processingEnvironment = processingEnvironment;
     }
 
@@ -57,7 +57,7 @@ class BaseMethodNameResolver {
      * @param parameterType parameter of the method to resolve
      * @return part of the method name, depending on the parameter type
      */
-    String resolveBaseMethodName(TypeMirror parameterType) {
+    public String resolveBaseMethodName(TypeMirror parameterType) {
         if (parameterType.getKind().isPrimitive()) {
             return resolvePrimitiveMethodName(parameterType);
         } else if (parameterType.getKind() == TypeKind.ARRAY) {
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
index 8ea3d12..93ae44b 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
@@ -30,6 +30,7 @@ import javax.lang.model.element.Modifier;
 import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.serialization.MessageDeserializer;
 import org.apache.ignite.network.serialization.MessageMappingException;
 import org.apache.ignite.network.serialization.MessageReader;
@@ -153,8 +154,14 @@ public class MessageDeserializerGenerator {
     private CodeBlock readMessageCodeBlock(ExecutableElement getter, FieldSpec msgField) {
         var methodResolver = new MessageReaderMethodResolver(processingEnv);
 
+        String name = getter.getSimpleName().toString();
+
+        if (getter.getAnnotation(Marshallable.class) != null) {
+            name += "ByteArray";
+        }
+
         return CodeBlock.builder()
-                .add("$N.$N(reader.", msgField, getter.getSimpleName())
+                .add("$N.$N(reader.", msgField, name)
                 .add(methodResolver.resolveReadMethod(getter))
                 .add(")")
                 .build();
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
index 13e9ae2..925c3f0 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
@@ -60,8 +60,9 @@ class MessageReaderMethodResolver {
         String parameterName = getter.getSimpleName().toString();
 
         if (getter.getAnnotation(Marshallable.class) != null) {
+            parameterName += "ByteArray";
             return CodeBlock.builder()
-                    .add("readMarshallable($S)", parameterName)
+                    .add("readByteArray($S)", parameterName)
                     .build();
         }
 
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
index 1926e8e..7d579e4 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
@@ -81,10 +81,12 @@ public class MessageSerializerGenerator {
                 .addAnnotation(Override.class)
                 .addModifiers(Modifier.PUBLIC)
                 .returns(boolean.class)
-                .addParameter(message.className(), "message")
+                .addParameter(message.className(), "msg")
                 .addParameter(MessageWriter.class, "writer")
                 .addException(MessageMappingException.class);
 
+        method.addStatement("$T message = ($T) msg", message.implClassName(), message.implClassName()).addCode("\n");
+
         List<ExecutableElement> getters = message.getters();
 
         method
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
index 38aeb5e..29d973c 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
@@ -35,7 +35,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
 /**
  * Class for resolving {@link MessageWriter} "write*" methods for the corresponding message field type.
  */
-class MessageWriterMethodResolver {
+public class MessageWriterMethodResolver {
     /** Method name resolver. */
     private final BaseMethodNameResolver methodNameResolver;
 
@@ -47,7 +47,7 @@ class MessageWriterMethodResolver {
      *
      * @param processingEnvironment Processing environment.
      */
-    MessageWriterMethodResolver(ProcessingEnvironment processingEnvironment) {
+    public MessageWriterMethodResolver(ProcessingEnvironment processingEnvironment) {
         methodNameResolver = new BaseMethodNameResolver(processingEnvironment);
         typeConverter = new MessageCollectionItemTypeConverter(processingEnvironment);
     }
@@ -72,8 +72,9 @@ class MessageWriterMethodResolver {
         String parameterName = getter.getSimpleName().toString();
 
         if (getter.getAnnotation(Marshallable.class) != null) {
+            parameterName += "ByteArray";
             return CodeBlock.builder()
-                    .add("writeMarshallable($S, message.$L())", parameterName, parameterName)
+                    .add("writeByteArray($S, message.$L())", parameterName, parameterName)
                     .build();
         }
 
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
index d0becd8..08c2534 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.network;
 
+import java.util.Collections;
+import java.util.Set;
+
 /**
  * Message for exchanging information in a cluster.
  */
@@ -41,4 +44,12 @@ public interface NetworkMessage {
      * @return group type.
      */
     short groupType();
+
+    default Set<Integer> beforeRead(Set<Integer> ids, Object marshaller) throws Exception {
+        return Collections.emptySet();
+    }
+
+    default void afterRead(Object marshaller, Object descriptors) throws Exception {
+        // No-op.
+    }
 }
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 447089b..9d92908 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -34,6 +34,7 @@ import java.net.InetSocketAddress;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -53,6 +54,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 import org.apache.ignite.network.TestMessage;
 import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
 import org.apache.ignite.network.TestMessagesFactory;
@@ -108,13 +110,13 @@ public class ItConnectionManagerTest {
 
         var fut = new CompletableFuture<NetworkMessage>();
 
-        manager2.addListener((consistentId, message) -> fut.complete(message));
+        manager2.addListener((obj) -> fut.complete(obj.getMessage()));
 
         NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
         TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
 
-        sender.send(testMessage).get(3, TimeUnit.SECONDS);
+        sender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
@@ -140,7 +142,7 @@ public class ItConnectionManagerTest {
 
         var fut = new CompletableFuture<NetworkMessage>();
 
-        manager1.addListener((consistentId, message) -> fut.complete(message));
+        manager1.addListener((obj) -> fut.complete(obj.getMessage()));
 
         NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
@@ -150,9 +152,9 @@ public class ItConnectionManagerTest {
         var messageReceivedOn2 = new CompletableFuture<Void>();
 
         // If the message is received, that means that the handshake was successfully performed.
-        manager2.addListener((consistentId, message) -> messageReceivedOn2.complete(null));
+        manager2.addListener((message) -> messageReceivedOn2.complete(null));
 
-        senderFrom1to2.send(testMessage);
+        senderFrom1to2.send(new NetworkObject(testMessage, Collections.emptyList()));
 
         messageReceivedOn2.get(3, TimeUnit.SECONDS);
 
@@ -164,7 +166,7 @@ public class ItConnectionManagerTest {
 
         assertEquals(clientLocalAddress, clientRemoteAddress);
 
-        senderFrom2to1.send(testMessage).get(3, TimeUnit.SECONDS);
+        senderFrom2to1.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
@@ -231,7 +233,7 @@ public class ItConnectionManagerTest {
 
         assertThrows(ClosedChannelException.class, () -> {
             try {
-                finalSender.send(testMessage).get(3, TimeUnit.SECONDS);
+                finalSender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
             } catch (Exception e) {
                 throw e.getCause();
             }
@@ -241,11 +243,11 @@ public class ItConnectionManagerTest {
 
         var fut = new CompletableFuture<NetworkMessage>();
 
-        manager2.get1().addListener((consistentId, message) -> fut.complete(message));
+        manager2.get1().addListener((obj) -> fut.complete(obj.getMessage()));
 
         sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
 
-        sender.send(testMessage).get(3, TimeUnit.SECONDS);
+        sender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
 
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
index 6ad6231..53b2463 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.mock;
 
 import io.netty.channel.Channel;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -52,6 +53,8 @@ import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
+import org.apache.ignite.network.TestMessage;
 import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
 import org.apache.ignite.network.TestMessagesFactory;
 import org.junit.jupiter.api.AfterEach;
@@ -130,7 +133,8 @@ public class ItRecoveryHandshakeTest {
         assertNotNull(from2to1);
 
         // Ensure the handshake has finished on both sides.
-        from2to1.send(messageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
+        TestMessage msg = messageFactory.testMessage().msg("test").build();
+        from2to1.send(new NetworkObject(msg, Collections.emptyList())).get(3, TimeUnit.SECONDS);
 
         NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
 
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 87c215f..fd52931 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
@@ -294,12 +294,16 @@ class ItScaleCubeNetworkMessagingTest {
         // register a different handle for the second group
         node1.messagingService().addMessageHandler(
                 NetworkMessageTypes.class,
-                (message, senderAddr, correlationId) -> assertTrue(networkMessageFuture.complete(message))
+                (message, senderAddr, correlationId) -> {
+                    if (message instanceof FieldDescriptorMessage) {
+                        assertTrue(networkMessageFuture.complete(message));
+                    }
+                }
         );
 
         var testMessage = messageFactory.testMessage().msg("foo").build();
 
-        ClassDescriptorMessage networkMessage = new NetworkMessagesFactory().classDescriptorMessage().build();
+        FieldDescriptorMessage networkMessage = new NetworkMessagesFactory().fieldDescriptorMessage().build();
 
         // test that a message gets delivered to both handlers
         node2.messagingService()
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
index bae7c7d..ee0df8b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.network;
 
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
@@ -65,4 +66,9 @@ public class NetworkMessageTypes {
      * Type for {@link FieldDescriptorMessage}.
      */
     public static final short FIELD_DESCRIPTOR_MESSAGE = 6;
+
+    /**
+     * Type for {@link ClassDescriptorListMessage}.
+     */
+    public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 7;
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java
new file mode 100644
index 0000000..bfc7b97
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.message;
+
+import java.util.Collection;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+@Transferable(NetworkMessageTypes.CLASS_DESCRIPTOR_LIST_MESSAGE)
+public interface ClassDescriptorListMessage extends NetworkMessage {
+
+    Collection<ClassDescriptorMessage> messages();
+
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index d8f8967..a95d285 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.schemas.network.NetworkView;
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -71,7 +70,7 @@ public class ConnectionManager {
     private final SerializationService serializationService;
 
     /** Message listeners. */
-    private final List<BiConsumer<String, NetworkMessage>> listeners = new CopyOnWriteArrayList<>();
+    private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();
 
     /** Node consistent id. */
     private final String consistentId;
@@ -197,11 +196,10 @@ public class ConnectionManager {
     /**
      * Callback that is called upon receiving a new message.
      *
-     * @param consistentId Consistent id of the message's sender.
      * @param message New message.
      */
-    private void onMessage(String consistentId, NetworkMessage message) {
-        listeners.forEach(consumer -> consumer.accept(consistentId, message));
+    private void onMessage(InNetworkObject message) {
+        listeners.forEach(consumer -> consumer.accept(message));
     }
 
     /**
@@ -243,7 +241,7 @@ public class ConnectionManager {
      *
      * @param listener Message listener.
      */
-    public void addListener(BiConsumer<String, NetworkMessage> listener) {
+    public void addListener(Consumer<InNetworkObject> listener) {
         listeners.add(listener);
     }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
similarity index 51%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
index cc72427..85fb883 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
@@ -17,37 +17,32 @@
 
 package org.apache.ignite.internal.network.netty;
 
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.function.BiConsumer;
+import org.apache.ignite.internal.network.serialization.CompositeDescriptorRegistry;
 import org.apache.ignite.network.NetworkMessage;
 
-/**
- * Network message handler that delegates handling to {@link #messageListener}.
- */
-public class MessageHandler extends ChannelInboundHandlerAdapter {
-    /** Message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+public class InNetworkObject {
+
+    private final NetworkMessage message;
 
-    /** Consistent id of the remote node. */
     private final String consistentId;
 
-    /**
-     * Constructor.
-     *
-     * @param messageListener Message listener.
-     * @param consistentId Consistent id of the remote node.
-     */
-    public MessageHandler(BiConsumer<String, NetworkMessage> messageListener, String consistentId) {
-        this.messageListener = messageListener;
+    private final CompositeDescriptorRegistry registry;
+    
+    public InNetworkObject(NetworkMessage message, String consistentId, CompositeDescriptorRegistry registry) {
+        this.message = message;
         this.consistentId = consistentId;
+        this.registry = registry;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        NetworkMessage message = (NetworkMessage) msg;
+    public NetworkMessage getMessage() {
+        return message;
+    }
+
+    public String getConsistentId() {
+        return consistentId;
+    }
 
-        messageListener.accept(consistentId, message);
+    public CompositeDescriptorRegistry getRegistry() {
+        return registry;
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
index 49c2ed0..cd1bc2b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.ignite.internal.network.direct.DirectMarshallingUtils;
 import org.apache.ignite.internal.network.direct.DirectMessageReader;
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.NetworkMessage;
@@ -107,7 +108,13 @@ public class InboundDecoder extends ByteToMessageDecoder {
                     reader.reset();
                     messageAttr.set(null);
 
-                    out.add(msg.getMessage());
+                    NetworkMessage message = msg.getMessage();
+
+                    if (message instanceof ClassDescriptorListMessage) {
+                        onClassDescriptorMessage((ClassDescriptorListMessage) message);
+                    } else {
+                        out.add(message);
+                    }
                 } else {
                     messageAttr.set(msg);
                 }
@@ -133,4 +140,8 @@ public class InboundDecoder extends ByteToMessageDecoder {
             }
         }
     }
+
+    private void onClassDescriptorMessage(ClassDescriptorListMessage msg) {
+        serializationService.mergeDescriptors(msg.messages());
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
index cc72427..c587071 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
@@ -19,7 +19,8 @@ package org.apache.ignite.internal.network.netty;
 
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.network.NetworkMessage;
 
 /**
@@ -27,20 +28,23 @@ import org.apache.ignite.network.NetworkMessage;
  */
 public class MessageHandler extends ChannelInboundHandlerAdapter {
     /** Message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+    private final Consumer<InNetworkObject> messageListener;
 
     /** Consistent id of the remote node. */
     private final String consistentId;
 
+    private final PerSessionSerializationService serializationService;
+
     /**
      * Constructor.
-     *
-     * @param messageListener Message listener.
+     *  @param messageListener Message listener.
      * @param consistentId Consistent id of the remote node.
+     * @param serializationService
      */
-    public MessageHandler(BiConsumer<String, NetworkMessage> messageListener, String consistentId) {
+    public MessageHandler(Consumer<InNetworkObject> messageListener, String consistentId, PerSessionSerializationService serializationService) {
         this.messageListener = messageListener;
         this.consistentId = consistentId;
+        this.serializationService = serializationService;
     }
 
     /** {@inheritDoc} */
@@ -48,6 +52,6 @@ public class MessageHandler extends ChannelInboundHandlerAdapter {
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
         NetworkMessage message = (NetworkMessage) msg;
 
-        messageListener.accept(consistentId, message);
+        messageListener.accept(new InNetworkObject(message, consistentId, serializationService.compositeDescriptorRegistry()));
     }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
index 1b1e4c7..7983e40 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
@@ -25,13 +25,12 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.ignite.internal.network.handshake.HandshakeManager;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -59,7 +58,7 @@ public class NettyClient {
     private volatile Channel channel = null;
 
     /** Message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+    private final Consumer<InNetworkObject> messageListener;
 
     /** Handshake manager. */
     private final HandshakeManager handshakeManager;
@@ -79,7 +78,7 @@ public class NettyClient {
             SocketAddress address,
             SerializationService serializationService,
             HandshakeManager manager,
-            BiConsumer<String, NetworkMessage> messageListener
+            Consumer<InNetworkObject> messageListener
     ) {
         this.address = address;
         this.serializationService = serializationService;
@@ -113,7 +112,9 @@ public class NettyClient {
 
                     ch.pipeline().addLast(
                             new InboundDecoder(sessionSerializationService),
-                            new HandshakeHandler(handshakeManager, (consistentId) -> new MessageHandler(messageListener, consistentId)),
+                            new HandshakeHandler(handshakeManager,
+                                    (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
+                            ),
                             new ChunkedWriteHandler(),
                             new OutboundEncoder(sessionSerializationService),
                             new IoExceptionSuppressingHandler()
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index 6185994..46a598f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -21,7 +21,7 @@ import io.netty.channel.Channel;
 import io.netty.handler.stream.ChunkedInput;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
-import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -56,8 +56,8 @@ public class NettySender {
      * @param msg Network message.
      * @return Future of the send operation.
      */
-    public CompletableFuture<Void> send(NetworkMessage msg) {
-        return NettyUtils.toCompletableFuture(channel.writeAndFlush(msg));
+    public CompletableFuture<Void> send(NetworkObject obj) {
+        return NettyUtils.toCompletableFuture(channel.writeAndFlush(obj));
     }
 
     /**
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 89e7f8c..7050d8e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -27,7 +27,6 @@ import io.netty.handler.stream.ChunkedWriteHandler;
 import java.net.SocketAddress;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -37,7 +36,6 @@ import org.apache.ignite.internal.network.serialization.PerSessionSerializationS
 import org.apache.ignite.internal.network.serialization.SerializationService;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
@@ -58,7 +56,7 @@ public class NettyServer {
     private final SerializationService serializationService;
 
     /** Incoming message listener. */
-    private final BiConsumer<String, NetworkMessage> messageListener;
+    private final Consumer<InNetworkObject> messageListener;
 
     /** Handshake manager. */
     private final Supplier<HandshakeManager> handshakeManager;
@@ -94,7 +92,7 @@ public class NettyServer {
             NetworkView configuration,
             Supplier<HandshakeManager> handshakeManager,
             Consumer<NettySender> newConnectionListener,
-            BiConsumer<String, NetworkMessage> messageListener,
+            Consumer<InNetworkObject> messageListener,
             SerializationService serializationService,
             NettyBootstrapFactory bootstrapFactory
     ) {
@@ -139,7 +137,9 @@ public class NettyServer {
                                      */
                                     new InboundDecoder(sessionSerializationService),
                                     // Handshake handler.
-                                    new HandshakeHandler(manager, (consistentId) -> new MessageHandler(messageListener, consistentId)),
+                                    new HandshakeHandler(manager,
+                                            (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
+                                    ),
                                     /*
                                      * Encoder that uses the MessageWriter
                                      * to write chunked data.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index c629084..fa6f04b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -24,15 +24,20 @@ import io.netty.handler.codec.MessageToMessageEncoder;
 import io.netty.handler.stream.ChunkedInput;
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 import org.apache.ignite.network.serialization.MessageSerializer;
 
 /**
  * An encoder for the outbound messages that uses {@link DirectMessageWriter}.
  */
-public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
+public class OutboundEncoder extends MessageToMessageEncoder<NetworkObject> {
+    private static final NetworkMessagesFactory MSG_FACTORY = new NetworkMessagesFactory();
+
     /** Serialization registry. */
     private final PerSessionSerializationService serializationService;
 
@@ -47,7 +52,7 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
 
     /** {@inheritDoc} */
     @Override
-    protected void encode(ChannelHandlerContext ctx, NetworkMessage msg, List<Object> out) throws Exception {
+    protected void encode(ChannelHandlerContext ctx, NetworkObject msg, List<Object> out) throws Exception {
         out.add(new NetworkMessageChunkedInput(msg, serializationService));
     }
 
@@ -61,11 +66,17 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
         /** Message serializer. */
         private final MessageSerializer<NetworkMessage> serializer;
 
+        private final MessageSerializer<ClassDescriptorListMessage> descriptorSerializer;
+
         /** Message writer. */
         private final DirectMessageWriter writer;
 
+        private final ClassDescriptorListMessage descriptors;
+        private final PerSessionSerializationService serializationService;
+
         /** Whether the message was fully written. */
         private boolean finished = false;
+        private boolean descriptorsFinished = false;
 
         /**
          * Constructor.
@@ -74,10 +85,23 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
          * @param serializationService Serialization service.
          */
         private NetworkMessageChunkedInput(
-                NetworkMessage msg,
+                NetworkObject object,
                 PerSessionSerializationService serializationService
         ) {
-            this.msg = msg;
+            this.serializationService = serializationService;
+            this.msg = object.getNetworkMessage();
+
+            if (!object.getDescriptors().isEmpty()) {
+                descriptors = MSG_FACTORY.classDescriptorListMessage().messages(object.getDescriptors()).build();
+                short groupType = descriptors.groupType();
+                short messageType = descriptors.messageType();
+                descriptorSerializer = serializationService.createMessageSerializer(groupType, messageType);
+            } else {
+                descriptors = null;
+                descriptorSerializer = null;
+                descriptorsFinished = true;
+            }
+
             this.serializer = serializationService.createMessageSerializer(msg.groupType(), msg.messageType());
             this.writer = new DirectMessageWriter(serializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
         }
@@ -113,7 +137,14 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
 
             writer.setBuffer(byteBuffer);
 
-            finished = serializer.writeMessage(msg, writer);
+            if (!descriptorsFinished) {
+                descriptorsFinished = descriptorSerializer.writeMessage(descriptors, writer);
+                if (descriptorsFinished) {
+                    writer.reset();
+                }
+            } else {
+                finished = serializer.writeMessage(msg, writer);
+            }
 
             buffer.writerIndex(byteBuffer.position() - initialPosition);
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 5f37db8..8c29293 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.recovery;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 
 /**
  * Recovery protocol handshake manager for a client.
@@ -78,7 +80,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
                     .connectionsCount(0)
                     .build();
 
-            ChannelFuture sendFuture = channel.writeAndFlush(response);
+            ChannelFuture sendFuture = channel.writeAndFlush(new NetworkObject(response, Collections.emptyList()));
 
             NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
                 if (throwable != null) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 4954981..bb0b5f3 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.recovery;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
 import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
 
 /**
  * Recovery protocol handshake manager for a server.
@@ -76,7 +78,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
                 .consistentId(consistentId)
                 .build();
 
-        ChannelFuture sendFuture = channel.writeAndFlush(handshakeStartMessage);
+        ChannelFuture sendFuture = channel.writeAndFlush(new NetworkObject(handshakeStartMessage, Collections.emptyList()));
 
         NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
             if (throwable != null) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
index 288b51e..4ce1381 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -184,6 +185,14 @@ public class PerSessionSerializationService {
         return messages;
     }
 
+    public boolean isSent(int descriptorId) {
+        return sentDescriptors.contains(descriptorId);
+    }
+
+    public void addSent(int descriptorId) {
+        sentDescriptors.add(descriptorId);
+    }
+
     private byte fieldFlags(FieldDescriptor fieldDescriptor) {
         int bits = condMask(fieldDescriptor.isUnshared(), FieldDescriptorMessage.UNSHARED_MASK)
                 | condMask(fieldDescriptor.isPrimitive(), FieldDescriptorMessage.IS_PRIMITIVE)
@@ -232,7 +241,7 @@ public class PerSessionSerializationService {
         return id;
     }
 
-    private void mergeDescriptors(List<ClassDescriptorMessage> remoteDescriptors) {
+    public void mergeDescriptors(Collection<ClassDescriptorMessage> remoteDescriptors) {
         List<ClassDescriptorMessage> leftToProcess = remoteDescriptors.stream()
                 .filter(classMessage -> !knownMergedDescriptor(classMessage.descriptorId()))
                 .collect(toCollection(LinkedList::new));
@@ -378,6 +387,11 @@ public class PerSessionSerializationService {
         }
     }
 
+    public CompositeDescriptorRegistry compositeDescriptorRegistry() {
+        return descriptors;
+    }
+
+
     @TestOnly
     Map<Integer, ClassDescriptor> getDescriptorMapView() {
         return mergedIdToDescriptorMap;
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 9d8965d..2ee7961 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -18,22 +18,40 @@
 package org.apache.ignite.network;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.network.NettyBootstrapFactory.isInNetworkThread;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
 import org.apache.ignite.internal.network.message.InvokeResponse;
-import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.InNetworkObject;
 import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.internal.network.serialization.ClassDescriptor;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.CompositeDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.FieldDescriptor;
+import org.apache.ignite.internal.network.serialization.Serialization;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.Nullable;
 
@@ -42,9 +60,15 @@ public class DefaultMessagingService extends AbstractMessagingService {
     /** Network messages factory. */
     private final NetworkMessagesFactory factory;
 
+    /** Integer value that is sent when there is no descriptor. */
+    private static final int NO_DESCRIPTOR_ID = Integer.MIN_VALUE;
+
     /** Topology service. */
     private final TopologyService topologyService;
 
+    private final UserObjectMarshaller marshaller;
+    private final ClassDescriptorRegistry classDescriptorRegistry;
+
     /** Connection manager that provides access to {@link NettySender}. */
     private volatile ConnectionManager connectionManager;
 
@@ -75,6 +99,9 @@ public class DefaultMessagingService extends AbstractMessagingService {
     public DefaultMessagingService(NetworkMessagesFactory factory, TopologyService topologyService) {
         this.factory = factory;
         this.topologyService = topologyService;
+        UserObjectSerializationContext userObjectSerializationContext = createUserObjectSerializationContext();
+        this.marshaller = userObjectSerializationContext.marshaller();
+        this.classDescriptorRegistry = userObjectSerializationContext.descriptorRegistry();
     }
 
     /**
@@ -158,7 +185,24 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         String recipientConsistentId = recipient != null ? recipient.name() : address.consistentId();
 
-        return connectionManager.channel(recipientConsistentId, addr).thenCompose(sender -> sender.send(message));
+        return this.sendMessage0(message, recipientConsistentId, addr);
+    }
+
+    private CompletableFuture<Void> sendMessage0(NetworkMessage message, String recipientConsistentId, InetSocketAddress addr) {
+        if (isInNetworkThread()) {
+            return CompletableFuture.supplyAsync(() -> sendMessage0(message, recipientConsistentId, addr), svc).thenCompose(fut -> fut);
+        }
+
+        List<ClassDescriptorMessage> descriptors;
+
+        try {
+            descriptors = beforeRead(message);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+
+        return connectionManager.channel(recipientConsistentId, addr).thenCompose(
+                sender -> sender.send(new NetworkObject(message, descriptors)));
     }
 
     /**
@@ -194,8 +238,95 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         String recipientConsistentId = recipient != null ? recipient.name() : addr.consistentId();
 
-        return connectionManager.channel(recipientConsistentId, address).thenCompose(sender -> sender.send(message))
-                .thenCompose(unused -> responseFuture);
+        return sendMessage0(message, recipientConsistentId, address).thenCompose(unused -> responseFuture);
+    }
+
+    private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws Exception {
+        Set<Integer> ids = msg.beforeRead(new HashSet<>(), marshaller);
+
+        return createClassDescriptorsMessages(ids);
+    }
+
+    public List<ClassDescriptorMessage> createClassDescriptorsMessages(Set<Integer> descriptorIds) {
+        List<ClassDescriptorMessage> messages = descriptorIds.stream()
+                .map(classDescriptorRegistry::getDescriptor)
+                .map(descriptor -> {
+                    List<FieldDescriptorMessage> fields = descriptor.fields().stream()
+                            .map(d -> {
+                                return factory.fieldDescriptorMessage()
+                                    .name(d.name())
+                                    .typeDescriptorId(d.typeDescriptorId())
+                                    .className(d.typeName())
+                                    .flags(fieldFlags(d))
+                                    .build();
+                            })
+                            .collect(toList());
+
+                    Serialization serialization = descriptor.serialization();
+
+                    return factory.classDescriptorMessage()
+                        .fields(fields)
+                        .serializationType((byte) serialization.type().value())
+                        .serializationFlags(serializationAttributeFlags(serialization))
+                        .descriptorId(descriptor.descriptorId())
+                        .className(descriptor.className())
+                        .superClassDescriptorId(superClassDescriptorIdForMessage(descriptor))
+                        .superClassName(descriptor.superClassName())
+                        .componentTypeDescriptorId(componentTypeDescriptorIdForMessage(descriptor))
+                        .componentTypeName(descriptor.componentTypeName())
+                        .attributes(classDescriptorAttributeFlags(descriptor))
+                        .build();
+                }).collect(toList());
+
+        return messages;
+    }
+
+    private byte fieldFlags(FieldDescriptor fieldDescriptor) {
+        int bits = condMask(fieldDescriptor.isUnshared(), FieldDescriptorMessage.UNSHARED_MASK)
+                | condMask(fieldDescriptor.isPrimitive(), FieldDescriptorMessage.IS_PRIMITIVE)
+                | condMask(fieldDescriptor.isRuntimeTypeKnownUpfront(), FieldDescriptorMessage.IS_RUNTIME_TYPE_KNOWN_UPFRONT);
+        return (byte) bits;
+    }
+
+    private byte serializationAttributeFlags(Serialization serialization) {
+        int bits = condMask(serialization.hasWriteObject(), ClassDescriptorMessage.HAS_WRITE_OBJECT_MASK)
+                | condMask(serialization.hasReadObject(), ClassDescriptorMessage.HAS_READ_OBJECT_MASK)
+                | condMask(serialization.hasReadObjectNoData(), ClassDescriptorMessage.HAS_READ_OBJECT_NO_DATA_MASK)
+                | condMask(serialization.hasWriteReplace(), ClassDescriptorMessage.HAS_WRITE_REPLACE_MASK)
+                | condMask(serialization.hasReadResolve(), ClassDescriptorMessage.HAS_READ_RESOLVE_MASK);
+        return (byte) bits;
+    }
+
+    private int condMask(boolean value, int mask) {
+        return value ? mask : 0;
+    }
+
+    private byte classDescriptorAttributeFlags(ClassDescriptor descriptor) {
+        int bits = condMask(descriptor.isPrimitive(), ClassDescriptorMessage.IS_PRIMITIVE_MASK)
+                | condMask(descriptor.isArray(), ClassDescriptorMessage.IS_ARRAY_MASK)
+                | condMask(descriptor.isRuntimeEnum(), ClassDescriptorMessage.IS_RUNTIME_ENUM_MASK)
+                | condMask(descriptor.isRuntimeTypeKnownUpfront(), ClassDescriptorMessage.IS_RUNTIME_TYPE_KNOWN_UPFRONT_MASK);
+        return (byte) bits;
+    }
+
+    private int superClassDescriptorIdForMessage(ClassDescriptor descriptor) {
+        Integer id = descriptor.superClassDescriptorId();
+
+        if (id == null) {
+            return NO_DESCRIPTOR_ID;
+        }
+
+        return id;
+    }
+
+    private int componentTypeDescriptorIdForMessage(ClassDescriptor descriptor) {
+        Integer id = descriptor.componentTypeDescriptorId();
+
+        if (id == null) {
+            return NO_DESCRIPTOR_ID;
+        }
+
+        return id;
     }
 
     /**
@@ -212,18 +343,28 @@ public class DefaultMessagingService extends AbstractMessagingService {
         }
     }
 
+    private final ExecutorService svc = Executors.newSingleThreadExecutor();
+
     /**
      * Handles an incoming messages.
      *
      * @param consistentId Sender's consistent id.
      * @param msg Incoming message.
      */
-    private void onMessage(String consistentId, NetworkMessage msg) {
-        if (msg instanceof ScaleCubeMessage) {
-            // ScaleCube messages are handled in the ScaleCubeTransport
+    private void onMessage(InNetworkObject obj) {
+        if (isInNetworkThread()) {
+            svc.submit(() -> onMessage(obj));
             return;
         }
 
+        NetworkMessage msg = obj.getMessage();
+        CompositeDescriptorRegistry registry = obj.getRegistry();
+        String consistentId = obj.getConsistentId();
+        try {
+            msg.afterRead(marshaller, registry);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
         if (msg instanceof InvokeResponse) {
             InvokeResponse response = (InvokeResponse) msg;
             onInvokeResponse(response.message(), response.correlationId());
@@ -345,4 +486,14 @@ public class DefaultMessagingService extends AbstractMessagingService {
 
         requestsMap.clear();
     }
+
+    private UserObjectSerializationContext createUserObjectSerializationContext() {
+        var userObjectDescriptorRegistry = new ClassDescriptorRegistry();
+        var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
+
+        var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
+
+        return new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
+            userObjectMarshaller);
+    }
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 0d11ccf..a4b7fa8 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -139,6 +139,22 @@ public class NettyBootstrapFactory implements IgniteComponent {
         clientWorkerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-client");
     }
 
+    public static boolean isInNetworkThread() {
+        String name = Thread.currentThread().getName();
+
+        if (name.contains("-srv-worker")) {
+            return true;
+        }
+        if (name.contains("-client")) {
+            return true;
+        }
+        if (name.contains("-srv-accept")) {
+            return true;
+        }
+
+        return false;
+    }
+
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java
new file mode 100644
index 0000000..9b0d7fb
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.List;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+
+public class NetworkObject {
+
+    private final NetworkMessage networkMessage;
+    private final List<ClassDescriptorMessage> descriptors;
+
+    public NetworkObject(NetworkMessage networkMessage, List<ClassDescriptorMessage> descriptors) {
+        this.networkMessage = networkMessage;
+        this.descriptors = descriptors;
+    }
+
+    public NetworkMessage getNetworkMessage() {
+        return networkMessage;
+    }
+
+    public List<ClassDescriptorMessage> getDescriptors() {
+        return descriptors;
+    }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index e1faa56..f8f77be 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -104,7 +104,9 @@ public class ScaleCubeClusterServiceFactory {
                         nettyBootstrapFactory
                 );
 
-                var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr, topologyService, messageFactory);
+                connectionMgr.start();
+
+                var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr.getLocalAddress(), messagingService, topologyService, messageFactory);
 
                 NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
 
@@ -122,8 +124,6 @@ public class ScaleCubeClusterServiceFactory {
 
                 shutdownFuture = cluster.onShutdown().toFuture();
 
-                connectionMgr.start();
-
                 // resolve cyclic dependencies
                 topologyService.setCluster(cluster);
                 messagingService.setConnectionManager(connectionMgr);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index ad763de..e263467 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Map;
 import java.util.Objects;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
 import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
@@ -32,6 +33,7 @@ import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
@@ -63,7 +65,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     private final MonoProcessor<Void> onStop = MonoProcessor.create();
 
     /** Connection manager. */
-    private final ConnectionManager connectionManager;
+    private final MessagingService messagingService;
 
     /** Message factory. */
     private final NetworkMessagesFactory messageFactory;
@@ -82,15 +84,17 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
      * @param messageFactory    message factory
      */
     ScaleCubeDirectMarshallerTransport(
-            ConnectionManager connectionManager,
+            SocketAddress localAddress,
+            MessagingService messagingService,
             ScaleCubeTopologyService topologyService,
             NetworkMessagesFactory messageFactory
     ) {
-        this.connectionManager = connectionManager;
+        this.address = prepareAddress(localAddress);
+        this.messagingService = messagingService;
         this.topologyService = topologyService;
         this.messageFactory = messageFactory;
 
-        this.connectionManager.addListener(this::onMessage);
+        this.messagingService.addMessageHandler(NetworkMessageTypes.class, (message, senderAddr, correlationId) -> onMessage(message));
         // Setup cleanup
         stop.then(doStop())
                 .doFinally(s -> onStop.onComplete())
@@ -145,8 +149,6 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
     /** {@inheritDoc} */
     @Override
     public Mono<Transport> start() {
-        address = prepareAddress(connectionManager.getLocalAddress());
-
         return Mono.just(this);
     }
 
@@ -171,21 +173,23 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
         var addr = InetSocketAddress.createUnresolved(address.host(), address.port());
 
         return Mono.fromFuture(() -> {
-            ClusterNode node = topologyService.getByAddress(NetworkAddress.from(addr));
+            NetworkAddress networkAddress = NetworkAddress.from(addr);
+            ClusterNode node = topologyService.getByAddress(networkAddress);
 
-            String consistentId = node != null ? node.name() : null;
+            if (node == null) {
+                node = new ClusterNode(null, null, networkAddress);
+            }
 
-            return connectionManager.channel(consistentId, addr).thenCompose(client -> client.send(fromMessage(message)));
+            return messagingService.send(node, fromMessage(message));
         });
     }
 
     /**
      * Handles new network messages from {@link #connectionManager}.
      *
-     * @param senderConsistentId Sender's consistent id.
      * @param msg    Network message.
      */
-    private void onMessage(String senderConsistentId, NetworkMessage msg) {
+    private void onMessage(NetworkMessage msg) {
         Message message = fromNetworkMessage(msg);
 
         if (message != null) {
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index cc6cfb0..54ab776 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -168,7 +168,7 @@ public class NettyClientTest {
                 address,
                 null,
                 new MockClientHandshakeManager(channel),
-                (address1, message) -> {
+                (message) -> {
                 }
         );
 
@@ -190,7 +190,7 @@ public class NettyClientTest {
                 address,
                 null,
                 new MockClientHandshakeManager(future.channel()),
-                (address1, message) -> {
+                (message) -> {
                 }
         );
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index e59db1a..4e84a89 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -183,7 +183,7 @@ public class NettyServerTest {
                 () -> handshakeManager,
                 sender -> {
                 },
-                (socketAddress, message) -> {
+                (message) -> {
                 },
                 new SerializationService(registry, mock(UserObjectSerializationContext.class)),
                 bootstrapFactory
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
index 5d88ad5..2a3e2bc 100644
--- a/modules/raft/pom.xml
+++ b/modules/raft/pom.xml
@@ -118,7 +118,6 @@
         <dependency>
             <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-network</artifactId>
-            <scope>test</scope>
         </dependency>
 
         <dependency>
diff --git a/modules/transactions/pom.xml b/modules/transactions/pom.xml
index 7ffc632..26155ec 100644
--- a/modules/transactions/pom.xml
+++ b/modules/transactions/pom.xml
@@ -50,6 +50,11 @@
 
     <dependency>
       <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-network</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
       <artifactId>ignite-raft-client</artifactId>
     </dependency>
 

[ignite-3] 08/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit cbc4ec8e63ab0c8bdc387181c686333aabaa28d2
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Fri Mar 4 10:52:18 2022 +0300

    .
---
 .../network/serialization/MessageReader.java       | 10 ---
 .../network/serialization/MessageWriter.java       | 11 ---
 .../network/direct/DirectMessageReader.java        | 12 ---
 .../network/direct/DirectMessageWriter.java        | 10 ---
 .../direct/stream/DirectByteBufferStream.java      | 19 -----
 .../stream/DirectByteBufferStreamImplV1.java       | 99 ----------------------
 6 files changed, 161 deletions(-)

diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java
index 2289152..712f536 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageReader.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 
 /**
@@ -274,15 +273,6 @@ public interface MessageReader {
             MessageCollectionItemType valType, boolean linked);
 
     /**
-     * Reads a field annotated with {@link Marshallable}.
-     *
-     * @param name Field name.
-     * @param <T>  Field's type.
-     * @return Marshallable object.
-     */
-    <T> T readMarshallable(String name);
-
-    /**
      * Tells whether the last invocation of any of the {@code readXXX(...)} methods has fully written the value. {@code False} is returned
      * if there were not enough remaining bytes in a byte buffer.
      *
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java
index 7252ac3..a746eba 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/serialization/MessageWriter.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 
 /**
@@ -302,16 +301,6 @@ public interface MessageWriter {
             MessageCollectionItemType valType);
 
     /**
-     * Writes a field annotated with {@link Marshallable}.
-     *
-     * @param name   Field name.
-     * @param object Marshallable object.
-     * @param <T>    Object's type.
-     * @return Whether a value was fully written.
-     */
-    <T> boolean writeMarshallable(String name, T object);
-
-    /**
      * Returns {@code true} if the header of the current message has been written, {@code false} otherwise.
      *
      * @return Whether the message header has already been written.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java
index 6f299a9..39de7c3 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java
@@ -386,18 +386,6 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override
-    public <T> T readMarshallable(String name) {
-        DirectByteBufferStream stream = state.item().stream;
-
-        T object = stream.readMarshallable(this);
-
-        lastRead = stream.lastFinished();
-
-        return object;
-    }
-
-    /** {@inheritDoc} */
-    @Override
     public boolean isLastRead() {
         return lastRead;
     }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java
index 4d77358..a4726ce 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java
@@ -387,16 +387,6 @@ public class DirectMessageWriter implements MessageWriter {
         state.reset();
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public <T> boolean writeMarshallable(String name, T object) {
-        DirectByteBufferStream stream = state.item().stream;
-
-        stream.writeMarshallable(object, this);
-
-        return stream.lastFinished();
-    }
-
     /**
      * State item.
      */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStream.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStream.java
index 321f047..401e18b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStream.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStream.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Marshallable;
 import org.apache.ignite.network.serialization.MessageReader;
 import org.apache.ignite.network.serialization.MessageWriter;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -434,22 +433,4 @@ public interface DirectByteBufferStream {
      */
     <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType,
             boolean linked, MessageReader reader);
-
-    /**
-     * Writes a field annotated with {@link Marshallable}.
-     *
-     * @param object Marshallable object.
-     * @param writer Writer.
-     * @param <T> Object's type.
-     */
-    <T> void writeMarshallable(T object, MessageWriter writer);
-
-    /**
-     * Reads a field annotated with {@link Marshallable}.
-     *
-     * @param reader Reader.
-     * @param <T> Field's type.
-     * @return Marshallable object.
-     */
-    <T> T readMarshallable(MessageReader reader);
 }
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
index 5561abb..1f478e1 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
@@ -47,7 +47,6 @@ import java.util.RandomAccess;
 import java.util.UUID;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
-import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
 import org.apache.ignite.internal.util.ArrayFactory;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -143,10 +142,6 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
     private long uuidLocId;
 
-    private int marshallableState;
-
-    private byte[] marshallable;
-
     private List<ClassDescriptorMessage> descriptors;
 
     protected boolean lastFinished;
@@ -1361,100 +1356,6 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
         return map0;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public <T> void writeMarshallable(T object, MessageWriter writer) {
-        switch (marshallableState) {
-            case 0:
-                writeBoolean(object == null);
-
-                if (!lastFinished || object == null) {
-                    return;
-                }
-
-                marshallableState++;
-
-                //noinspection fallthrough
-            case 1:
-                if (marshallable == null) {
-                    // If object was not serialized to a byte array, serialize it
-                    MarshalledObject res = serializationService.writeMarshallable(object);
-                    marshallable = res.bytes();
-                    // Get descriptors that were not previously sent to the remote node
-                    descriptors = serializationService.createClassDescriptorsMessages(res.usedDescriptorIds());
-                }
-
-                writeCollection(descriptors, MessageCollectionItemType.MSG, writer);
-
-                if (!lastFinished) {
-                    return;
-                }
-
-                marshallableState++;
-
-                //noinspection fallthrough
-            case 2:
-                writeByteArray(marshallable);
-
-                if (!lastFinished) {
-                    return;
-                }
-
-                marshallable = null;
-                descriptors = null;
-                marshallableState = 0;
-                break;
-
-            default:
-                throw new IllegalArgumentException("Unknown marshallableState: " + marshallableState);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public <T> T readMarshallable(MessageReader reader) {
-        switch (marshallableState) {
-            case 0:
-                boolean isNull = readBoolean();
-
-                if (!lastFinished || isNull) {
-                    return null;
-                }
-
-                marshallableState++;
-
-                //noinspection fallthrough
-            case 1:
-                descriptors = readCollection(MessageCollectionItemType.MSG, reader);
-
-                if (!lastFinished) {
-                    return null;
-                }
-
-                marshallableState++;
-
-                //noinspection fallthrough
-            case 2:
-                marshallable = readByteArray();
-
-                if (!lastFinished) {
-                    return null;
-                }
-
-                break;
-            default:
-                throw new IllegalArgumentException("Unknown marshallableState: " + marshallableState);
-        }
-
-        T read = serializationService.readMarshallable(descriptors, marshallable);
-
-        marshallableState = 0;
-        marshallable = null;
-        descriptors = null;
-
-        return read;
-    }
-
     /**
      * Writes array.
      *

[ignite-3] 11/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 38c44044e58f542adc9f6c32592efd3125836537
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Fri Mar 4 12:43:06 2022 +0300

    .
---
 .../ignite/internal/network/netty/OutboundEncoder.java | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index fa6f04b..210f048 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -24,9 +24,11 @@ import io.netty.handler.codec.MessageToMessageEncoder;
 import io.netty.handler.stream.ChunkedInput;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.ignite.internal.network.NetworkMessagesFactory;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
 import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.NetworkObject;
@@ -92,9 +94,16 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkObject> {
             this.msg = object.getNetworkMessage();
 
             if (!object.getDescriptors().isEmpty()) {
-                descriptors = MSG_FACTORY.classDescriptorListMessage().messages(object.getDescriptors()).build();
-                short groupType = descriptors.groupType();
-                short messageType = descriptors.messageType();
+
+                List<ClassDescriptorMessage> descriptors = object.getDescriptors();
+
+                descriptors = descriptors.stream()
+                        .filter(classDescriptorMessage -> !serializationService.isSent(classDescriptorMessage.descriptorId()))
+                        .collect(Collectors.toList());
+
+                this.descriptors = MSG_FACTORY.classDescriptorListMessage().messages(descriptors).build();
+                short groupType = this.descriptors.groupType();
+                short messageType = this.descriptors.messageType();
                 descriptorSerializer = serializationService.createMessageSerializer(groupType, messageType);
             } else {
                 descriptors = null;
@@ -140,6 +149,9 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkObject> {
             if (!descriptorsFinished) {
                 descriptorsFinished = descriptorSerializer.writeMessage(descriptors, writer);
                 if (descriptorsFinished) {
+                    descriptors.messages().forEach(classDescriptorMessage -> {
+                        serializationService.addSent(classDescriptorMessage.descriptorId());
+                    });
                     writer.reset();
                 }
             } else {

[ignite-3] 04/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 69224dc9b60a7c7ae8037bfab54d6c86e56a1b26
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Mar 3 14:32:22 2022 +0300

    .
---
 .../ItTransferableObjectProcessorTest.java         | 261 ---------------------
 1 file changed, 261 deletions(-)

diff --git a/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/internal/network/processor/ItTransferableObjectProcessorTest.java b/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/internal/network/processor/ItTransferableObjectProcessorTest.java
deleted file mode 100644
index 789c3f0..0000000
--- a/modules/network-annotation-processor/src/integrationTest/java/org/apache/ignite/internal/network/processor/ItTransferableObjectProcessorTest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.network.processor;
-
-import static com.google.testing.compile.CompilationSubject.assertThat;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import com.google.testing.compile.Compilation;
-import com.google.testing.compile.Compiler;
-import com.google.testing.compile.JavaFileObjects;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-import javax.tools.JavaFileObject;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.MessageGroup;
-import org.apache.ignite.network.annotations.Transferable;
-import org.junit.jupiter.api.Test;
-
-/**
- * Integration tests for the {@link TransferableObjectProcessor}.
- */
-public class ItTransferableObjectProcessorTest {
-    /**
-     * Package name of the test sources.
-     */
-    private static final String RESOURCE_PACKAGE_NAME = "org.apache.ignite.internal.network.processor";
-
-    /**
-     * Compiler instance configured with the annotation processor being tested.
-     */
-    private final Compiler compiler = Compiler.javac().withProcessors(new TransferableObjectProcessor());
-
-    /**
-     * Compiles the network message with all supported directly marshallable types and checks that the compilation completed successfully.
-     */
-    @Test
-    void testCompileAllTypesMessage() {
-        Compilation compilation = compile("AllTypesMessage");
-
-        assertThat(compilation).succeededWithoutWarnings();
-
-        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageBuilder"));
-        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageImpl"));
-        assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
-
-        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializer"));
-        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageDeserializer"));
-        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializationFactory"));
-        assertThat(compilation).generatedSourceFile(
-                fileName("NetworkMessageProcessorTestSerializationRegistryInitializer")
-        );
-    }
-
-    /**
-     * Compiles a network message that does not implement {@link NetworkMessage} directly but rather through a bunch of superinterfaces.
-     */
-    @Test
-    void testTransitiveMessage() {
-        Compilation compilation = compile("TransitiveMessage");
-
-        assertThat(compilation).succeededWithoutWarnings();
-
-        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageBuilder"));
-        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageImpl"));
-        assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
-
-        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializer"));
-        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageDeserializer"));
-        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializationFactory"));
-        assertThat(compilation).generatedSourceFile(
-                fileName("NetworkMessageProcessorTestSerializationRegistryInitializer")
-        );
-    }
-
-    /**
-     * Tests that compilation of multiple well-formed messages is successful.
-     */
-    @Test
-    void testCompileMultipleMessage() {
-        Compilation compilation = compiler.compile(
-                sources("AllTypesMessage", "TransitiveMessage", "ITTestMessageGroup")
-        );
-
-        assertThat(compilation).succeededWithoutWarnings();
-
-        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageBuilder"));
-        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageImpl"));
-
-        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageBuilder"));
-        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageImpl"));
-
-        assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
-
-        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializer"));
-        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageDeserializer"));
-        assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializationFactory"));
-
-        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializer"));
-        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageDeserializer"));
-        assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializationFactory"));
-
-        assertThat(compilation).generatedSourceFile(
-                fileName("NetworkMessageProcessorTestSerializationRegistryInitializer")
-        );
-    }
-
-    /**
-     * Compiles a test message that doesn't extend {@link NetworkMessage}.
-     */
-    @Test
-    void testInvalidAnnotatedTypeMessage() {
-        Compilation compilation = compile("InvalidAnnotatedTypeMessage");
-
-        assertThat(compilation).hadErrorContaining("annotation must only be present on interfaces that extend");
-    }
-
-    /**
-     * Compiles a test message that contains an unsupported content type.
-     */
-    @Test
-    void testUnmarshallableTypeMessage() {
-        Compilation compilation = compile("UnmarshallableTypeMessage");
-
-        assertThat(compilation).hadErrorContaining(
-                "Unsupported reference type for message (de-)serialization: java.util.ArrayList"
-        );
-    }
-
-    /**
-     * Compiles a test message that violates the message contract by declaring a getter with {@code void} return type.
-     */
-    @Test
-    void testInvalidReturnTypeGetterMessage() {
-        Compilation compilation = compile("InvalidReturnTypeGetterMessage");
-
-        assertThat(compilation).hadErrorContaining("Invalid getter method a()");
-    }
-
-    /**
-     * Compiles a test message that violates the message contract by declaring a getter with a parameter.
-     */
-    @Test
-    void testInvalidParameterGetterMessage() {
-        Compilation compilation = compile("InvalidParameterGetterMessage");
-
-        assertThat(compilation).hadErrorContaining("Invalid getter method a(int)");
-    }
-
-    /**
-     * Tests that compilation fails if no {@link MessageGroup} annotated elements were found.
-     */
-    @Test
-    void testMissingMessageGroup() {
-        Compilation compilation = compiler.compile(sources("AllTypesMessage"));
-
-        assertThat(compilation).hadErrorContaining(String.format(
-                "No message groups (classes annotated with @%s) found while processing messages from the following packages: [%s]",
-                MessageGroup.class.getSimpleName(),
-                RESOURCE_PACKAGE_NAME
-        ));
-    }
-
-    /**
-     * Tests that compilation fails if multiple {@link MessageGroup} annotated elements were found.
-     */
-    @Test
-    void testMultipleMessageGroups() {
-        Compilation compilation = compiler.compile(
-                sources("AllTypesMessage", "ConflictingTypeMessage", "ITTestMessageGroup", "SecondGroup")
-        );
-
-        assertThat(compilation).hadErrorContaining(String.format(
-                "Invalid number of message groups (classes annotated with @%s), "
-                        + "only one can be present in a compilation unit: [%s.ITTestMessageGroup, %s.SecondGroup]",
-                MessageGroup.class.getSimpleName(),
-                RESOURCE_PACKAGE_NAME,
-                RESOURCE_PACKAGE_NAME
-        ));
-    }
-
-    /**
-     * Tests that setting the {@link Transferable#autoSerializable()} to {@code false} does not produce any serialization-related classes
-     * and errors.
-     */
-    @Test
-    void testNonSerializableMessage() {
-        Compilation compilation = compile("UnmarshallableTypeNonSerializableMessage");
-
-        assertThat(compilation).succeededWithoutWarnings();
-
-        assertThat(compilation).generatedSourceFile(fileName("UnmarshallableTypeNonSerializableMessageBuilder"));
-        assertThat(compilation).generatedSourceFile(fileName("UnmarshallableTypeNonSerializableMessageImpl"));
-        assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
-
-        // test that no additional classes have been generated
-        assertThrows(
-                AssertionError.class,
-                () -> assertThat(compilation)
-                        .generatedSourceFile(fileName("UnmarshallableTypeNonSerializableMessageSerializer"))
-        );
-    }
-
-    /**
-     * Tests that messages with the same message type fail to compile.
-     */
-    @Test
-    void testConflictingMessageTypes() {
-        Compilation compilation = compiler.compile(
-                sources("AllTypesMessage", "ConflictingTypeMessage", "ITTestMessageGroup")
-        );
-
-        assertThat(compilation).hadErrorContaining("message with type 1 already exists");
-    }
-
-    /**
-     * Tests that if a message getter clashes with a getter in a superinterface, an appropriate error is displayed.
-     */
-    @Test
-    void testInheritedMessageClash() {
-        Compilation compilation = compile("InheritedMessageClash");
-
-        assertThat(compilation).hadErrorContaining("Getter with name 'x' is already defined");
-    }
-
-    /**
-     * Compiles the given network message.
-     */
-    private Compilation compile(String messageSource) {
-        return compiler.compile(sources(messageSource, "ITTestMessageGroup"));
-    }
-
-    /**
-     * Converts given test source class names to a list of {@link JavaFileObject}s.
-     */
-    private static List<JavaFileObject> sources(String... sources) {
-        return Arrays.stream(sources)
-                .map(source -> RESOURCE_PACKAGE_NAME.replace('.', '/') + '/' + source + ".java")
-                .map(JavaFileObjects::forResource)
-                .collect(Collectors.toList());
-    }
-
-    private static String fileName(String className) {
-        return RESOURCE_PACKAGE_NAME + '.' + className;
-    }
-}

[ignite-3] 05/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 5462d76ff5669b9e9c66bd0ece8d676c38fa7079
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Mar 3 15:55:43 2022 +0300

    .
---
 .../internal/network/processor/messages/MessageImplGenerator.java    | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index c76b227..701ba1e 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -218,9 +218,9 @@ public class MessageImplGenerator {
                 return Type.COLLECTION;
             } else if (typeUtils.isSameType(parameterType, Map.class)) {
                 return Type.MAP;
+            } else if (typeUtils.isSubType(parameterType, NetworkMessage.class)) {
+                return Type.MESSAGE;
             }
-        } else if (typeUtils.isSubType(parameterType, NetworkMessage.class)) {
-            return Type.MESSAGE;
         }
         return null;
     }
@@ -249,7 +249,6 @@ public class MessageImplGenerator {
                 String baName = objectName + "ByteArray";
                 String moName = baName + "mo";
                 beforeRead.addStatement("$T $N = marshaller.marshal($N)", marshalledObjectClass, moName, objectName);
-                beforeRead.addStatement("$N = null", objectName);
                 beforeRead.addStatement("set.addAll($N.usedDescriptorIds())", moName);
                 beforeRead.addStatement("$N = $N.bytes()", baName, moName).addCode("\n");
             } else {

[ignite-3] 10/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 925295ed144903878a9bdfd62e8545ce01e3e5d5
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Fri Mar 4 12:00:17 2022 +0300

    .
---
 .../main/java/org/apache/ignite/network/DefaultMessagingService.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index cdb5222..cc0aee7 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -349,7 +349,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
     }
 
     private final ExecutorService outSvc = Executors.newSingleThreadExecutor();
-    private final ExecutorService inSvc = Executors.newFixedThreadPool(5);
+    private final ExecutorService inSvc = Executors.newFixedThreadPool(16);
 
     /**
      * Handles an incoming messages.

[ignite-3] 02/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 2706e424938bb342d815d9dc2f732b768be36e09
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Mar 3 11:53:47 2022 +0300

    .
---
 .../processor/messages/MessageImplGenerator.java   | 76 +++++++++++++++-------
 .../serialization/BaseMethodNameResolver.java      |  6 +-
 2 files changed, 56 insertions(+), 26 deletions(-)

diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index 835894b..95f0ba2 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -28,7 +28,7 @@ import com.squareup.javapoet.TypeName;
 import com.squareup.javapoet.TypeSpec;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -46,9 +46,9 @@ import javax.tools.Diagnostic;
 import org.apache.ignite.internal.network.processor.MessageClass;
 import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
 import org.apache.ignite.internal.network.processor.TypeUtils;
-import org.apache.ignite.internal.network.processor.serialization.BaseMethodNameResolver;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.annotations.Marshallable;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Class for generating implementations of the {@link NetworkMessage} interfaces and their builders, generated by a {@link
@@ -63,8 +63,6 @@ public class MessageImplGenerator {
 
     private final TypeUtils typeUtils;
 
-    private final BaseMethodNameResolver methodNameResolver;
-
     /**
      * Constructor.
      *
@@ -75,7 +73,6 @@ public class MessageImplGenerator {
         this.processingEnv = processingEnv;
         this.messageGroup = messageGroup;
         this.typeUtils = new TypeUtils(processingEnv);
-        this.methodNameResolver = new BaseMethodNameResolver(processingEnv);
     }
 
     /**
@@ -203,9 +200,33 @@ public class MessageImplGenerator {
         return messageImpl.build();
     }
 
+    private enum Type {
+        OBJECT_ARRAY,
+        COLLECTION,
+        MESSAGE,
+        MAP;
+    }
+
+    @Nullable
+    private Type resolveType(TypeMirror parameterType) {
+        if (parameterType.getKind() == TypeKind.ARRAY) {
+            if (!((ArrayType) parameterType).getComponentType().getKind().isPrimitive()) {
+                return Type.OBJECT_ARRAY;
+            }
+        } else if (parameterType.getKind() == TypeKind.DECLARED) {
+            if (typeUtils.isSameType(parameterType, Collection.class)) {
+                return Type.COLLECTION;
+            } else if (typeUtils.isSameType(parameterType, Map.class)) {
+                return Type.MAP;
+            }
+        } else if (typeUtils.isSubType(parameterType, NetworkMessage.class)) {
+            return Type.MESSAGE;
+        }
+        return null;
+    }
+
     private void generateBeforeSend(TypeSpec.Builder messageImplBuild, MessageClass message) {
         ParameterizedTypeName returnType = ParameterizedTypeName.get(ClassName.get(Set.class), TypeName.INT.box());
-        ParameterizedTypeName returnTypeImpl = ParameterizedTypeName.get(ClassName.get(HashSet.class), TypeName.INT.box());
 
         Builder beforeRead = MethodSpec.methodBuilder("beforeRead")
                 .addAnnotation(Override.class)
@@ -230,12 +251,15 @@ public class MessageImplGenerator {
                 beforeRead.addStatement("$T $N = marshaller.marshal($N)", marshalledObjectClass, moName, objectName);
                 beforeRead.addStatement("set.addAll($N.usedDescriptorIds())", moName);
                 beforeRead.addStatement("$N = $N.bytes()", baName, moName).addCode("\n");
-            } else if (typeUtils.isSubType(type, NetworkMessage.class)) {
-                beforeRead.addStatement("if ($N != null) $N.beforeRead(set, marshallerObj)", objectName, objectName);
             } else {
-                String methodType = methodNameResolver.resolveBaseMethodName(type);
-                switch (methodType) {
-                    case "ObjectArray":
+                Type objectType = resolveType(type);
+
+                if (objectType == null) {
+                    return;
+                }
+
+                switch (objectType) {
+                    case OBJECT_ARRAY:
                         ArrayType arrayType = (ArrayType) type;
                         if (typeUtils.isSubType(arrayType.getComponentType(), NetworkMessage.class)) {
                             beforeRead.beginControlFlow("if ($N != null)", objectName);
@@ -245,7 +269,7 @@ public class MessageImplGenerator {
                             beforeRead.endControlFlow().addCode("\n");
                         }
                         break;
-                    case "Collection":
+                    case COLLECTION:
                         DeclaredType declaredType = (DeclaredType) type;
                         TypeMirror elementType = declaredType.getTypeArguments().get(0);
                         if (typeUtils.isSubType(elementType, NetworkMessage.class)) {
@@ -256,7 +280,10 @@ public class MessageImplGenerator {
                             beforeRead.endControlFlow().addCode("\n");
                         }
                         break;
-                    case "Map":
+                    case MESSAGE:
+                        beforeRead.addStatement("if ($N != null) $N.beforeRead(set, marshallerObj)", objectName, objectName);
+                        break;
+                    case MAP:
                         DeclaredType mapType = (DeclaredType) type;
                         TypeMirror keyType = mapType.getTypeArguments().get(0);
                         boolean keyIsMessage = typeUtils.isSubType(keyType, NetworkMessage.class);
@@ -295,9 +322,6 @@ public class MessageImplGenerator {
     }
 
     private void generateAfterReceive(TypeSpec.Builder messageImplBuild, MessageClass message) {
-        ParameterizedTypeName returnType = ParameterizedTypeName.get(ClassName.get(Set.class), TypeName.INT.box());
-        ParameterizedTypeName returnTypeImpl = ParameterizedTypeName.get(ClassName.get(HashSet.class), TypeName.INT.box());
-
         Builder afterRead = MethodSpec.methodBuilder("afterRead")
                 .addAnnotation(Override.class)
                 .addModifiers(Modifier.PUBLIC)
@@ -318,12 +342,15 @@ public class MessageImplGenerator {
             if (executableElement.getAnnotation(Marshallable.class) != null) {
                 String baName = objectName + "ByteArray";
                 afterRead.addStatement("$N = marshaller.unmarshal($N, descriptorRegistry)", objectName, baName);
-            } else if (typeUtils.isSubType(type, NetworkMessage.class)) {
-                afterRead.addStatement("if ($N != null) $N.afterRead(marshallerObj, descriptorsObj)", objectName, objectName);
             } else {
-                String methodType = methodNameResolver.resolveBaseMethodName(type);
-                switch (methodType) {
-                    case "ObjectArray":
+                Type objectType = resolveType(type);
+
+                if (objectType == null) {
+                    return;
+                }
+
+                switch (objectType) {
+                    case OBJECT_ARRAY:
                         ArrayType arrayType = (ArrayType) type;
                         if (typeUtils.isSubType(arrayType.getComponentType(), NetworkMessage.class)) {
                             afterRead.beginControlFlow("if ($N != null)", objectName);
@@ -333,7 +360,7 @@ public class MessageImplGenerator {
                             afterRead.endControlFlow().addCode("\n");
                         }
                         break;
-                    case "Collection":
+                    case COLLECTION:
                         DeclaredType declaredType = (DeclaredType) type;
                         TypeMirror elementType = declaredType.getTypeArguments().get(0);
                         if (typeUtils.isSubType(elementType, NetworkMessage.class)) {
@@ -344,7 +371,10 @@ public class MessageImplGenerator {
                             afterRead.endControlFlow().addCode("\n");
                         }
                         break;
-                    case "Map":
+                    case MESSAGE:
+                        afterRead.addStatement("if ($N != null) $N.afterRead(marshallerObj, descriptorsObj)", objectName, objectName);
+                        break;
+                    case MAP:
                         DeclaredType mapType = (DeclaredType) type;
                         TypeMirror keyType = mapType.getTypeArguments().get(0);
                         boolean keyIsMessage = typeUtils.isSubType(keyType, NetworkMessage.class);
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
index 3a87c4a..00783d0 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
@@ -38,7 +38,7 @@ import org.apache.ignite.network.NetworkMessage;
  * @see MessageReaderMethodResolver
  * @see MessageWriterMethodResolver
  */
-public class BaseMethodNameResolver {
+class BaseMethodNameResolver {
     /** Processing environment. */
     private final ProcessingEnvironment processingEnvironment;
 
@@ -47,7 +47,7 @@ public class BaseMethodNameResolver {
      *
      * @param processingEnvironment Processing environment.
      */
-    public BaseMethodNameResolver(ProcessingEnvironment processingEnvironment) {
+    BaseMethodNameResolver(ProcessingEnvironment processingEnvironment) {
         this.processingEnvironment = processingEnvironment;
     }
 
@@ -57,7 +57,7 @@ public class BaseMethodNameResolver {
      * @param parameterType parameter of the method to resolve
      * @return part of the method name, depending on the parameter type
      */
-    public String resolveBaseMethodName(TypeMirror parameterType) {
+    String resolveBaseMethodName(TypeMirror parameterType) {
         if (parameterType.getKind().isPrimitive()) {
             return resolvePrimitiveMethodName(parameterType);
         } else if (parameterType.getKind() == TypeKind.ARRAY) {

[ignite-3] 06/12: .

Posted by sd...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 1bdeb5e9fe1a81f9f04f08f0dc270b68a7e9d341
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Mar 3 16:09:55 2022 +0300

    .
---
 .../main/java/org/apache/ignite/network/DefaultMessagingService.java    | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 1f922ff..54d9de6 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -490,6 +490,8 @@ public class DefaultMessagingService extends AbstractMessagingService {
         requestsMap.values().forEach(fut -> fut.completeExceptionally(exception));
 
         requestsMap.clear();
+
+        svc.shutdown();
     }
 
     private UserObjectSerializationContext createUserObjectSerializationContext() {