You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/04 11:04:03 UTC
[ignite-3] 03/12: .
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 448bed2447e9c658ef509c34b58271703cd20105
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Thu Mar 3 14:27:14 2022 +0300
.
---
.../processor/messages/MessageImplGenerator.java | 2 +
modules/network/pom.xml | 12 +
.../ItTransferableObjectProcessorTest.java | 261 +++++++++++++++++++++
.../network/processor/AllTypesMessage.java | 0
.../network/processor/ConflictingTypeMessage.java | 0
.../network/processor/ITTestMessageGroup.java | 0
.../network/processor/InheritedMessageClash.java | 0
.../processor/InvalidAnnotatedTypeMessage.java | 0
.../processor/InvalidParameterGetterMessage.java | 0
.../processor/InvalidReturnTypeGetterMessage.java | 0
.../internal/network/processor/SecondGroup.java | 0
.../network/processor/TransitiveMessage.java | 0
.../processor/UnmarshallableTypeMessage.java | 0
.../UnmarshallableTypeNonSerializableMessage.java | 0
.../ignite/network/DefaultMessagingService.java | 23 +-
.../network/serialization/MarshallableTest.java | 51 +++-
16 files changed, 329 insertions(+), 20 deletions(-)
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index 95f0ba2..c76b227 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -249,6 +249,7 @@ public class MessageImplGenerator {
String baName = objectName + "ByteArray";
String moName = baName + "mo";
beforeRead.addStatement("$T $N = marshaller.marshal($N)", marshalledObjectClass, moName, objectName);
+ beforeRead.addStatement("$N = null", objectName);
beforeRead.addStatement("set.addAll($N.usedDescriptorIds())", moName);
beforeRead.addStatement("$N = $N.bytes()", baName, moName).addCode("\n");
} else {
@@ -342,6 +343,7 @@ public class MessageImplGenerator {
if (executableElement.getAnnotation(Marshallable.class) != null) {
String baName = objectName + "ByteArray";
afterRead.addStatement("$N = marshaller.unmarshal($N, descriptorRegistry)", objectName, baName);
+ afterRead.addStatement("$N = null", baName);
} else {
Type objectType = resolveType(type);
diff --git a/modules/network/pom.xml b/modules/network/pom.xml
index 20603b8..aa6c4b3 100644
--- a/modules/network/pom.xml
+++ b/modules/network/pom.xml
@@ -150,6 +150,18 @@
<artifactId>byte-buddy</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.google.testing.compile</groupId>
+ <artifactId>compile-testing</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network-annotation-processor</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/processor/ItTransferableObjectProcessorTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/processor/ItTransferableObjectProcessorTest.java
new file mode 100644
index 0000000..9d9d479
--- /dev/null
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/processor/ItTransferableObjectProcessorTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.processor;
+
+import static com.google.testing.compile.CompilationSubject.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.google.testing.compile.Compilation;
+import com.google.testing.compile.Compiler;
+import com.google.testing.compile.JavaFileObjects;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.tools.JavaFileObject;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.MessageGroup;
+import org.apache.ignite.network.annotations.Transferable;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for the {@link TransferableObjectProcessor}.
+ */
+public class ItTransferableObjectProcessorTest {
+ /**
+ * Package name of the test sources.
+ */
+ private static final String RESOURCE_PACKAGE_NAME = "org.apache.ignite.internal.network.processor";
+
+ /**
+ * Compiler instance configured with the annotation processor being tested.
+ */
+ private final Compiler compiler = Compiler.javac().withProcessors(new TransferableObjectProcessor());
+
+ /**
+ * Compiles the network message with all supported directly marshallable types and checks that the compilation completed successfully.
+ */
+ @Test
+ void testCompileAllTypesMessage() {
+ Compilation compilation = compile("AllTypesMessage");
+
+ assertThat(compilation).succeededWithoutWarnings();
+
+ assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageBuilder"));
+ assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageImpl"));
+ assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
+
+ assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializer"));
+ assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageDeserializer"));
+ assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializationFactory"));
+ assertThat(compilation).generatedSourceFile(
+ fileName("NetworkMessageProcessorTestSerializationRegistryInitializer")
+ );
+ }
+
+ /**
+ * Compiles a network message that does not implement {@link NetworkMessage} directly but rather through a bunch of superinterfaces.
+ */
+ @Test
+ void testTransitiveMessage() {
+ Compilation compilation = compile("TransitiveMessage");
+
+ assertThat(compilation).succeededWithoutWarnings();
+
+ assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageBuilder"));
+ assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageImpl"));
+ assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
+
+ assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializer"));
+ assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageDeserializer"));
+ assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializationFactory"));
+ assertThat(compilation).generatedSourceFile(
+ fileName("NetworkMessageProcessorTestSerializationRegistryInitializer")
+ );
+ }
+
+ /**
+ * Tests that compilation of multiple well-formed messages is successful.
+ */
+ @Test
+ void testCompileMultipleMessage() {
+ Compilation compilation = compiler.compile(
+ sources("AllTypesMessage", "TransitiveMessage", "ITTestMessageGroup")
+ );
+
+ assertThat(compilation).succeededWithoutWarnings();
+
+ assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageBuilder"));
+ assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageImpl"));
+
+ assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageBuilder"));
+ assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageImpl"));
+
+ assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
+
+ assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializer"));
+ assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageDeserializer"));
+ assertThat(compilation).generatedSourceFile(fileName("AllTypesMessageSerializationFactory"));
+
+ assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializer"));
+ assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageDeserializer"));
+ assertThat(compilation).generatedSourceFile(fileName("TransitiveMessageSerializationFactory"));
+
+ assertThat(compilation).generatedSourceFile(
+ fileName("NetworkMessageProcessorTestSerializationRegistryInitializer")
+ );
+ }
+
+ /**
+ * Compiles a test message that doesn't extend {@link NetworkMessage}.
+ */
+ @Test
+ void testInvalidAnnotatedTypeMessage() {
+ Compilation compilation = compile("InvalidAnnotatedTypeMessage");
+
+ assertThat(compilation).hadErrorContaining("annotation must only be present on interfaces that extend");
+ }
+
+ /**
+ * Compiles a test message that contains an unsupported content type.
+ */
+ @Test
+ void testUnmarshallableTypeMessage() {
+ Compilation compilation = compile("UnmarshallableTypeMessage");
+
+ assertThat(compilation).hadErrorContaining(
+ "Unsupported reference type for message (de-)serialization: java.util.ArrayList"
+ );
+ }
+
+ /**
+ * Compiles a test message that violates the message contract by declaring a getter with {@code void} return type.
+ */
+ @Test
+ void testInvalidReturnTypeGetterMessage() {
+ Compilation compilation = compile("InvalidReturnTypeGetterMessage");
+
+ assertThat(compilation).hadErrorContaining("Invalid getter method a()");
+ }
+
+ /**
+ * Compiles a test message that violates the message contract by declaring a getter with a parameter.
+ */
+ @Test
+ void testInvalidParameterGetterMessage() {
+ Compilation compilation = compile("InvalidParameterGetterMessage");
+
+ assertThat(compilation).hadErrorContaining("Invalid getter method a(int)");
+ }
+
+ /**
+ * Tests that compilation fails if no {@link MessageGroup} annotated elements were found.
+ */
+ @Test
+ void testMissingMessageGroup() {
+ Compilation compilation = compiler.compile(sources("AllTypesMessage"));
+
+ assertThat(compilation).hadErrorContaining(String.format(
+ "No message groups (classes annotated with @%s) found while processing messages from the following packages: [%s]",
+ MessageGroup.class.getSimpleName(),
+ RESOURCE_PACKAGE_NAME
+ ));
+ }
+
+ /**
+ * Tests that compilation fails if multiple {@link MessageGroup} annotated elements were found.
+ */
+ @Test
+ void testMultipleMessageGroups() {
+ Compilation compilation = compiler.compile(
+ sources("AllTypesMessage", "ConflictingTypeMessage", "ITTestMessageGroup", "SecondGroup")
+ );
+
+ assertThat(compilation).hadErrorContaining(String.format(
+ "Invalid number of message groups (classes annotated with @%s), "
+ + "only one can be present in a compilation unit: [%s.ITTestMessageGroup, %s.SecondGroup]",
+ MessageGroup.class.getSimpleName(),
+ RESOURCE_PACKAGE_NAME,
+ RESOURCE_PACKAGE_NAME
+ ));
+ }
+
+ /**
+ * Tests that setting the {@link Transferable#autoSerializable()} to {@code false} does not produce any serialization-related classes
+ * and errors.
+ */
+ @Test
+ void testNonSerializableMessage() {
+ Compilation compilation = compile("UnmarshallableTypeNonSerializableMessage");
+
+ assertThat(compilation).succeededWithoutWarnings();
+
+ assertThat(compilation).generatedSourceFile(fileName("UnmarshallableTypeNonSerializableMessageBuilder"));
+ assertThat(compilation).generatedSourceFile(fileName("UnmarshallableTypeNonSerializableMessageImpl"));
+ assertThat(compilation).generatedSourceFile(fileName("NetworkMessageProcessorTestFactory"));
+
+ // test that no additional classes have been generated
+ assertThrows(
+ AssertionError.class,
+ () -> assertThat(compilation)
+ .generatedSourceFile(fileName("UnmarshallableTypeNonSerializableMessageSerializer"))
+ );
+ }
+
+ /**
+ * Tests that messages with the same message type fail to compile.
+ */
+ @Test
+ void testConflictingMessageTypes() {
+ Compilation compilation = compiler.compile(
+ sources("AllTypesMessage", "ConflictingTypeMessage", "ITTestMessageGroup")
+ );
+
+ assertThat(compilation).hadErrorContaining("message with type 1 already exists");
+ }
+
+ /**
+ * Tests that if a message getter clashes with a getter in a superinterface, an appropriate error is displayed.
+ */
+ @Test
+ void testInheritedMessageClash() {
+ Compilation compilation = compile("InheritedMessageClash");
+
+ assertThat(compilation).hadErrorContaining("Getter with name 'x' is already defined");
+ }
+
+ /**
+ * Compiles the given network message.
+ */
+ private Compilation compile(String messageSource) {
+ return compiler.compile(sources(messageSource, "ITTestMessageGroup"));
+ }
+
+ /**
+ * Converts given test source class names to a list of {@link JavaFileObject}s.
+ */
+ private static List<JavaFileObject> sources(String... sources) {
+ return Arrays.stream(sources)
+ .map(source -> RESOURCE_PACKAGE_NAME.replace('.', '/') + '/' + source + ".java")
+ .map(JavaFileObjects::forResource)
+ .collect(Collectors.toList());
+ }
+
+ private static String fileName(String className) {
+ return RESOURCE_PACKAGE_NAME + '.' + className;
+ }
+}
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/AllTypesMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/AllTypesMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/AllTypesMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/AllTypesMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ConflictingTypeMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ConflictingTypeMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ConflictingTypeMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ConflictingTypeMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ITTestMessageGroup.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ITTestMessageGroup.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ITTestMessageGroup.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/ITTestMessageGroup.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InheritedMessageClash.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InheritedMessageClash.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InheritedMessageClash.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InheritedMessageClash.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidAnnotatedTypeMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidAnnotatedTypeMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidAnnotatedTypeMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidAnnotatedTypeMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidParameterGetterMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidParameterGetterMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidParameterGetterMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidParameterGetterMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidReturnTypeGetterMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidReturnTypeGetterMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidReturnTypeGetterMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/InvalidReturnTypeGetterMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/SecondGroup.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/SecondGroup.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/SecondGroup.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/SecondGroup.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/TransitiveMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/TransitiveMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/TransitiveMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/TransitiveMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeMessage.java
diff --git a/modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeNonSerializableMessage.java b/modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeNonSerializableMessage.java
similarity index 100%
rename from modules/network-annotation-processor/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeNonSerializableMessage.java
rename to modules/network/src/integrationTest/resources/org/apache/ignite/internal/network/processor/UnmarshallableTypeNonSerializableMessage.java
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 2ee7961..1f922ff 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
@@ -190,7 +191,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
private CompletableFuture<Void> sendMessage0(NetworkMessage message, String recipientConsistentId, InetSocketAddress addr) {
if (isInNetworkThread()) {
- return CompletableFuture.supplyAsync(() -> sendMessage0(message, recipientConsistentId, addr), svc).thenCompose(fut -> fut);
+ return CompletableFuture.supplyAsync(() -> sendMessage0(message, recipientConsistentId, addr), svc).thenCompose(Function.identity());
}
List<ClassDescriptorMessage> descriptors;
@@ -244,10 +245,14 @@ public class DefaultMessagingService extends AbstractMessagingService {
private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws Exception {
Set<Integer> ids = msg.beforeRead(new HashSet<>(), marshaller);
- return createClassDescriptorsMessages(ids);
+ return createClassDescriptorsMessages(factory, classDescriptorRegistry, ids);
}
- public List<ClassDescriptorMessage> createClassDescriptorsMessages(Set<Integer> descriptorIds) {
+ public static List<ClassDescriptorMessage> createClassDescriptorsMessages(
+ NetworkMessagesFactory factory,
+ ClassDescriptorRegistry classDescriptorRegistry,
+ Set<Integer> descriptorIds
+ ) {
List<ClassDescriptorMessage> messages = descriptorIds.stream()
.map(classDescriptorRegistry::getDescriptor)
.map(descriptor -> {
@@ -281,14 +286,14 @@ public class DefaultMessagingService extends AbstractMessagingService {
return messages;
}
- private byte fieldFlags(FieldDescriptor fieldDescriptor) {
+ private static byte fieldFlags(FieldDescriptor fieldDescriptor) {
int bits = condMask(fieldDescriptor.isUnshared(), FieldDescriptorMessage.UNSHARED_MASK)
| condMask(fieldDescriptor.isPrimitive(), FieldDescriptorMessage.IS_PRIMITIVE)
| condMask(fieldDescriptor.isRuntimeTypeKnownUpfront(), FieldDescriptorMessage.IS_RUNTIME_TYPE_KNOWN_UPFRONT);
return (byte) bits;
}
- private byte serializationAttributeFlags(Serialization serialization) {
+ private static byte serializationAttributeFlags(Serialization serialization) {
int bits = condMask(serialization.hasWriteObject(), ClassDescriptorMessage.HAS_WRITE_OBJECT_MASK)
| condMask(serialization.hasReadObject(), ClassDescriptorMessage.HAS_READ_OBJECT_MASK)
| condMask(serialization.hasReadObjectNoData(), ClassDescriptorMessage.HAS_READ_OBJECT_NO_DATA_MASK)
@@ -297,11 +302,11 @@ public class DefaultMessagingService extends AbstractMessagingService {
return (byte) bits;
}
- private int condMask(boolean value, int mask) {
+ private static int condMask(boolean value, int mask) {
return value ? mask : 0;
}
- private byte classDescriptorAttributeFlags(ClassDescriptor descriptor) {
+ private static byte classDescriptorAttributeFlags(ClassDescriptor descriptor) {
int bits = condMask(descriptor.isPrimitive(), ClassDescriptorMessage.IS_PRIMITIVE_MASK)
| condMask(descriptor.isArray(), ClassDescriptorMessage.IS_ARRAY_MASK)
| condMask(descriptor.isRuntimeEnum(), ClassDescriptorMessage.IS_RUNTIME_ENUM_MASK)
@@ -309,7 +314,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
return (byte) bits;
}
- private int superClassDescriptorIdForMessage(ClassDescriptor descriptor) {
+ private static int superClassDescriptorIdForMessage(ClassDescriptor descriptor) {
Integer id = descriptor.superClassDescriptorId();
if (id == null) {
@@ -319,7 +324,7 @@ public class DefaultMessagingService extends AbstractMessagingService {
return id;
}
- private int componentTypeDescriptorIdForMessage(ClassDescriptor descriptor) {
+ private static int componentTypeDescriptorIdForMessage(ClassDescriptor descriptor) {
Integer id = descriptor.componentTypeDescriptorId();
if (id == null) {
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index 30c64ab..de4fa63 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
import static org.hamcrest.collection.IsEmptyCollection.empty;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -30,6 +29,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
import it.unimi.dsi.fastutil.ints.IntSets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -37,14 +38,21 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.netty.InboundDecoder;
+import org.apache.ignite.internal.network.netty.OutboundEncoder;
import org.apache.ignite.internal.network.serialization.marshal.MarshalException;
import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+import org.apache.ignite.network.DefaultMessagingService;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessagesFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
@@ -68,7 +76,7 @@ public class MarshallableTest {
* Tests that marshallable object can be serialized along with its descriptor.
*/
@Test
- public void testMarshallable() {
+ public void testMarshallable() throws Exception {
// Test map that will be sent as a Marshallable object within the MessageWithMarshallable message
Map<String, SimpleSerializableObject> testMap = Map.of("test", new SimpleSerializableObject(10));
@@ -80,29 +88,36 @@ public class MarshallableTest {
}
/** Writes a map to a buffer through the {@link MessageWithMarshallable}. */
- private ByteBuffer write(Map<String, SimpleSerializableObject> testMap) {
+ private ByteBuffer write(Map<String, SimpleSerializableObject> testMap) throws Exception {
var serializers = new Serialization();
var writer = new DirectMessageWriter(serializers.perSessionSerializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
MessageWithMarshallable msg = msgFactory.messageWithMarshallable().marshallableMap(testMap).build();
+ var ids = new HashSet<Integer>();
+
+ msg.beforeRead(ids, serializers.userObjectSerializer);
+
MessageSerializer<NetworkMessage> serializer = registry.createSerializer(msg.groupType(), msg.messageType());
- ByteBuffer nioBuffer = ByteBuffer.allocate(1000);
+ var catcher = new OutboundByteBufCatcher();
+ var channel = new EmbeddedChannel(catcher, new ChunkedWriteHandler(), new OutboundEncoder(serializers.perSessionSerializationService));
- writer.setBuffer(nioBuffer);
+ List<ClassDescriptorMessage> classDescriptorsMessages = DefaultMessagingService.createClassDescriptorsMessages(
+ new NetworkMessagesFactory(), serializers.descriptorRegistry, ids);
- // Write a message to the ByteBuffer.
- boolean fullyWritten = serializer.writeMessage(msg, writer);
+ channel.writeAndFlush(new NetworkObject(msg, classDescriptorsMessages));
- assertTrue(fullyWritten);
+ channel.flushOutbound();
+
+ ByteBuffer nioBuffer = catcher.buf;
return nioBuffer;
}
/** Reads a {@link MessageWithMarshallable} from the buffer (byte by byte) and checks for the class descriptor merging. */
- private Map<String, SimpleSerializableObject> read(ByteBuffer outBuffer) {
+ private Map<String, SimpleSerializableObject> read(ByteBuffer outBuffer) throws Exception {
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
var channel = new EmbeddedChannel();
@@ -150,6 +165,8 @@ public class MarshallableTest {
MessageWithMarshallable received = (MessageWithMarshallable) list.get(0);
+ received.afterRead(serializers.userObjectSerializer, serializers.descriptorRegistry);
+
return received.marshallableMap();
}
@@ -158,15 +175,17 @@ public class MarshallableTest {
private final PerSessionSerializationService perSessionSerializationService;
private final ClassDescriptor descriptor;
+ private final StubMarshaller userObjectSerializer;
+ private final ClassDescriptorRegistry descriptorRegistry;
Serialization() {
- var descriptorRegistry = new ClassDescriptorRegistry();
+ this.descriptorRegistry = new ClassDescriptorRegistry();
var factory = new ClassDescriptorFactory(descriptorRegistry);
// Create descriptor for SimpleSerializableObject
this.descriptor = factory.create(SimpleSerializableObject.class);
- var userObjectSerializer = new StubMarshaller(descriptor);
+ this.userObjectSerializer = new StubMarshaller(descriptor);
var ser = new UserObjectSerializationContext(descriptorRegistry, factory, userObjectSerializer);
@@ -175,6 +194,16 @@ public class MarshallableTest {
}
}
+ private static class OutboundByteBufCatcher extends MessageToMessageEncoder<ByteBuf> { // Test handler that copies outbound ByteBufs into one buffer.
+ private ByteBuffer buf = ByteBuffer.allocateDirect(1000); // Fixed 1000-byte capacity; put() overflows if more is written.
+ @Override
+ protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
+ ByteBuffer nioBuffer = byteBuf.nioBuffer();
+ buf.put(nioBuffer); // Appends the captured bytes; buf is not flipped here.
+ list.add(1); // NOTE(review): placeholder output, presumably because the encoder must emit at least one message - confirm.
+ }
+ }
+
/**
* Stub implementation of the {@link UserObjectMarshaller}, which uses the JDK's serializable
* serialization to actually marshall an object.