You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/03/03 15:46:23 UTC
[ignite-3] 01/07: .
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch ignite-16393
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 4869c137990aa0af767b8c9d87806d266494a986
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Wed Mar 2 18:09:25 2022 +0300
.
---
.../messages/MessageBuilderGenerator.java | 73 +++---
.../processor/messages/MessageImplGenerator.java | 259 ++++++++++++++++++++-
.../serialization/BaseMethodNameResolver.java | 6 +-
.../MessageDeserializerGenerator.java | 9 +-
.../serialization/MessageReaderMethodResolver.java | 3 +-
.../serialization/MessageSerializerGenerator.java | 4 +-
.../serialization/MessageWriterMethodResolver.java | 7 +-
.../org/apache/ignite/network/NetworkMessage.java | 11 +
.../network/netty/ItConnectionManagerTest.java | 20 +-
.../network/recovery/ItRecoveryHandshakeTest.java | 6 +-
.../scalecube/ItScaleCubeNetworkMessagingTest.java | 10 +-
.../internal/network/NetworkMessageTypes.java | 6 +
.../message/ClassDescriptorListMessage.java | 30 +++
.../internal/network/netty/ConnectionManager.java | 12 +-
.../{MessageHandler.java => InNetworkObject.java} | 41 ++--
.../internal/network/netty/InboundDecoder.java | 13 +-
.../internal/network/netty/MessageHandler.java | 16 +-
.../ignite/internal/network/netty/NettyClient.java | 11 +-
.../ignite/internal/network/netty/NettySender.java | 6 +-
.../ignite/internal/network/netty/NettyServer.java | 10 +-
.../internal/network/netty/OutboundEncoder.java | 41 +++-
.../recovery/RecoveryClientHandshakeManager.java | 4 +-
.../recovery/RecoveryServerHandshakeManager.java | 4 +-
.../PerSessionSerializationService.java | 16 +-
.../ignite/network/DefaultMessagingService.java | 165 ++++++++++++-
.../ignite/network/NettyBootstrapFactory.java | 16 ++
.../org/apache/ignite/network/NetworkObject.java | 40 ++++
.../scalecube/ScaleCubeClusterServiceFactory.java | 6 +-
.../ScaleCubeDirectMarshallerTransport.java | 26 ++-
.../internal/network/netty/NettyClientTest.java | 4 +-
.../internal/network/netty/NettyServerTest.java | 2 +-
modules/raft/pom.xml | 1 -
modules/transactions/pom.xml | 5 +
33 files changed, 747 insertions(+), 136 deletions(-)
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
index 650296e..7f23723 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageBuilderGenerator.java
@@ -17,17 +17,20 @@
package org.apache.ignite.internal.network.processor.messages;
+import com.squareup.javapoet.ArrayTypeName;
import com.squareup.javapoet.ClassName;
import com.squareup.javapoet.MethodSpec;
import com.squareup.javapoet.TypeName;
import com.squareup.javapoet.TypeSpec;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.ArrayList;
import javax.annotation.processing.ProcessingEnvironment;
import javax.lang.model.element.Modifier;
+import javax.lang.model.type.TypeMirror;
import javax.tools.Diagnostic;
import org.apache.ignite.internal.network.processor.MessageClass;
import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.internal.network.processor.TypeUtils;
+import org.apache.ignite.network.annotations.Marshallable;
/**
* Class for generating Builder interfaces for Network Messages.
@@ -39,6 +42,8 @@ public class MessageBuilderGenerator {
/** Message group. */
private final MessageGroupWrapper messageGroup;
+ private final TypeUtils typeUtils;
+
/**
* Constructor.
*
@@ -48,6 +53,7 @@ public class MessageBuilderGenerator {
public MessageBuilderGenerator(ProcessingEnvironment processingEnvironment, MessageGroupWrapper messageGroup) {
this.processingEnvironment = processingEnvironment;
this.messageGroup = messageGroup;
+ this.typeUtils = new TypeUtils(processingEnvironment);
}
/**
@@ -62,30 +68,45 @@ public class MessageBuilderGenerator {
processingEnvironment.getMessager()
.printMessage(Diagnostic.Kind.NOTE, "Generating " + builderName, message.element());
- // generate a setter for each getter in the original interface
- List<MethodSpec> setters = message.getters().stream()
- .map(getter -> {
- String getterName = getter.getSimpleName().toString();
-
- return MethodSpec.methodBuilder(getterName)
- .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
- .addParameter(TypeName.get(getter.getReturnType()), getterName)
- .returns(builderName)
- .build();
- })
- .collect(Collectors.toList());
-
- // generate a getter for each getter in the original interface
- List<MethodSpec> getters = message.getters().stream()
- .map(getter -> {
- String getterName = getter.getSimpleName().toString();
-
- return MethodSpec.methodBuilder(getterName)
- .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
- .returns(TypeName.get(getter.getReturnType()))
- .build();
- })
- .collect(Collectors.toList());
+ var setters = new ArrayList<MethodSpec>(message.getters().size());
+ var getters = new ArrayList<MethodSpec>(message.getters().size());
+ message.getters().forEach(getter -> {
+ String getterName = getter.getSimpleName().toString();
+
+ TypeMirror type = getter.getReturnType();
+
+ // generate a setter for each getter in the original interface
+ MethodSpec setterSpec = MethodSpec.methodBuilder(getterName)
+ .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+ .addParameter(TypeName.get(type), getterName)
+ .returns(builderName)
+ .build();
+
+ // generate a getter for each getter in the original interface
+ MethodSpec getterSpec = MethodSpec.methodBuilder(getterName)
+ .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+ .returns(TypeName.get(type))
+ .build();
+
+ setters.add(setterSpec);
+ getters.add(getterSpec);
+
+ if (getter.getAnnotation(Marshallable.class) != null) {
+ MethodSpec baSetter = MethodSpec.methodBuilder(getterName + "ByteArray")
+ .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+ .addParameter(ArrayTypeName.of(TypeName.BYTE), getterName + "ByteArray")
+ .returns(builderName)
+ .build();
+
+ MethodSpec baGetter = MethodSpec.methodBuilder(getterName + "ByteArray")
+ .addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
+ .returns(ArrayTypeName.of(TypeName.BYTE))
+ .build();
+
+ setters.add(baSetter);
+ getters.add(baGetter);
+ }
+ });
MethodSpec buildMethod = MethodSpec.methodBuilder("build")
.addModifiers(Modifier.PUBLIC, Modifier.ABSTRACT)
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
index 9694785..835894b 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/messages/MessageImplGenerator.java
@@ -17,26 +17,38 @@
package org.apache.ignite.internal.network.processor.messages;
+import com.squareup.javapoet.ArrayTypeName;
import com.squareup.javapoet.ClassName;
import com.squareup.javapoet.CodeBlock;
import com.squareup.javapoet.FieldSpec;
import com.squareup.javapoet.MethodSpec;
+import com.squareup.javapoet.MethodSpec.Builder;
+import com.squareup.javapoet.ParameterizedTypeName;
import com.squareup.javapoet.TypeName;
import com.squareup.javapoet.TypeSpec;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.processing.ProcessingEnvironment;
import javax.lang.model.element.ExecutableElement;
import javax.lang.model.element.Modifier;
+import javax.lang.model.type.ArrayType;
+import javax.lang.model.type.DeclaredType;
import javax.lang.model.type.TypeKind;
+import javax.lang.model.type.TypeMirror;
import javax.tools.Diagnostic;
import org.apache.ignite.internal.network.processor.MessageClass;
import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.internal.network.processor.TypeUtils;
+import org.apache.ignite.internal.network.processor.serialization.BaseMethodNameResolver;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
/**
* Class for generating implementations of the {@link NetworkMessage} interfaces and their builders, generated by a {@link
@@ -49,6 +61,10 @@ public class MessageImplGenerator {
/** Message group. */
private final MessageGroupWrapper messageGroup;
+ private final TypeUtils typeUtils;
+
+ private final BaseMethodNameResolver methodNameResolver;
+
/**
* Constructor.
*
@@ -58,6 +74,8 @@ public class MessageImplGenerator {
public MessageImplGenerator(ProcessingEnvironment processingEnv, MessageGroupWrapper messageGroup) {
this.processingEnv = processingEnv;
this.messageGroup = messageGroup;
+ this.typeUtils = new TypeUtils(processingEnv);
+ this.methodNameResolver = new BaseMethodNameResolver(processingEnv);
}
/**
@@ -76,6 +94,7 @@ public class MessageImplGenerator {
List<ExecutableElement> getters = message.getters();
var fields = new ArrayList<FieldSpec>(getters.size());
+ var allFields = new ArrayList<FieldSpec>(getters.size());
var getterImpls = new ArrayList<MethodSpec>(getters.size());
// create a field and a getter implementation for every getter in the message interface
@@ -85,24 +104,42 @@ public class MessageImplGenerator {
String getterName = getter.getSimpleName().toString();
FieldSpec field = FieldSpec.builder(getterReturnType, getterName)
- .addModifiers(Modifier.PRIVATE, Modifier.FINAL)
+ .addModifiers(Modifier.PRIVATE)
.build();
fields.add(field);
+ allFields.add(field);
MethodSpec getterImpl = MethodSpec.overriding(getter)
.addStatement("return $N", field)
.build();
getterImpls.add(getterImpl);
+
+ if (getter.getAnnotation(Marshallable.class) != null) {
+ ArrayTypeName arrayTypeName = ArrayTypeName.of(TypeName.BYTE);
+ String name = getterName + "ByteArray";
+ FieldSpec marshallableFieldArray = FieldSpec.builder(arrayTypeName, name)
+ .addModifiers(Modifier.PRIVATE)
+ .build();
+
+ allFields.add(marshallableFieldArray);
+
+ MethodSpec baGetterImpl = MethodSpec.methodBuilder(name)
+ .returns(arrayTypeName)
+ .addStatement("return $N", marshallableFieldArray)
+ .build();
+
+ getterImpls.add(baGetterImpl);
+ }
}
TypeSpec.Builder messageImpl = TypeSpec.classBuilder(messageImplClassName)
.addModifiers(Modifier.PUBLIC)
.addSuperinterface(message.className())
- .addFields(fields)
+ .addFields(allFields)
.addMethods(getterImpls)
- .addMethod(constructor(fields));
+ .addMethod(constructor(allFields));
// group type constant and getter
FieldSpec groupTypeField = FieldSpec.builder(short.class, "GROUP_TYPE")
@@ -156,6 +193,9 @@ public class MessageImplGenerator {
messageImpl.addMethod(builderMethod);
+ generateBeforeSend(messageImpl, message);
+ generateAfterReceive(messageImpl, message);
+
messageImpl
.addOriginatingElement(message.element())
.addOriginatingElement(messageGroup.element());
@@ -163,6 +203,183 @@ public class MessageImplGenerator {
return messageImpl.build();
}
+ private void generateBeforeSend(TypeSpec.Builder messageImplBuild, MessageClass message) {
+ ParameterizedTypeName returnType = ParameterizedTypeName.get(ClassName.get(Set.class), TypeName.INT.box());
+ ParameterizedTypeName returnTypeImpl = ParameterizedTypeName.get(ClassName.get(HashSet.class), TypeName.INT.box());
+
+ Builder beforeRead = MethodSpec.methodBuilder("beforeRead")
+ .addAnnotation(Override.class)
+ .addModifiers(Modifier.PUBLIC)
+ .addException(Exception.class)
+ .addParameter(returnType, "set")
+ .addParameter(Object.class, "marshallerObj")
+ .returns(returnType);
+
+ ClassName marshallerClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "UserObjectMarshaller");
+ ClassName marshalledObjectClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "MarshalledObject");
+
+ beforeRead.addStatement("$T marshaller = ($T) marshallerObj", marshallerClass, marshallerClass);
+
+ message.getters().forEach(executableElement -> {
+ TypeMirror type = executableElement.getReturnType();
+ String objectName = executableElement.getSimpleName().toString();
+
+ if (executableElement.getAnnotation(Marshallable.class) != null) {
+ String baName = objectName + "ByteArray";
+ String moName = baName + "mo";
+ beforeRead.addStatement("$T $N = marshaller.marshal($N)", marshalledObjectClass, moName, objectName);
+ beforeRead.addStatement("set.addAll($N.usedDescriptorIds())", moName);
+ beforeRead.addStatement("$N = $N.bytes()", baName, moName).addCode("\n");
+ } else if (typeUtils.isSubType(type, NetworkMessage.class)) {
+ beforeRead.addStatement("if ($N != null) $N.beforeRead(set, marshallerObj)", objectName, objectName);
+ } else {
+ String methodType = methodNameResolver.resolveBaseMethodName(type);
+ switch (methodType) {
+ case "ObjectArray":
+ ArrayType arrayType = (ArrayType) type;
+ if (typeUtils.isSubType(arrayType.getComponentType(), NetworkMessage.class)) {
+ beforeRead.beginControlFlow("if ($N != null)", objectName);
+ beforeRead.beginControlFlow("for (int i = 0; i < $N.length; i++)", objectName);
+ beforeRead.addStatement("if ($N[i] != null) $N[i].beforeRead(set, marshallerObj)", objectName, objectName);
+ beforeRead.endControlFlow();
+ beforeRead.endControlFlow().addCode("\n");
+ }
+ break;
+ case "Collection":
+ DeclaredType declaredType = (DeclaredType) type;
+ TypeMirror elementType = declaredType.getTypeArguments().get(0);
+ if (typeUtils.isSubType(elementType, NetworkMessage.class)) {
+ beforeRead.beginControlFlow("if ($N != null)", objectName);
+ beforeRead.beginControlFlow("for ($T obj : $N)", elementType, objectName);
+ beforeRead.addStatement("if (obj != null) obj.beforeRead(set, marshallerObj)", objectName);
+ beforeRead.endControlFlow();
+ beforeRead.endControlFlow().addCode("\n");
+ }
+ break;
+ case "Map":
+ DeclaredType mapType = (DeclaredType) type;
+ TypeMirror keyType = mapType.getTypeArguments().get(0);
+ boolean keyIsMessage = typeUtils.isSubType(keyType, NetworkMessage.class);
+ TypeMirror valueType = mapType.getTypeArguments().get(1);
+ boolean valueIsMessage = typeUtils.isSubType(valueType, NetworkMessage.class);
+
+ if (keyIsMessage || valueIsMessage) {
+ ParameterizedTypeName entryType = ParameterizedTypeName.get(ClassName.get(Map.Entry.class), TypeName.get(keyType), TypeName.get(valueType));
+ ParameterizedTypeName entrySetType = ParameterizedTypeName.get(ClassName.get(Set.class), entryType);
+ String entrySetName = objectName + "EntrySet";
+
+ beforeRead.beginControlFlow("if ($N != null)", objectName);
+ beforeRead.addStatement("$T $N = $N.entrySet()", entrySetType, entrySetName, objectName);
+ beforeRead.beginControlFlow("for ($T entry : $N)", entryType, entrySetName);
+ beforeRead.addStatement("$T key = entry.getKey()", keyType);
+ beforeRead.addStatement("$T value = entry.getValue()", valueType);
+ if (keyIsMessage) {
+ beforeRead.addStatement("if (key != null) key.beforeRead(set, marshallerObj)");
+ }
+ if (valueIsMessage) {
+ beforeRead.addStatement("if (value != null) value.beforeRead(set, marshallerObj)");
+ }
+ beforeRead.endControlFlow().addCode("\n");
+ beforeRead.endControlFlow();
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ });
+
+ beforeRead.addStatement("return set");
+
+ messageImplBuild.addMethod(beforeRead.build());
+ }
+
+ private void generateAfterReceive(TypeSpec.Builder messageImplBuild, MessageClass message) {
+ ParameterizedTypeName returnType = ParameterizedTypeName.get(ClassName.get(Set.class), TypeName.INT.box());
+ ParameterizedTypeName returnTypeImpl = ParameterizedTypeName.get(ClassName.get(HashSet.class), TypeName.INT.box());
+
+ Builder afterRead = MethodSpec.methodBuilder("afterRead")
+ .addAnnotation(Override.class)
+ .addModifiers(Modifier.PUBLIC)
+ .addException(Exception.class)
+ .addParameter(Object.class, "marshallerObj")
+ .addParameter(Object.class, "descriptorsObj");
+
+ ClassName marshallerClass = ClassName.get("org.apache.ignite.internal.network.serialization.marshal", "UserObjectMarshaller");
+ ClassName descriptorRegistryClass = ClassName.get("org.apache.ignite.internal.network.serialization", "DescriptorRegistry");
+
+ afterRead.addStatement("$T marshaller = ($T) marshallerObj", marshallerClass, marshallerClass);
+ afterRead.addStatement("$T descriptorRegistry = ($T) descriptorsObj", descriptorRegistryClass, descriptorRegistryClass);
+
+ message.getters().forEach(executableElement -> {
+ TypeMirror type = executableElement.getReturnType();
+ String objectName = executableElement.getSimpleName().toString();
+
+ if (executableElement.getAnnotation(Marshallable.class) != null) {
+ String baName = objectName + "ByteArray";
+ afterRead.addStatement("$N = marshaller.unmarshal($N, descriptorRegistry)", objectName, baName);
+ } else if (typeUtils.isSubType(type, NetworkMessage.class)) {
+ afterRead.addStatement("if ($N != null) $N.afterRead(marshallerObj, descriptorsObj)", objectName, objectName);
+ } else {
+ String methodType = methodNameResolver.resolveBaseMethodName(type);
+ switch (methodType) {
+ case "ObjectArray":
+ ArrayType arrayType = (ArrayType) type;
+ if (typeUtils.isSubType(arrayType.getComponentType(), NetworkMessage.class)) {
+ afterRead.beginControlFlow("if ($N != null)", objectName);
+ afterRead.beginControlFlow("for (int i = 0; i < $N.length; i++)", objectName);
+ afterRead.addStatement("if ($N[i] != null) $N[i].afterRead(marshallerObj, descriptorsObj)", objectName, objectName);
+ afterRead.endControlFlow();
+ afterRead.endControlFlow().addCode("\n");
+ }
+ break;
+ case "Collection":
+ DeclaredType declaredType = (DeclaredType) type;
+ TypeMirror elementType = declaredType.getTypeArguments().get(0);
+ if (typeUtils.isSubType(elementType, NetworkMessage.class)) {
+ afterRead.beginControlFlow("if ($N != null)", objectName);
+ afterRead.beginControlFlow("for ($T obj : $N)", elementType, objectName);
+ afterRead.addStatement("if (obj != null) obj.afterRead(marshallerObj, descriptorsObj)", objectName);
+ afterRead.endControlFlow();
+ afterRead.endControlFlow().addCode("\n");
+ }
+ break;
+ case "Map":
+ DeclaredType mapType = (DeclaredType) type;
+ TypeMirror keyType = mapType.getTypeArguments().get(0);
+ boolean keyIsMessage = typeUtils.isSubType(keyType, NetworkMessage.class);
+ TypeMirror valueType = mapType.getTypeArguments().get(1);
+ boolean valueIsMessage = typeUtils.isSubType(valueType, NetworkMessage.class);
+
+ if (keyIsMessage || valueIsMessage) {
+ ParameterizedTypeName entryType = ParameterizedTypeName.get(ClassName.get(Map.Entry.class), TypeName.get(keyType), TypeName.get(valueType));
+ ParameterizedTypeName entrySetType = ParameterizedTypeName.get(ClassName.get(Set.class), entryType);
+ String entrySetName = objectName + "EntrySet";
+
+ afterRead.beginControlFlow("if ($N != null)", objectName);
+ afterRead.addStatement("$T $N = $N.entrySet()", entrySetType, entrySetName, objectName);
+ afterRead.beginControlFlow("for ($T entry : $N)", entryType, entrySetName);
+ afterRead.addStatement("$T key = entry.getKey()", keyType);
+ afterRead.addStatement("$T value = entry.getValue()", valueType);
+ if (keyIsMessage) {
+ afterRead.addStatement("if (key != null) key.afterRead(marshallerObj, descriptorsObj)");
+ }
+ if (valueIsMessage) {
+ afterRead.addStatement("if (value != null) value.afterRead(marshallerObj, descriptorsObj)");
+ }
+ afterRead.endControlFlow();
+ afterRead.endControlFlow().addCode("\n");
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ });
+
+ messageImplBuild.addMethod(afterRead.build());
+ }
+
/**
* Generates implementations of {@link #hashCode} and {@link #equals} for the provided {@code message} and adds them to the provided
* builder.
@@ -322,6 +539,7 @@ public class MessageImplGenerator {
List<ExecutableElement> messageGetters = message.getters();
var fields = new ArrayList<FieldSpec>(messageGetters.size());
+ var allFields = new ArrayList<FieldSpec>(messageGetters.size());
var setters = new ArrayList<MethodSpec>(messageGetters.size());
var getters = new ArrayList<MethodSpec>(messageGetters.size());
@@ -335,6 +553,7 @@ public class MessageImplGenerator {
.build();
fields.add(field);
+ allFields.add(field);
MethodSpec setter = MethodSpec.methodBuilder(getterName)
.addAnnotation(Override.class)
@@ -355,15 +574,45 @@ public class MessageImplGenerator {
.build();
getters.add(getter);
+
+ if (messageGetter.getAnnotation(Marshallable.class) != null) {
+ String name = getterName + "ByteArray";
+ ArrayTypeName type = ArrayTypeName.of(TypeName.BYTE);
+ FieldSpec baField = FieldSpec.builder(type, name)
+ .addModifiers(Modifier.PRIVATE)
+ .build();
+
+ allFields.add(baField);
+
+ MethodSpec baSetter = MethodSpec.methodBuilder(name)
+ .addAnnotation(Override.class)
+ .addModifiers(Modifier.PUBLIC)
+ .returns(builderName)
+ .addParameter(type, name)
+ .addStatement("this.$N = $L", baField, name)
+ .addStatement("return this")
+ .build();
+
+ setters.add(baSetter);
+
+ MethodSpec baGetter = MethodSpec.methodBuilder(name)
+ .addAnnotation(Override.class)
+ .addModifiers(Modifier.PUBLIC)
+ .returns(type)
+ .addStatement("return $N", baField)
+ .build();
+
+ getters.add(baGetter);
+ }
}
return TypeSpec.classBuilder("Builder")
.addModifiers(Modifier.PRIVATE, Modifier.STATIC)
.addSuperinterface(builderName)
- .addFields(fields)
+ .addFields(allFields)
.addMethods(setters)
.addMethods(getters)
- .addMethod(buildMethod(message, messageImplClass, fields))
+ .addMethod(buildMethod(message, messageImplClass, allFields))
.build();
}
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
index 00783d0..3a87c4a 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/BaseMethodNameResolver.java
@@ -38,7 +38,7 @@ import org.apache.ignite.network.NetworkMessage;
* @see MessageReaderMethodResolver
* @see MessageWriterMethodResolver
*/
-class BaseMethodNameResolver {
+public class BaseMethodNameResolver {
/** Processing environment. */
private final ProcessingEnvironment processingEnvironment;
@@ -47,7 +47,7 @@ class BaseMethodNameResolver {
*
* @param processingEnvironment Processing environment.
*/
- BaseMethodNameResolver(ProcessingEnvironment processingEnvironment) {
+ public BaseMethodNameResolver(ProcessingEnvironment processingEnvironment) {
this.processingEnvironment = processingEnvironment;
}
@@ -57,7 +57,7 @@ class BaseMethodNameResolver {
* @param parameterType parameter of the method to resolve
* @return part of the method name, depending on the parameter type
*/
- String resolveBaseMethodName(TypeMirror parameterType) {
+ public String resolveBaseMethodName(TypeMirror parameterType) {
if (parameterType.getKind().isPrimitive()) {
return resolvePrimitiveMethodName(parameterType);
} else if (parameterType.getKind() == TypeKind.ARRAY) {
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
index 8ea3d12..93ae44b 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageDeserializerGenerator.java
@@ -30,6 +30,7 @@ import javax.lang.model.element.Modifier;
import javax.tools.Diagnostic;
import org.apache.ignite.internal.network.processor.MessageClass;
import org.apache.ignite.internal.network.processor.MessageGroupWrapper;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.serialization.MessageDeserializer;
import org.apache.ignite.network.serialization.MessageMappingException;
import org.apache.ignite.network.serialization.MessageReader;
@@ -153,8 +154,14 @@ public class MessageDeserializerGenerator {
private CodeBlock readMessageCodeBlock(ExecutableElement getter, FieldSpec msgField) {
var methodResolver = new MessageReaderMethodResolver(processingEnv);
+ String name = getter.getSimpleName().toString();
+
+ if (getter.getAnnotation(Marshallable.class) != null) {
+ name += "ByteArray";
+ }
+
return CodeBlock.builder()
- .add("$N.$N(reader.", msgField, getter.getSimpleName())
+ .add("$N.$N(reader.", msgField, name)
.add(methodResolver.resolveReadMethod(getter))
.add(")")
.build();
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
index 13e9ae2..925c3f0 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
@@ -60,8 +60,9 @@ class MessageReaderMethodResolver {
String parameterName = getter.getSimpleName().toString();
if (getter.getAnnotation(Marshallable.class) != null) {
+ parameterName += "ByteArray";
return CodeBlock.builder()
- .add("readMarshallable($S)", parameterName)
+ .add("readByteArray($S)", parameterName)
.build();
}
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
index 1926e8e..7d579e4 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageSerializerGenerator.java
@@ -81,10 +81,12 @@ public class MessageSerializerGenerator {
.addAnnotation(Override.class)
.addModifiers(Modifier.PUBLIC)
.returns(boolean.class)
- .addParameter(message.className(), "message")
+ .addParameter(message.className(), "msg")
.addParameter(MessageWriter.class, "writer")
.addException(MessageMappingException.class);
+ method.addStatement("$T message = ($T) msg", message.implClassName(), message.implClassName()).addCode("\n");
+
List<ExecutableElement> getters = message.getters();
method
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
index 38aeb5e..29d973c 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageWriterMethodResolver.java
@@ -35,7 +35,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
/**
* Class for resolving {@link MessageWriter} "write*" methods for the corresponding message field type.
*/
-class MessageWriterMethodResolver {
+public class MessageWriterMethodResolver {
/** Method name resolver. */
private final BaseMethodNameResolver methodNameResolver;
@@ -47,7 +47,7 @@ class MessageWriterMethodResolver {
*
* @param processingEnvironment Processing environment.
*/
- MessageWriterMethodResolver(ProcessingEnvironment processingEnvironment) {
+ public MessageWriterMethodResolver(ProcessingEnvironment processingEnvironment) {
methodNameResolver = new BaseMethodNameResolver(processingEnvironment);
typeConverter = new MessageCollectionItemTypeConverter(processingEnvironment);
}
@@ -72,8 +72,9 @@ class MessageWriterMethodResolver {
String parameterName = getter.getSimpleName().toString();
if (getter.getAnnotation(Marshallable.class) != null) {
+ parameterName += "ByteArray";
return CodeBlock.builder()
- .add("writeMarshallable($S, message.$L())", parameterName, parameterName)
+ .add("writeByteArray($S, message.$L())", parameterName, parameterName)
.build();
}
diff --git a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
index d0becd8..08c2534 100644
--- a/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
+++ b/modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java
@@ -17,6 +17,9 @@
package org.apache.ignite.network;
+import java.util.Collections;
+import java.util.Set;
+
/**
* Message for exchanging information in a cluster.
*/
@@ -41,4 +44,12 @@ public interface NetworkMessage {
* @return group type.
*/
short groupType();
+
+ default Set<Integer> beforeRead(Set<Integer> ids, Object marshaller) throws Exception {
+ return Collections.emptySet();
+ }
+
+ default void afterRead(Object marshaller, Object descriptors) throws Exception {
+ // No-op.
+ }
}
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 447089b..9d92908 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -34,6 +34,7 @@ import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -53,6 +54,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessagesFactory;
@@ -108,13 +110,13 @@ public class ItConnectionManagerTest {
var fut = new CompletableFuture<NetworkMessage>();
- manager2.addListener((consistentId, message) -> fut.complete(message));
+ manager2.addListener((obj) -> fut.complete(obj.getMessage()));
NettySender sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
TestMessage testMessage = messageFactory.testMessage().msg(msgText).build();
- sender.send(testMessage).get(3, TimeUnit.SECONDS);
+ sender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
@@ -140,7 +142,7 @@ public class ItConnectionManagerTest {
var fut = new CompletableFuture<NetworkMessage>();
- manager1.addListener((consistentId, message) -> fut.complete(message));
+ manager1.addListener((obj) -> fut.complete(obj.getMessage()));
NettySender senderFrom1to2 = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
@@ -150,9 +152,9 @@ public class ItConnectionManagerTest {
var messageReceivedOn2 = new CompletableFuture<Void>();
// If the message is received, that means that the handshake was successfully performed.
- manager2.addListener((consistentId, message) -> messageReceivedOn2.complete(null));
+ manager2.addListener((message) -> messageReceivedOn2.complete(null));
- senderFrom1to2.send(testMessage);
+ senderFrom1to2.send(new NetworkObject(testMessage, Collections.emptyList()));
messageReceivedOn2.get(3, TimeUnit.SECONDS);
@@ -164,7 +166,7 @@ public class ItConnectionManagerTest {
assertEquals(clientLocalAddress, clientRemoteAddress);
- senderFrom2to1.send(testMessage).get(3, TimeUnit.SECONDS);
+ senderFrom2to1.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
@@ -231,7 +233,7 @@ public class ItConnectionManagerTest {
assertThrows(ClosedChannelException.class, () -> {
try {
- finalSender.send(testMessage).get(3, TimeUnit.SECONDS);
+ finalSender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
} catch (Exception e) {
throw e.getCause();
}
@@ -241,11 +243,11 @@ public class ItConnectionManagerTest {
var fut = new CompletableFuture<NetworkMessage>();
- manager2.get1().addListener((consistentId, message) -> fut.complete(message));
+ manager2.get1().addListener((obj) -> fut.complete(obj.getMessage()));
sender = manager1.channel(null, new InetSocketAddress(port2)).get(3, TimeUnit.SECONDS);
- sender.send(testMessage).get(3, TimeUnit.SECONDS);
+ sender.send(new NetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS);
NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
index 6ad6231..53b2463 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.mock;
import io.netty.channel.Channel;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -52,6 +53,8 @@ import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
+import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessagesFactory;
import org.junit.jupiter.api.AfterEach;
@@ -130,7 +133,8 @@ public class ItRecoveryHandshakeTest {
assertNotNull(from2to1);
// Ensure the handshake has finished on both sides.
- from2to1.send(messageFactory.testMessage().msg("test").build()).get(3, TimeUnit.SECONDS);
+ TestMessage msg = messageFactory.testMessage().msg("test").build();
+ from2to1.send(new NetworkObject(msg, Collections.emptyList())).get(3, TimeUnit.SECONDS);
NettySender from1to2 = manager1.channel(manager2.consistentId(), manager2.getLocalAddress()).get(3, TimeUnit.SECONDS);
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 87c215f..fd52931 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -44,7 +44,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
-import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -294,12 +294,16 @@ class ItScaleCubeNetworkMessagingTest {
// register a different handle for the second group
node1.messagingService().addMessageHandler(
NetworkMessageTypes.class,
- (message, senderAddr, correlationId) -> assertTrue(networkMessageFuture.complete(message))
+ (message, senderAddr, correlationId) -> {
+ if (message instanceof FieldDescriptorMessage) {
+ assertTrue(networkMessageFuture.complete(message));
+ }
+ }
);
var testMessage = messageFactory.testMessage().msg("foo").build();
- ClassDescriptorMessage networkMessage = new NetworkMessagesFactory().classDescriptorMessage().build();
+ FieldDescriptorMessage networkMessage = new NetworkMessagesFactory().fieldDescriptorMessage().build();
// test that a message gets delivered to both handlers
node2.messagingService()
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
index bae7c7d..ee0df8b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.network;
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
import org.apache.ignite.internal.network.message.InvokeRequest;
@@ -65,4 +66,9 @@ public class NetworkMessageTypes {
* Type for {@link FieldDescriptorMessage}.
*/
public static final short FIELD_DESCRIPTOR_MESSAGE = 6;
+
+ /**
+ * Type for {@link ClassDescriptorListMessage}.
+ */
+ public static final short CLASS_DESCRIPTOR_LIST_MESSAGE = 7;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java
new file mode 100644
index 0000000..bfc7b97
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ClassDescriptorListMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.message;
+
+import java.util.Collection;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+@Transferable(NetworkMessageTypes.CLASS_DESCRIPTOR_LIST_MESSAGE)
+public interface ClassDescriptorListMessage extends NetworkMessage {
+
+ Collection<ClassDescriptorMessage> messages();
+
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
index d8f8967..a95d285 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
@@ -31,7 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.ignite.configuration.schemas.network.NetworkView;
@@ -41,7 +41,6 @@ import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -71,7 +70,7 @@ public class ConnectionManager {
private final SerializationService serializationService;
/** Message listeners. */
- private final List<BiConsumer<String, NetworkMessage>> listeners = new CopyOnWriteArrayList<>();
+ private final List<Consumer<InNetworkObject>> listeners = new CopyOnWriteArrayList<>();
/** Node consistent id. */
private final String consistentId;
@@ -197,11 +196,10 @@ public class ConnectionManager {
/**
* Callback that is called upon receiving a new message.
*
- * @param consistentId Consistent id of the message's sender.
* @param message New message.
*/
- private void onMessage(String consistentId, NetworkMessage message) {
- listeners.forEach(consumer -> consumer.accept(consistentId, message));
+ private void onMessage(InNetworkObject message) {
+ listeners.forEach(consumer -> consumer.accept(message));
}
/**
@@ -243,7 +241,7 @@ public class ConnectionManager {
*
* @param listener Message listener.
*/
- public void addListener(BiConsumer<String, NetworkMessage> listener) {
+ public void addListener(Consumer<InNetworkObject> listener) {
listeners.add(listener);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
similarity index 51%
copy from modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
index cc72427..85fb883 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InNetworkObject.java
@@ -17,37 +17,32 @@
package org.apache.ignite.internal.network.netty;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.function.BiConsumer;
+import org.apache.ignite.internal.network.serialization.CompositeDescriptorRegistry;
import org.apache.ignite.network.NetworkMessage;
-/**
- * Network message handler that delegates handling to {@link #messageListener}.
- */
-public class MessageHandler extends ChannelInboundHandlerAdapter {
- /** Message listener. */
- private final BiConsumer<String, NetworkMessage> messageListener;
+public class InNetworkObject {
+
+ private final NetworkMessage message;
- /** Consistent id of the remote node. */
private final String consistentId;
- /**
- * Constructor.
- *
- * @param messageListener Message listener.
- * @param consistentId Consistent id of the remote node.
- */
- public MessageHandler(BiConsumer<String, NetworkMessage> messageListener, String consistentId) {
- this.messageListener = messageListener;
+ private final CompositeDescriptorRegistry registry;
+
+ public InNetworkObject(NetworkMessage message, String consistentId, CompositeDescriptorRegistry registry) {
+ this.message = message;
this.consistentId = consistentId;
+ this.registry = registry;
}
- /** {@inheritDoc} */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- NetworkMessage message = (NetworkMessage) msg;
+ public NetworkMessage getMessage() {
+ return message;
+ }
+
+ public String getConsistentId() {
+ return consistentId;
+ }
- messageListener.accept(consistentId, message);
+ public CompositeDescriptorRegistry getRegistry() {
+ return registry;
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
index 49c2ed0..cd1bc2b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.ignite.internal.network.direct.DirectMarshallingUtils;
import org.apache.ignite.internal.network.direct.DirectMessageReader;
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.NetworkMessage;
@@ -107,7 +108,13 @@ public class InboundDecoder extends ByteToMessageDecoder {
reader.reset();
messageAttr.set(null);
- out.add(msg.getMessage());
+ NetworkMessage message = msg.getMessage();
+
+ if (message instanceof ClassDescriptorListMessage) {
+ onClassDescriptorMessage((ClassDescriptorListMessage) message);
+ } else {
+ out.add(message);
+ }
} else {
messageAttr.set(msg);
}
@@ -133,4 +140,8 @@ public class InboundDecoder extends ByteToMessageDecoder {
}
}
}
+
+ private void onClassDescriptorMessage(ClassDescriptorListMessage msg) {
+ serializationService.mergeDescriptors(msg.messages());
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
index cc72427..c587071 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/MessageHandler.java
@@ -19,7 +19,8 @@ package org.apache.ignite.internal.network.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.network.NetworkMessage;
/**
@@ -27,20 +28,23 @@ import org.apache.ignite.network.NetworkMessage;
*/
public class MessageHandler extends ChannelInboundHandlerAdapter {
/** Message listener. */
- private final BiConsumer<String, NetworkMessage> messageListener;
+ private final Consumer<InNetworkObject> messageListener;
/** Consistent id of the remote node. */
private final String consistentId;
+ private final PerSessionSerializationService serializationService;
+
/**
* Constructor.
- *
- * @param messageListener Message listener.
+ * @param messageListener Message listener.
* @param consistentId Consistent id of the remote node.
+ * @param serializationService
*/
- public MessageHandler(BiConsumer<String, NetworkMessage> messageListener, String consistentId) {
+ public MessageHandler(Consumer<InNetworkObject> messageListener, String consistentId, PerSessionSerializationService serializationService) {
this.messageListener = messageListener;
this.consistentId = consistentId;
+ this.serializationService = serializationService;
}
/** {@inheritDoc} */
@@ -48,6 +52,6 @@ public class MessageHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
NetworkMessage message = (NetworkMessage) msg;
- messageListener.accept(consistentId, message);
+ messageListener.accept(new InNetworkObject(message, consistentId, serializationService.compositeDescriptorRegistry()));
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
index 1b1e4c7..7983e40 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyClient.java
@@ -25,13 +25,12 @@ import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
/**
@@ -59,7 +58,7 @@ public class NettyClient {
private volatile Channel channel = null;
/** Message listener. */
- private final BiConsumer<String, NetworkMessage> messageListener;
+ private final Consumer<InNetworkObject> messageListener;
/** Handshake manager. */
private final HandshakeManager handshakeManager;
@@ -79,7 +78,7 @@ public class NettyClient {
SocketAddress address,
SerializationService serializationService,
HandshakeManager manager,
- BiConsumer<String, NetworkMessage> messageListener
+ Consumer<InNetworkObject> messageListener
) {
this.address = address;
this.serializationService = serializationService;
@@ -113,7 +112,9 @@ public class NettyClient {
ch.pipeline().addLast(
new InboundDecoder(sessionSerializationService),
- new HandshakeHandler(handshakeManager, (consistentId) -> new MessageHandler(messageListener, consistentId)),
+ new HandshakeHandler(handshakeManager,
+ (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
+ ),
new ChunkedWriteHandler(),
new OutboundEncoder(sessionSerializationService),
new IoExceptionSuppressingHandler()
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index 6185994..46a598f 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -21,7 +21,7 @@ import io.netty.channel.Channel;
import io.netty.handler.stream.ChunkedInput;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.network.direct.DirectMessageWriter;
-import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
import org.jetbrains.annotations.TestOnly;
/**
@@ -56,8 +56,8 @@ public class NettySender {
* @param msg Network message.
* @return Future of the send operation.
*/
- public CompletableFuture<Void> send(NetworkMessage msg) {
- return NettyUtils.toCompletableFuture(channel.writeAndFlush(msg));
+ public CompletableFuture<Void> send(NetworkObject obj) {
+ return NettyUtils.toCompletableFuture(channel.writeAndFlush(obj));
}
/**
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
index 89e7f8c..7050d8e 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettyServer.java
@@ -27,7 +27,6 @@ import io.netty.handler.stream.ChunkedWriteHandler;
import java.net.SocketAddress;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -37,7 +36,6 @@ import org.apache.ignite.internal.network.serialization.PerSessionSerializationS
import org.apache.ignite.internal.network.serialization.SerializationService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.NettyBootstrapFactory;
-import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -58,7 +56,7 @@ public class NettyServer {
private final SerializationService serializationService;
/** Incoming message listener. */
- private final BiConsumer<String, NetworkMessage> messageListener;
+ private final Consumer<InNetworkObject> messageListener;
/** Handshake manager. */
private final Supplier<HandshakeManager> handshakeManager;
@@ -94,7 +92,7 @@ public class NettyServer {
NetworkView configuration,
Supplier<HandshakeManager> handshakeManager,
Consumer<NettySender> newConnectionListener,
- BiConsumer<String, NetworkMessage> messageListener,
+ Consumer<InNetworkObject> messageListener,
SerializationService serializationService,
NettyBootstrapFactory bootstrapFactory
) {
@@ -139,7 +137,9 @@ public class NettyServer {
*/
new InboundDecoder(sessionSerializationService),
// Handshake handler.
- new HandshakeHandler(manager, (consistentId) -> new MessageHandler(messageListener, consistentId)),
+ new HandshakeHandler(manager,
+ (consistentId) -> new MessageHandler(messageListener, consistentId, sessionSerializationService)
+ ),
/*
* Encoder that uses the MessageWriter
* to write chunked data.
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index c629084..fa6f04b 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -24,15 +24,20 @@ import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.stream.ChunkedInput;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.message.ClassDescriptorListMessage;
import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
import org.apache.ignite.network.serialization.MessageSerializer;
/**
* An encoder for the outbound messages that uses {@link DirectMessageWriter}.
*/
-public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
+public class OutboundEncoder extends MessageToMessageEncoder<NetworkObject> {
+ private static final NetworkMessagesFactory MSG_FACTORY = new NetworkMessagesFactory();
+
/** Serialization registry. */
private final PerSessionSerializationService serializationService;
@@ -47,7 +52,7 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
/** {@inheritDoc} */
@Override
- protected void encode(ChannelHandlerContext ctx, NetworkMessage msg, List<Object> out) throws Exception {
+ protected void encode(ChannelHandlerContext ctx, NetworkObject msg, List<Object> out) throws Exception {
out.add(new NetworkMessageChunkedInput(msg, serializationService));
}
@@ -61,11 +66,17 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
/** Message serializer. */
private final MessageSerializer<NetworkMessage> serializer;
+ private final MessageSerializer<ClassDescriptorListMessage> descriptorSerializer;
+
/** Message writer. */
private final DirectMessageWriter writer;
+ private final ClassDescriptorListMessage descriptors;
+ private final PerSessionSerializationService serializationService;
+
/** Whether the message was fully written. */
private boolean finished = false;
+ private boolean descriptorsFinished = false;
/**
* Constructor.
@@ -74,10 +85,23 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
* @param serializationService Serialization service.
*/
private NetworkMessageChunkedInput(
- NetworkMessage msg,
+ NetworkObject object,
PerSessionSerializationService serializationService
) {
- this.msg = msg;
+ this.serializationService = serializationService;
+ this.msg = object.getNetworkMessage();
+
+ if (!object.getDescriptors().isEmpty()) {
+ descriptors = MSG_FACTORY.classDescriptorListMessage().messages(object.getDescriptors()).build();
+ short groupType = descriptors.groupType();
+ short messageType = descriptors.messageType();
+ descriptorSerializer = serializationService.createMessageSerializer(groupType, messageType);
+ } else {
+ descriptors = null;
+ descriptorSerializer = null;
+ descriptorsFinished = true;
+ }
+
this.serializer = serializationService.createMessageSerializer(msg.groupType(), msg.messageType());
this.writer = new DirectMessageWriter(serializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
}
@@ -113,7 +137,14 @@ public class OutboundEncoder extends MessageToMessageEncoder<NetworkMessage> {
writer.setBuffer(byteBuffer);
- finished = serializer.writeMessage(msg, writer);
+ if (!descriptorsFinished) {
+ descriptorsFinished = descriptorSerializer.writeMessage(descriptors, writer);
+ if (descriptorsFinished) {
+ writer.reset();
+ }
+ } else {
+ finished = serializer.writeMessage(msg, writer);
+ }
buffer.writerIndex(byteBuffer.position() - initialPosition);
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
index 5f37db8..8c29293 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.recovery;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
/**
* Recovery protocol handshake manager for a client.
@@ -78,7 +80,7 @@ public class RecoveryClientHandshakeManager implements HandshakeManager {
.connectionsCount(0)
.build();
- ChannelFuture sendFuture = channel.writeAndFlush(response);
+ ChannelFuture sendFuture = channel.writeAndFlush(new NetworkObject(response, Collections.emptyList()));
NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
if (throwable != null) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
index 4954981..bb0b5f3 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.network.recovery;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.network.netty.NettyUtils;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartMessage;
import org.apache.ignite.internal.network.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.NetworkObject;
/**
* Recovery protocol handshake manager for a server.
@@ -76,7 +78,7 @@ public class RecoveryServerHandshakeManager implements HandshakeManager {
.consistentId(consistentId)
.build();
- ChannelFuture sendFuture = channel.writeAndFlush(handshakeStartMessage);
+ ChannelFuture sendFuture = channel.writeAndFlush(new NetworkObject(handshakeStartMessage, Collections.emptyList()));
NettyUtils.toCompletableFuture(sendFuture).whenComplete((unused, throwable) -> {
if (throwable != null) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
index 288b51e..4ce1381 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
@@ -25,6 +25,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -184,6 +185,14 @@ public class PerSessionSerializationService {
return messages;
}
+ public boolean isSent(int descriptorId) {
+ return sentDescriptors.contains(descriptorId);
+ }
+
+ public void addSent(int descriptorId) {
+ sentDescriptors.add(descriptorId);
+ }
+
private byte fieldFlags(FieldDescriptor fieldDescriptor) {
int bits = condMask(fieldDescriptor.isUnshared(), FieldDescriptorMessage.UNSHARED_MASK)
| condMask(fieldDescriptor.isPrimitive(), FieldDescriptorMessage.IS_PRIMITIVE)
@@ -232,7 +241,7 @@ public class PerSessionSerializationService {
return id;
}
- private void mergeDescriptors(List<ClassDescriptorMessage> remoteDescriptors) {
+ public void mergeDescriptors(Collection<ClassDescriptorMessage> remoteDescriptors) {
List<ClassDescriptorMessage> leftToProcess = remoteDescriptors.stream()
.filter(classMessage -> !knownMergedDescriptor(classMessage.descriptorId()))
.collect(toCollection(LinkedList::new));
@@ -378,6 +387,11 @@ public class PerSessionSerializationService {
}
}
+ public CompositeDescriptorRegistry compositeDescriptorRegistry() {
+ return descriptors;
+ }
+
+
@TestOnly
Map<Integer, ClassDescriptor> getDescriptorMapView() {
return mergedIdToDescriptorMap;
diff --git a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
index 9d8965d..2ee7961 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
@@ -18,22 +18,40 @@
package org.apache.ignite.network;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.network.NettyBootstrapFactory.isInNetworkThread;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
import org.apache.ignite.internal.network.message.InvokeRequest;
import org.apache.ignite.internal.network.message.InvokeResponse;
-import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.InNetworkObject;
import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.internal.network.serialization.ClassDescriptor;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.CompositeDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.FieldDescriptor;
+import org.apache.ignite.internal.network.serialization.Serialization;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.Nullable;
@@ -42,9 +60,15 @@ public class DefaultMessagingService extends AbstractMessagingService {
/** Network messages factory. */
private final NetworkMessagesFactory factory;
+ /** Integer value that is sent when there is no descriptor. */
+ private static final int NO_DESCRIPTOR_ID = Integer.MIN_VALUE;
+
/** Topology service. */
private final TopologyService topologyService;
+ private final UserObjectMarshaller marshaller;
+ private final ClassDescriptorRegistry classDescriptorRegistry;
+
/** Connection manager that provides access to {@link NettySender}. */
private volatile ConnectionManager connectionManager;
@@ -75,6 +99,9 @@ public class DefaultMessagingService extends AbstractMessagingService {
public DefaultMessagingService(NetworkMessagesFactory factory, TopologyService topologyService) {
this.factory = factory;
this.topologyService = topologyService;
+ UserObjectSerializationContext userObjectSerializationContext = createUserObjectSerializationContext();
+ this.marshaller = userObjectSerializationContext.marshaller();
+ this.classDescriptorRegistry = userObjectSerializationContext.descriptorRegistry();
}
/**
@@ -158,7 +185,24 @@ public class DefaultMessagingService extends AbstractMessagingService {
String recipientConsistentId = recipient != null ? recipient.name() : address.consistentId();
- return connectionManager.channel(recipientConsistentId, addr).thenCompose(sender -> sender.send(message));
+ return this.sendMessage0(message, recipientConsistentId, addr);
+ }
+
+ private CompletableFuture<Void> sendMessage0(NetworkMessage message, String recipientConsistentId, InetSocketAddress addr) {
+ if (isInNetworkThread()) {
+ return CompletableFuture.supplyAsync(() -> sendMessage0(message, recipientConsistentId, addr), svc).thenCompose(fut -> fut);
+ }
+
+ List<ClassDescriptorMessage> descriptors;
+
+ try {
+ descriptors = beforeRead(message);
+ } catch (Exception e) {
+ return CompletableFuture.failedFuture(e);
+ }
+
+ return connectionManager.channel(recipientConsistentId, addr).thenCompose(
+ sender -> sender.send(new NetworkObject(message, descriptors)));
}
/**
@@ -194,8 +238,95 @@ public class DefaultMessagingService extends AbstractMessagingService {
String recipientConsistentId = recipient != null ? recipient.name() : addr.consistentId();
- return connectionManager.channel(recipientConsistentId, address).thenCompose(sender -> sender.send(message))
- .thenCompose(unused -> responseFuture);
+ return sendMessage0(message, recipientConsistentId, address).thenCompose(unused -> responseFuture);
+ }
+
+ private List<ClassDescriptorMessage> beforeRead(NetworkMessage msg) throws Exception {
+ Set<Integer> ids = msg.beforeRead(new HashSet<>(), marshaller);
+
+ return createClassDescriptorsMessages(ids);
+ }
+
+ public List<ClassDescriptorMessage> createClassDescriptorsMessages(Set<Integer> descriptorIds) {
+ List<ClassDescriptorMessage> messages = descriptorIds.stream()
+ .map(classDescriptorRegistry::getDescriptor)
+ .map(descriptor -> {
+ List<FieldDescriptorMessage> fields = descriptor.fields().stream()
+ .map(d -> {
+ return factory.fieldDescriptorMessage()
+ .name(d.name())
+ .typeDescriptorId(d.typeDescriptorId())
+ .className(d.typeName())
+ .flags(fieldFlags(d))
+ .build();
+ })
+ .collect(toList());
+
+ Serialization serialization = descriptor.serialization();
+
+ return factory.classDescriptorMessage()
+ .fields(fields)
+ .serializationType((byte) serialization.type().value())
+ .serializationFlags(serializationAttributeFlags(serialization))
+ .descriptorId(descriptor.descriptorId())
+ .className(descriptor.className())
+ .superClassDescriptorId(superClassDescriptorIdForMessage(descriptor))
+ .superClassName(descriptor.superClassName())
+ .componentTypeDescriptorId(componentTypeDescriptorIdForMessage(descriptor))
+ .componentTypeName(descriptor.componentTypeName())
+ .attributes(classDescriptorAttributeFlags(descriptor))
+ .build();
+ }).collect(toList());
+
+ return messages;
+ }
+
+ private byte fieldFlags(FieldDescriptor fieldDescriptor) {
+ int bits = condMask(fieldDescriptor.isUnshared(), FieldDescriptorMessage.UNSHARED_MASK)
+ | condMask(fieldDescriptor.isPrimitive(), FieldDescriptorMessage.IS_PRIMITIVE)
+ | condMask(fieldDescriptor.isRuntimeTypeKnownUpfront(), FieldDescriptorMessage.IS_RUNTIME_TYPE_KNOWN_UPFRONT);
+ return (byte) bits;
+ }
+
+ private byte serializationAttributeFlags(Serialization serialization) {
+ int bits = condMask(serialization.hasWriteObject(), ClassDescriptorMessage.HAS_WRITE_OBJECT_MASK)
+ | condMask(serialization.hasReadObject(), ClassDescriptorMessage.HAS_READ_OBJECT_MASK)
+ | condMask(serialization.hasReadObjectNoData(), ClassDescriptorMessage.HAS_READ_OBJECT_NO_DATA_MASK)
+ | condMask(serialization.hasWriteReplace(), ClassDescriptorMessage.HAS_WRITE_REPLACE_MASK)
+ | condMask(serialization.hasReadResolve(), ClassDescriptorMessage.HAS_READ_RESOLVE_MASK);
+ return (byte) bits;
+ }
+
+ private int condMask(boolean value, int mask) {
+ return value ? mask : 0;
+ }
+
+ private byte classDescriptorAttributeFlags(ClassDescriptor descriptor) {
+ int bits = condMask(descriptor.isPrimitive(), ClassDescriptorMessage.IS_PRIMITIVE_MASK)
+ | condMask(descriptor.isArray(), ClassDescriptorMessage.IS_ARRAY_MASK)
+ | condMask(descriptor.isRuntimeEnum(), ClassDescriptorMessage.IS_RUNTIME_ENUM_MASK)
+ | condMask(descriptor.isRuntimeTypeKnownUpfront(), ClassDescriptorMessage.IS_RUNTIME_TYPE_KNOWN_UPFRONT_MASK);
+ return (byte) bits;
+ }
+
+ private int superClassDescriptorIdForMessage(ClassDescriptor descriptor) {
+ Integer id = descriptor.superClassDescriptorId();
+
+ if (id == null) {
+ return NO_DESCRIPTOR_ID;
+ }
+
+ return id;
+ }
+
+ private int componentTypeDescriptorIdForMessage(ClassDescriptor descriptor) {
+ Integer id = descriptor.componentTypeDescriptorId();
+
+ if (id == null) {
+ return NO_DESCRIPTOR_ID;
+ }
+
+ return id;
}
/**
@@ -212,18 +343,28 @@ public class DefaultMessagingService extends AbstractMessagingService {
}
}
+ private final ExecutorService svc = Executors.newSingleThreadExecutor();
+
/**
* Handles an incoming messages.
*
* @param consistentId Sender's consistent id.
* @param msg Incoming message.
*/
- private void onMessage(String consistentId, NetworkMessage msg) {
- if (msg instanceof ScaleCubeMessage) {
- // ScaleCube messages are handled in the ScaleCubeTransport
+ private void onMessage(InNetworkObject obj) {
+ if (isInNetworkThread()) {
+ svc.submit(() -> onMessage(obj));
return;
}
+ NetworkMessage msg = obj.getMessage();
+ CompositeDescriptorRegistry registry = obj.getRegistry();
+ String consistentId = obj.getConsistentId();
+ try {
+ msg.afterRead(marshaller, registry);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
if (msg instanceof InvokeResponse) {
InvokeResponse response = (InvokeResponse) msg;
onInvokeResponse(response.message(), response.correlationId());
@@ -345,4 +486,14 @@ public class DefaultMessagingService extends AbstractMessagingService {
requestsMap.clear();
}
+
+ private UserObjectSerializationContext createUserObjectSerializationContext() {
+ var userObjectDescriptorRegistry = new ClassDescriptorRegistry();
+ var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
+
+ var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
+
+ return new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
+ userObjectMarshaller);
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
index 0d11ccf..a4b7fa8 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/NettyBootstrapFactory.java
@@ -139,6 +139,22 @@ public class NettyBootstrapFactory implements IgniteComponent {
clientWorkerGroup = NamedNioEventLoopGroup.create(eventLoopGroupNamePrefix + "-client");
}
+ public static boolean isInNetworkThread() {
+ String name = Thread.currentThread().getName();
+
+ if (name.contains("-srv-worker")) {
+ return true;
+ }
+ if (name.contains("-client")) {
+ return true;
+ }
+ if (name.contains("-srv-accept")) {
+ return true;
+ }
+
+ return false;
+ }
+
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
diff --git a/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java b/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java
new file mode 100644
index 0000000..9b0d7fb
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/network/NetworkObject.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.List;
+import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
+
+public class NetworkObject {
+
+ private final NetworkMessage networkMessage;
+ private final List<ClassDescriptorMessage> descriptors;
+
+ public NetworkObject(NetworkMessage networkMessage, List<ClassDescriptorMessage> descriptors) {
+ this.networkMessage = networkMessage;
+ this.descriptors = descriptors;
+ }
+
+ public NetworkMessage getNetworkMessage() {
+ return networkMessage;
+ }
+
+ public List<ClassDescriptorMessage> getDescriptors() {
+ return descriptors;
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index e1faa56..f8f77be 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -104,7 +104,9 @@ public class ScaleCubeClusterServiceFactory {
nettyBootstrapFactory
);
- var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr, topologyService, messageFactory);
+ connectionMgr.start();
+
+ var transport = new ScaleCubeDirectMarshallerTransport(connectionMgr.getLocalAddress(), messagingService, topologyService, messageFactory);
NodeFinder finder = NodeFinderFactory.createNodeFinder(configView.nodeFinder());
@@ -122,8 +124,6 @@ public class ScaleCubeClusterServiceFactory {
shutdownFuture = cluster.onShutdown().toFuture();
- connectionMgr.start();
-
// resolve cyclic dependencies
topologyService.setCluster(cluster);
messagingService.setConnectionManager(connectionMgr);
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index ad763de..e263467 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Objects;
+import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
@@ -32,6 +33,7 @@ import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
@@ -63,7 +65,7 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
private final MonoProcessor<Void> onStop = MonoProcessor.create();
/** Connection manager. */
- private final ConnectionManager connectionManager;
+ private final MessagingService messagingService;
/** Message factory. */
private final NetworkMessagesFactory messageFactory;
@@ -82,15 +84,17 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
* @param messageFactory message factory
*/
ScaleCubeDirectMarshallerTransport(
- ConnectionManager connectionManager,
+ SocketAddress localAddress,
+ MessagingService messagingService,
ScaleCubeTopologyService topologyService,
NetworkMessagesFactory messageFactory
) {
- this.connectionManager = connectionManager;
+ this.address = prepareAddress(localAddress);
+ this.messagingService = messagingService;
this.topologyService = topologyService;
this.messageFactory = messageFactory;
- this.connectionManager.addListener(this::onMessage);
+ this.messagingService.addMessageHandler(NetworkMessageTypes.class, (message, senderAddr, correlationId) -> onMessage(message));
// Setup cleanup
stop.then(doStop())
.doFinally(s -> onStop.onComplete())
@@ -145,8 +149,6 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
/** {@inheritDoc} */
@Override
public Mono<Transport> start() {
- address = prepareAddress(connectionManager.getLocalAddress());
-
return Mono.just(this);
}
@@ -171,21 +173,23 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
var addr = InetSocketAddress.createUnresolved(address.host(), address.port());
return Mono.fromFuture(() -> {
- ClusterNode node = topologyService.getByAddress(NetworkAddress.from(addr));
+ NetworkAddress networkAddress = NetworkAddress.from(addr);
+ ClusterNode node = topologyService.getByAddress(networkAddress);
- String consistentId = node != null ? node.name() : null;
+ if (node == null) {
+ node = new ClusterNode(null, null, networkAddress);
+ }
- return connectionManager.channel(consistentId, addr).thenCompose(client -> client.send(fromMessage(message)));
+ return messagingService.send(node, fromMessage(message));
});
}
/**
* Handles new network messages from {@link #connectionManager}.
*
- * @param senderConsistentId Sender's consistent id.
* @param msg Network message.
*/
- private void onMessage(String senderConsistentId, NetworkMessage msg) {
+ private void onMessage(NetworkMessage msg) {
Message message = fromNetworkMessage(msg);
if (message != null) {
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
index cc6cfb0..54ab776 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyClientTest.java
@@ -168,7 +168,7 @@ public class NettyClientTest {
address,
null,
new MockClientHandshakeManager(channel),
- (address1, message) -> {
+ (message) -> {
}
);
@@ -190,7 +190,7 @@ public class NettyClientTest {
address,
null,
new MockClientHandshakeManager(future.channel()),
- (address1, message) -> {
+ (message) -> {
}
);
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index e59db1a..4e84a89 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -183,7 +183,7 @@ public class NettyServerTest {
() -> handshakeManager,
sender -> {
},
- (socketAddress, message) -> {
+ (message) -> {
},
new SerializationService(registry, mock(UserObjectSerializationContext.class)),
bootstrapFactory
diff --git a/modules/raft/pom.xml b/modules/raft/pom.xml
index 5d88ad5..2a3e2bc 100644
--- a/modules/raft/pom.xml
+++ b/modules/raft/pom.xml
@@ -118,7 +118,6 @@
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-network</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
diff --git a/modules/transactions/pom.xml b/modules/transactions/pom.xml
index 7ffc632..26155ec 100644
--- a/modules/transactions/pom.xml
+++ b/modules/transactions/pom.xml
@@ -50,6 +50,11 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-network</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-raft-client</artifactId>
</dependency>