You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sd...@apache.org on 2022/01/11 23:56:55 UTC
[ignite-3] 27/27: IGNITE-16250 Raft, SQL and TX use marshallable and network messages
This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch uos-network
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit d2637e72dd6b8d64b23687c3297cb8a52802442d
Author: Semyon Danilov <sa...@yandex.ru>
AuthorDate: Mon Jan 10 19:34:19 2022 +0300
IGNITE-16250 Raft, SQL and TX use marshallable and network messages
---
.../client/ItMetaStorageRaftGroupTest.java | 8 +-
.../client/ItMetaStorageServiceTest.java | 11 ++-
.../serialization/MessageReaderMethodResolver.java | 4 +-
.../network/netty/ItConnectionManagerTest.java | 3 +-
.../network/recovery/ItRecoveryHandshakeTest.java | 6 +-
.../network/scalecube/ItNodeRestartsTest.java | 6 +-
.../scalecube/ItScaleCubeNetworkMessagingTest.java | 35 ++-----
.../ignite/utils/ClusterServiceTestUtils.java | 8 +-
.../stream/DirectByteBufferStreamImplV1.java | 33 +++++--
.../internal/network/message/ScaleCubeMessage.java | 8 +-
.../serialization/ClassDescriptorFactory.java | 2 +-
.../serialization/CompositeIndexedDescriptors.java | 45 +++++++++
.../PerSessionSerializationService.java | 110 ++++++++++++++-------
.../serialization/SerializationService.java | 95 +++++++++++++-----
.../UserObjectSerializationContext.java | 53 ++++++++++
.../UserObjectSerializationException.java} | 33 +++----
.../serialization/UserObjectSerializer.java | 48 ---------
.../scalecube/ScaleCubeClusterServiceFactory.java | 12 ++-
.../ScaleCubeDirectMarshallerTransport.java | 32 +++---
.../internal/network/netty/InboundDecoderTest.java | 8 +-
.../internal/network/netty/NettyServerTest.java | 3 +-
.../network/serialization/MarshallableTest.java | 62 +++++-------
.../apache/ignite/internal/raft/ItLozaTest.java | 11 ++-
.../service/ItAbstractListenerSnapshotTest.java | 11 ++-
.../ignite/raft/jraft/core/ItCliServiceTest.java | 8 +-
.../apache/ignite/raft/jraft/core/ItNodeTest.java | 14 ++-
.../ignite/raft/server/RaftServerAbstractTest.java | 11 ++-
.../ignite/raft/jraft/core/CliServiceImpl.java | 2 +-
.../raft/jraft/entity/LocalFileMetaOutter.java | 6 +-
.../raft/jraft/entity/LocalStorageOutter.java | 12 ++-
.../ignite/raft/jraft/entity/RaftOutter.java | 26 ++---
.../ignite/raft/jraft/rpc/ActionRequest.java | 5 +-
.../ignite/raft/jraft/rpc/ActionResponse.java | 5 +-
.../apache/ignite/raft/jraft/rpc/CliRequests.java | 70 ++++++-------
.../apache/ignite/raft/jraft/rpc/RpcRequests.java | 37 ++++---
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 44 ++++-----
.../apache/ignite/raft/jraft/core/TestCluster.java | 8 +-
.../ignite/raft/jraft/rpc/AbstractRpcTest.java | 8 +-
.../ignite/raft/jraft/rpc/IgniteRpcTest.java | 16 ++-
.../ItDistributedConfigurationPropertiesTest.java | 10 +-
.../ItDistributedConfigurationStorageTest.java | 10 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 10 +-
.../internal/sql/engine/message/ErrorMessage.java | 4 +-
.../sql/engine/message/QueryBatchMessage.java | 4 +-
.../sql/engine/message/QueryStartRequest.java | 5 +-
.../sql/engine/message/QueryStartResponse.java | 4 +-
.../distributed/ItInternalTableScanTest.java | 13 ++-
.../ignite/distributed/ItTablePersistenceTest.java | 5 +
.../distributed/ItTxDistributedTestSingleNode.java | 13 ++-
.../internal/tx/message/TxFinishRequest.java | 5 +-
.../internal/tx/message/TxFinishResponse.java | 2 +-
51 files changed, 642 insertions(+), 362 deletions(-)
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
index 3f3a386..1ff978f 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageRaftGroupTest.java
@@ -50,12 +50,14 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.core.Replicator;
import org.apache.ignite.raft.jraft.entity.PeerId;
@@ -172,13 +174,17 @@ public class ItMetaStorageRaftGroupTest {
var nodeFinder = new StaticNodeFinder(localAddresses);
+ MessageSerializationRegistryImpl registry = new MessageSerializationRegistryImpl();
+ RaftMessagesSerializationRegistryInitializer.registerFactories(registry);
+
localAddresses.stream()
.map(
addr -> ClusterServiceTestUtils.clusterService(
testInfo,
addr.port(),
nodeFinder,
- NETWORK_FACTORY
+ NETWORK_FACTORY,
+ registry
)
)
.forEach(clusterService -> {
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
index 7d9244e..e00081e 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ItMetaStorageServiceTest.java
@@ -65,12 +65,14 @@ import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.NotNull;
@@ -107,6 +109,12 @@ public class ItMetaStorageServiceTest {
/** Network factory. */
private static final TestScaleCubeClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+ private static final MessageSerializationRegistryImpl SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
+
+ static {
+ RaftMessagesSerializationRegistryInitializer.registerFactories(SERIALIZATION_REGISTRY);
+ }
+
/** Expected server result entry. */
private static final org.apache.ignite.internal.metastorage.server.Entry EXPECTED_SRV_RESULT_ENTRY =
new org.apache.ignite.internal.metastorage.server.Entry(
@@ -209,7 +217,8 @@ public class ItMetaStorageServiceTest {
testInfo,
addr.port(),
nodeFinder,
- NETWORK_FACTORY
+ NETWORK_FACTORY,
+ SERIALIZATION_REGISTRY
)
)
.forEach(clusterService -> {
diff --git a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
index 99e0dc2..13e9ae2 100644
--- a/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
+++ b/modules/network-annotation-processor/src/main/java/org/apache/ignite/internal/network/processor/serialization/MessageReaderMethodResolver.java
@@ -59,14 +59,14 @@ class MessageReaderMethodResolver {
String parameterName = getter.getSimpleName().toString();
- String methodName = methodNameResolver.resolveBaseMethodName(parameterType);
-
if (getter.getAnnotation(Marshallable.class) != null) {
return CodeBlock.builder()
.add("readMarshallable($S)", parameterName)
.build();
}
+ String methodName = methodNameResolver.resolveBaseMethodName(parameterType);
+
switch (methodName) {
case "ObjectArray":
return resolveReadObjectArray((ArrayType) parameterType, parameterName);
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 8229dbd..bff3ab5 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.NettyBootstrapFactory;
@@ -349,7 +350,7 @@ public class ItConnectionManagerTest {
var manager = new ConnectionManager(
cfg,
- new SerializationService(registry, null),
+ new SerializationService(registry, mock(UserObjectSerializationContext.class)),
consistentId,
() -> new RecoveryServerHandshakeManager(launchId, consistentId, messageFactory),
() -> new RecoveryClientHandshakeManager(launchId, consistentId, messageFactory),
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
index 16d846c..d4ffcee 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/recovery/ItRecoveryHandshakeTest.java
@@ -31,6 +31,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
import io.netty.channel.Channel;
import java.util.ArrayList;
@@ -48,6 +49,7 @@ import org.apache.ignite.internal.network.handshake.HandshakeAction;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.netty.NettySender;
import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
@@ -400,7 +402,7 @@ public class ItRecoveryHandshakeTest {
ClientStageFail clientHandshakeFailAt
) {
var registry = new TestMessageSerializationRegistryImpl();
- var serializationService = new SerializationService(registry, null);
+ var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
var messageFactory = new NetworkMessagesFactory();
@@ -439,7 +441,7 @@ public class ItRecoveryHandshakeTest {
*/
private ConnectionManager startManager(int port) {
var registry = new TestMessageSerializationRegistryImpl();
- var serializationService = new SerializationService(registry, null);
+ var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
var messageFactory = new NetworkMessagesFactory();
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItNodeRestartsTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItNodeRestartsTest.java
index 7e3a668..62d9fc7 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItNodeRestartsTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItNodeRestartsTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
@@ -43,6 +44,8 @@ class ItNodeRestartsTest {
/** Network factory. */
private final TestScaleCubeClusterServiceFactory networkFactory = new TestScaleCubeClusterServiceFactory();
+ private final MessageSerializationRegistryImpl serializationRegistry = new MessageSerializationRegistryImpl();
+
/** Created {@link ClusterService}s. Needed for resource management. */
private List<ClusterService> services;
@@ -112,7 +115,8 @@ class ItNodeRestartsTest {
testInfo,
addr.port(),
nodeFinder,
- networkFactory
+ networkFactory,
+ serializationRegistry
);
clusterService.start();
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index bd2c4f8..749b8b0 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -29,7 +29,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import io.scalecube.cluster.ClusterImpl;
import io.scalecube.cluster.transport.api.Transport;
-import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Collection;
@@ -43,6 +42,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.internal.network.NetworkMessageTypes;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -51,10 +52,10 @@ import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessageTypes;
import org.apache.ignite.network.TestMessagesFactory;
import org.apache.ignite.network.TopologyEventHandler;
-import org.apache.ignite.network.annotations.MessageGroup;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -264,29 +265,6 @@ class ItScaleCubeNetworkMessagingTest {
}
/**
- * Serializable message that belongs to the {@link NetworkMessageTypes} message group.
- */
- private static class MockNetworkMessage implements NetworkMessage, Serializable {
- /** {@inheritDoc} */
- @Override
- public short messageType() {
- return 666;
- }
-
- /** {@inheritDoc} */
- @Override
- public short groupType() {
- return NetworkMessageTypes.class.getAnnotation(MessageGroup.class).groupType();
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean equals(Object obj) {
- return getClass() == obj.getClass();
- }
- }
-
- /**
* Tests that messages from different message groups can be delivered to different sets of handlers.
*
* @throws Exception in case of errors.
@@ -322,7 +300,7 @@ class ItScaleCubeNetworkMessagingTest {
var testMessage = messageFactory.testMessage().msg("foo").build();
- var networkMessage = new MockNetworkMessage();
+ ScaleCubeMessage networkMessage = new NetworkMessagesFactory().scaleCubeMessage().build();
// test that a message gets delivered to both handlers
node2.messagingService()
@@ -420,6 +398,8 @@ class ItScaleCubeNetworkMessagingTest {
/** Network factory. */
private final TestScaleCubeClusterServiceFactory networkFactory = new TestScaleCubeClusterServiceFactory();
+ private final TestMessageSerializationRegistryImpl serializationRegistry = new TestMessageSerializationRegistryImpl();
+
/** Members of the cluster. */
final List<ClusterService> members;
@@ -466,7 +446,8 @@ class ItScaleCubeNetworkMessagingTest {
testInfo,
addr.port(),
nodeFinder,
- networkFactory
+ networkFactory,
+ serializationRegistry
);
if (initial) {
diff --git a/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java b/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
index 2bf246c..d58618b 100644
--- a/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
+++ b/modules/network/src/integrationTest/java/org/apache/ignite/utils/ClusterServiceTestUtils.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkAddress;
@@ -39,6 +38,7 @@ import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.TopologyService;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.junit.jupiter.api.TestInfo;
/**
@@ -59,10 +59,10 @@ public class ClusterServiceTestUtils {
TestInfo testInfo,
int port,
NodeFinder nodeFinder,
- TestScaleCubeClusterServiceFactory clusterSvcFactory
+ TestScaleCubeClusterServiceFactory clusterSvcFactory,
+ MessageSerializationRegistry registry
) {
- var messageSerializationRegistry = new MessageSerializationRegistryImpl();
- var ctx = new ClusterLocalConfiguration(testNodeName(testInfo, port), messageSerializationRegistry);
+ var ctx = new ClusterLocalConfiguration(testNodeName(testInfo, port), registry);
ConfigurationManager nodeConfigurationMgr = new ConfigurationManager(
Collections.singleton(NetworkConfiguration.KEY),
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
index 0122280..e84a474 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
@@ -46,7 +46,7 @@ import java.util.RandomAccess;
import java.util.UUID;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
-import org.apache.ignite.internal.network.serialization.SerializationResult;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
import org.apache.ignite.internal.util.ArrayFactory;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -1365,13 +1365,22 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
public <T> void writeMarshallable(T object, MessageWriter writer) {
switch (marshallableState) {
case 0:
+ writeBoolean(object == null);
+
+ if (!lastFinished || object == null) {
+ return;
+ }
+
+ marshallableState++;
+
+ //noinspection fallthrough
+ case 1:
if (marshallable == null) {
// If object was not serialized to a byte array, serialize it
- SerializationResult res = serializationService.writeMarshallable(object);
- List<Integer> descriptorIds = res.ids();
- marshallable = res.array();
+ MarshalledObject res = serializationService.writeMarshallable(object);
+ marshallable = res.bytes();
// Get descriptors that were not previously sent to the remote node
- descriptors = serializationService.createClassDescriptorsMessages(descriptorIds);
+ descriptors = serializationService.createClassDescriptorsMessages(res.usedDescriptors());
}
writeCollection(descriptors, MessageCollectionItemType.MSG, writer);
@@ -1383,7 +1392,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
marshallableState++;
//noinspection fallthrough
- case 1:
+ case 2:
writeByteArray(marshallable);
if (!lastFinished) {
@@ -1405,6 +1414,16 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
public <T> T readMarshallable(MessageReader reader) {
switch (marshallableState) {
case 0:
+ boolean isNull = readBoolean();
+
+ if (!lastFinished || isNull) {
+ return null;
+ }
+
+ marshallableState++;
+
+ //noinspection fallthrough
+ case 1:
descriptors = readCollection(MessageCollectionItemType.MSG, reader);
if (!lastFinished) {
@@ -1414,7 +1433,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
marshallableState++;
//noinspection fallthrough
- case 1:
+ case 2:
marshallable = readByteArray();
if (!lastFinished) {
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/message/ScaleCubeMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ScaleCubeMessage.java
index 3369684..33fdbe6 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/message/ScaleCubeMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/message/ScaleCubeMessage.java
@@ -21,15 +21,19 @@ import io.scalecube.cluster.transport.api.Message;
import java.util.Map;
import org.apache.ignite.internal.network.NetworkMessageTypes;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
- * Wrapper for ScaleCube's {@link Message}. {@link Message#data} is stored in {@link #array} and {@link Message#headers} are stored in
+ * Wrapper for ScaleCube's {@link Message}. {@link Message#data} is stored in {@link #data} and {@link Message#headers} are stored in
* {@link #headers}.
*/
@Transferable(NetworkMessageTypes.SCALE_CUBE_MESSAGE)
public interface ScaleCubeMessage extends NetworkMessage {
- byte[] array();
+ @Marshallable
+ Object data();
+
+ NetworkMessage message();
Map<String, String> headers();
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorFactory.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorFactory.java
index 6d45f34..0649739 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/ClassDescriptorFactory.java
@@ -150,7 +150,7 @@ public class ClassDescriptorFactory {
private ClassDescriptor superClassDescriptor(Class<?> clazz) {
Class<?> superclass = clazz.getSuperclass();
- if (superclass == null || superclass == Object.class) {
+ if (superclass == null || superclass == Object.class || superclass == Enum.class) {
return null;
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/CompositeIndexedDescriptors.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/CompositeIndexedDescriptors.java
new file mode 100644
index 0000000..ddeb86e
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/CompositeIndexedDescriptors.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Descriptor provider that uses {@link ClassDescriptorFactoryContext} for built-in descriptor ids and
+ * delegates to another {@link IdIndexedDescriptors} for other ids.
+ */
+public class CompositeIndexedDescriptors implements IdIndexedDescriptors {
+ private final IdIndexedDescriptors descriptors;
+ private final ClassDescriptorFactoryContext ctx;
+
+ public CompositeIndexedDescriptors(IdIndexedDescriptors descriptors,
+ ClassDescriptorFactoryContext ctx) {
+ this.descriptors = descriptors;
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public @Nullable ClassDescriptor getDescriptor(int descriptorId) {
+ if (ClassDescriptorFactoryContext.shouldBeBuiltIn(descriptorId)) {
+ return ctx.getDescriptor(descriptorId);
+ }
+
+ return descriptors.getDescriptor(descriptorId);
+ }
+}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
index fb92bd0..f57c013 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
@@ -22,11 +22,13 @@ import static java.util.stream.Collectors.toList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageDeserializer;
import org.apache.ignite.network.serialization.MessageSerializer;
@@ -60,8 +62,25 @@ public class PerSessionSerializationService {
*/
private final Map<Integer, ClassDescriptor> descriptorMapView = Collections.unmodifiableMap(mergedDescriptorMap);
+ /**
+ * A collection of the descriptors that were sent to the remote node.
+ */
+ private final Set<Integer> sentDescriptors = ConcurrentHashMap.newKeySet();
+
+ /**
+ * Descriptors provider.
+ */
+ private final CompositeIndexedDescriptors descriptors;
+
+ /**
+ * Constructor.
+ *
+ * @param serializationService Serialization service.
+ */
public PerSessionSerializationService(@NotNull SerializationService serializationService) {
this.serializationService = serializationService;
+ this.descriptors = new CompositeIndexedDescriptors(new MapBackedIdIndexedDescriptors(descriptorMapView),
+ serializationService.getDescriptorRegistry());
}
/**
@@ -85,59 +104,76 @@ public class PerSessionSerializationService {
/**
* Serializes a marshallable object to a byte array.
*
+ * @param marshallable Marshallable object to serialize.
+ * @param <T> Object's type.
+ * @throws UserObjectSerializationException If failed to serialize an object.
* @see SerializationService#writeMarshallable(Object)
*/
- public <T> SerializationResult writeMarshallable(T marshallable) {
+ public <T> MarshalledObject writeMarshallable(T marshallable)
+ throws UserObjectSerializationException {
return serializationService.writeMarshallable(marshallable);
}
/**
* Deserializes a marshallable object from a byte array.
*
+ * @param missingDescriptors Descriptors that were received from the remote node.
+ * @param array Byte array that contains a serialized object.
+ * @param <T> Object's type.
+ * @throws UserObjectSerializationException If failed to deserialize an object.
* @see SerializationService#readMarshallable(Map, byte[])
*/
- public <T> T readMarshallable(List<ClassDescriptorMessage> missingDescriptors, byte[] marshallableData) {
+ public <T> T readMarshallable(List<ClassDescriptorMessage> missingDescriptors, byte[] array)
+ throws UserObjectSerializationException {
mergeDescriptors(missingDescriptors);
- return serializationService.readMarshallable(descriptorMapView, marshallableData);
+ return serializationService.readMarshallable(descriptors, array);
}
/**
* Creates a list of messages holding class descriptors.
*
- * @param descriptorIds Ids of class descriptors.
+ * @param descriptors Class descriptors.
* @return List of class descriptor network messages.
*/
- public List<ClassDescriptorMessage> createClassDescriptorsMessages(List<Integer> descriptorIds) {
- return descriptorIds.stream().map(id -> {
- ClassDescriptor descriptor = serializationService.getClassDescriptor(id);
-
- List<FieldDescriptorMessage> fields = descriptor.fields().stream()
- .map(d -> {
- return MSG_FACTORY.fieldDescriptorMessage()
- .name(d.name())
- .typeDescriptorId(d.typeDescriptorId())
- .className(d.clazz().getName())
- .build();
- })
- .collect(toList());
-
- Serialization serialization = descriptor.serialization();
-
- return MSG_FACTORY.classDescriptorMessage()
- .fields(fields)
- .isFinal(descriptor.isFinal())
- .serializationType(serialization.type().value())
- .hasSerializationOverride(serialization.hasSerializationOverride())
- .hasReadObjectNoData(serialization.hasReadObjectNoData())
- .hasWriteReplace(serialization.hasWriteReplace())
- .hasReadResolve(serialization.hasReadResolve())
- .descriptorId(descriptor.descriptorId())
- .className(descriptor.className())
- .superClassDescriptorId(superClassDescriptorIdForMessage(descriptor))
- .superClassName(descriptor.superClassName())
- .build();
- }).collect(toList());
+ @Nullable
+ public List<ClassDescriptorMessage> createClassDescriptorsMessages(Set<ClassDescriptor> descriptors) {
+ List<ClassDescriptorMessage> messages = descriptors.stream()
+ .filter(descriptor -> {
+ int descriptorId = descriptor.descriptorId();
+ return !sentDescriptors.contains(descriptorId) && !serializationService.shouldBeBuiltIn(descriptorId);
+ })
+ .map(descriptor -> {
+ List<FieldDescriptorMessage> fields = descriptor.fields().stream()
+ .map(d -> {
+ return MSG_FACTORY.fieldDescriptorMessage()
+ .name(d.name())
+ .typeDescriptorId(d.typeDescriptorId())
+ .className(d.clazz().getName())
+ .build();
+ })
+ .collect(toList());
+
+ Serialization serialization = descriptor.serialization();
+
+ return MSG_FACTORY.classDescriptorMessage()
+ .fields(fields)
+ .isFinal(descriptor.isFinal())
+ .serializationType(serialization.type().value())
+ .hasSerializationOverride(serialization.hasSerializationOverride())
+ .hasReadObjectNoData(serialization.hasReadObjectNoData())
+ .hasWriteReplace(serialization.hasWriteReplace())
+ .hasReadResolve(serialization.hasReadResolve())
+ .descriptorId(descriptor.descriptorId())
+ .className(descriptor.className())
+ .superClassDescriptorId(superClassDescriptorIdForMessage(descriptor))
+ .superClassName(descriptor.superClassName())
+ .build();
+ }).collect(toList());
+
+ messages.forEach(classDescriptorMessage -> sentDescriptors.add(classDescriptorMessage.descriptorId()));
+
+ return messages;
}
private int superClassDescriptorIdForMessage(ClassDescriptor descriptor) {
@@ -177,7 +213,7 @@ public class PerSessionSerializationService {
*/
@NotNull
private ClassDescriptor messageToMergedClassDescriptor(ClassDescriptorMessage clsMsg) {
- ClassDescriptor localDescriptor = serializationService.getClassDescriptor(clsMsg.className());
+ ClassDescriptor localDescriptor = serializationService.getOrCreateDescriptor(clsMsg.className());
List<FieldDescriptor> remoteFields = clsMsg.fields().stream()
.map(fieldMsg -> fieldDescriptorFromMessage(fieldMsg, localDescriptor.clazz()))
@@ -224,9 +260,9 @@ public class PerSessionSerializationService {
private ClassDescriptor getClassDescriptor(int descriptorId, String typeName) {
if (serializationService.shouldBeBuiltIn(descriptorId)) {
- return serializationService.getClassDescriptor(descriptorId);
+ return serializationService.getDescriptor(descriptorId);
} else {
- return serializationService.getClassDescriptor(typeName);
+ return serializationService.getOrCreateDescriptor(typeName);
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java
index 3f56725..045feee 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java
@@ -17,7 +17,10 @@
package org.apache.ignite.internal.network.serialization;
-import java.util.Map;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalException;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
+import org.apache.ignite.internal.network.serialization.marshal.UnmarshalException;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageDeserializer;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
@@ -30,12 +33,29 @@ public class SerializationService {
/** Message serialization registry. */
private final MessageSerializationRegistry messageRegistry;
- /** User object serializer. */
- private final UserObjectSerializer userObjectSerializer;
+ /** Descriptor registry. */
+ private final ClassDescriptorFactoryContext descriptorRegistry;
- public SerializationService(MessageSerializationRegistry messageRegistry, UserObjectSerializer userObjectSerializer) {
+ /** Descriptor factory. */
+ private final ClassDescriptorFactory descriptorFactory;
+
+ /** User object marshaller. */
+ private final UserObjectMarshaller marshaller;
+
+ /**
+ * Constructor.
+ *
+ * @param messageRegistry Message registry.
+ * @param userObjectSerializationContext User object serialization context.
+ */
+ public SerializationService(
+ MessageSerializationRegistry messageRegistry,
+ UserObjectSerializationContext userObjectSerializationContext
+ ) {
this.messageRegistry = messageRegistry;
- this.userObjectSerializer = userObjectSerializer;
+ this.descriptorRegistry = userObjectSerializationContext.descriptorRegistry();
+ this.descriptorFactory = userObjectSerializationContext.descriptorFactory();
+ this.marshaller = userObjectSerializationContext.marshaller();
}
/**
@@ -59,19 +79,36 @@ public class SerializationService {
/**
* Serializes a marshallable object to a byte array.
*
- * @see UserObjectSerializer#write(Object)
+ * @param object Object to serialize.
+ * @param <T> Object's type.
+ * @throws UserObjectSerializationException If failed to serialize an object.
+ * @see UserObjectMarshaller#marshal(Object, Class)
*/
- public <T> SerializationResult writeMarshallable(T object) {
- return userObjectSerializer.write(object);
+ public <T> MarshalledObject writeMarshallable(T object) throws UserObjectSerializationException {
+ Class<?> clazz = object.getClass();
+ try {
+ return marshaller.marshal(object, clazz);
+ } catch (MarshalException e) {
+ throw new UserObjectSerializationException("Failed to serialize object of type " + clazz.getName(), e);
+ }
}
/**
* Deserializes a marshallable object from a byte array.
*
- * @see UserObjectSerializer#read(Map, byte[])
+ * @param descriptors Descriptors provider.
+ * @param array Byte array that contains a serialized object.
+ * @param <T> Object's type.
+ * @throws UserObjectSerializationException If failed to deserialize an object.
+ * @see UserObjectMarshaller#unmarshal(byte[], IdIndexedDescriptors)
*/
- public <T> T readMarshallable(Map<Integer, ClassDescriptor> descriptor, byte[] array) {
- return userObjectSerializer.read(descriptor, array);
+ public <T> T readMarshallable(IdIndexedDescriptors descriptors, byte[] array)
+ throws UserObjectSerializationException {
+ try {
+ return marshaller.unmarshal(array, descriptors);
+ } catch (UnmarshalException e) {
+ throw new UserObjectSerializationException("Failed to deserialize object: " + e.getMessage(), e);
+ }
}
/**
@@ -81,17 +118,7 @@ public class SerializationService {
* @return {@code true} if descriptor belongs to the range reserved for built-in types, {@code false} otherwise.
*/
public boolean shouldBeBuiltIn(int typeDescriptorId) {
- return userObjectSerializer.shouldBeBuiltIn(typeDescriptorId);
- }
-
- /**
- * Gets a class descriptor by the descriptor id.
- *
- * @param typeDescriptorId Type descriptor id.
- * @return Class descriptor.
- */
- public ClassDescriptor getClassDescriptor(int typeDescriptorId) {
- return userObjectSerializer.getClassDescriptor(typeDescriptorId);
+ return ClassDescriptorFactoryContext.shouldBeBuiltIn(typeDescriptorId);
}
/**
@@ -100,7 +127,27 @@ public class SerializationService {
* @param typeName Class' name.
* @return Class descriptor.
*/
- public ClassDescriptor getClassDescriptor(String typeName) {
- return userObjectSerializer.getClassDescriptor(typeName);
+ public ClassDescriptor getOrCreateDescriptor(String typeName) {
+ Class<?> clazz;
+ try {
+ clazz = Class.forName(typeName);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Class " + typeName + " is not found", e);
+ }
+
+ ClassDescriptor descriptor = descriptorRegistry.getDescriptor(clazz);
+ if (descriptor != null) {
+ return descriptor;
+ } else {
+ return descriptorFactory.create(clazz);
+ }
+ }
+
+ public ClassDescriptor getDescriptor(int descriptorId) {
+ return descriptorRegistry.getDescriptor(descriptorId);
+ }
+
+ public ClassDescriptorFactoryContext getDescriptorRegistry() {
+ return descriptorRegistry;
}
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/UserObjectSerializationContext.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/UserObjectSerializationContext.java
new file mode 100644
index 0000000..cf88a76
--- /dev/null
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/UserObjectSerializationContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.serialization;
+
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
+
+/** Holder for the components used to serialize user objects: descriptor registry, descriptor factory and marshaller. */
+public class UserObjectSerializationContext {
+ private final ClassDescriptorFactoryContext descriptorRegistry;
+ private final ClassDescriptorFactory descriptorFactory;
+ private final UserObjectMarshaller marshaller;
+
+ /**
+ * Constructor.
+ *
+ * @param descriptorRegistry Descriptor registry.
+ * @param descriptorFactory Descriptor factory.
+ * @param marshaller User object marshaller.
+ */
+ public UserObjectSerializationContext(ClassDescriptorFactoryContext descriptorRegistry,
+ ClassDescriptorFactory descriptorFactory, UserObjectMarshaller marshaller) {
+ this.descriptorRegistry = descriptorRegistry;
+ this.descriptorFactory = descriptorFactory;
+ this.marshaller = marshaller;
+ }
+
+ public ClassDescriptorFactoryContext descriptorRegistry() {
+ return descriptorRegistry;
+ }
+
+ public ClassDescriptorFactory descriptorFactory() {
+ return descriptorFactory;
+ }
+
+ public UserObjectMarshaller marshaller() {
+ return marshaller;
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/UserObjectSerializationException.java
similarity index 56%
copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
copy to modules/network/src/main/java/org/apache/ignite/internal/network/serialization/UserObjectSerializationException.java
index 3eada38..862498c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/UserObjectSerializationException.java
@@ -15,31 +15,22 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.serialization;
-import java.io.Serializable;
-import java.util.UUID;
-import org.apache.ignite.network.NetworkMessage;
-import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
/**
- * ErrorMessage interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * User object serialization exception that can be thrown when {@link UserObjectMarshaller} is unable to
+ * marshal or unmarshal an object.
*/
-@Transferable(value = SqlQueryMessageGroup.ERROR_MESSAGE, autoSerializable = false)
-public interface ErrorMessage extends NetworkMessage, Serializable {
+public class UserObjectSerializationException extends RuntimeException {
/**
- * Get query ID.
+ * Constructor.
+ *
+ * @param message Exception message.
+ * @param cause Cause throwable.
*/
- UUID queryId();
-
- /**
- * Get fragment ID.
- */
- long fragmentId();
-
- /**
- * Get error.
- */
- Throwable error();
+ public UserObjectSerializationException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/UserObjectSerializer.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/UserObjectSerializer.java
deleted file mode 100644
index ffddbba..0000000
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/UserObjectSerializer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.network.serialization;
-
-import java.util.Map;
-
-/** User object serializer. */
-public interface UserObjectSerializer {
- /**
- * Reads a marshallable object from a byte array using a map of descriptors.
- *
- * @param descriptors Descriptor map.
- * @param array Byte array.
- * @param <T> Object type.
- * @return Unmarshalled object.
- */
- <T> T read(Map<Integer, ClassDescriptor> descriptors, byte[] array);
-
- /**
- * Marshalls object.
- *
- * @param object Object.
- * @param <T> Object's type.
- * @return {@link SerializationResult}.
- */
- <T> SerializationResult write(T object);
-
- ClassDescriptor getClassDescriptor(int typeDescriptorId);
-
- ClassDescriptor getClassDescriptor(String typeName);
-
- boolean shouldBeBuiltIn(int typeDescriptorId);
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
index 3aa9a83..90a7dc0 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
@@ -34,7 +34,11 @@ import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactoryContext;
import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
import org.apache.ignite.network.AbstractClusterService;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
@@ -74,7 +78,13 @@ public class ScaleCubeClusterServiceFactory {
public void start() {
String consistentId = context.getName();
- var serializationService = new SerializationService(context.getSerializationRegistry(), null);
+ var userObjectDescriptorRegistry = new ClassDescriptorFactoryContext();
+ var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
+ var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
+ var userObjectSerialization = new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
+ userObjectMarshaller);
+
+ var serializationService = new SerializationService(context.getSerializationRegistry(), userObjectSerialization);
UUID launchId = UUID.randomUUID();
diff --git a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
index 0268b84..16bb286 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
@@ -20,11 +20,6 @@ package org.apache.ignite.network.scalecube;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -32,6 +27,7 @@ import java.util.Map;
import java.util.Objects;
import org.apache.ignite.internal.network.NetworkMessagesFactory;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.message.ScaleCubeMessageBuilder;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
@@ -206,16 +202,16 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
*/
private NetworkMessage fromMessage(Message message) throws IgniteInternalException {
Object dataObj = message.data();
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- try (ObjectOutputStream oos = new ObjectOutputStream(stream)) {
- oos.writeObject(dataObj);
- } catch (IOException e) {
- throw new IgniteInternalException(e);
+ ScaleCubeMessageBuilder scaleCubeMessageBuilder = messageFactory.scaleCubeMessage();
+
+ if (dataObj instanceof NetworkMessage) {
+ scaleCubeMessageBuilder.message((NetworkMessage) dataObj);
+ } else {
+ scaleCubeMessageBuilder.data(dataObj);
}
- return messageFactory.scaleCubeMessage()
- .array(stream.toByteArray())
+ return scaleCubeMessageBuilder
.headers(message.headers())
.build();
}
@@ -234,15 +230,13 @@ class ScaleCubeDirectMarshallerTransport implements Transport {
Map<String, String> headers = msg.headers();
- Object obj;
+ Object obj = msg.data();
+
+ NetworkMessage message = msg.message();
- try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(msg.array()))) {
- obj = ois.readObject();
- } catch (Exception e) {
- throw new IgniteInternalException(e);
- }
+ Object data = obj != null ? obj : message;
- return Message.withHeaders(headers).data(obj).build();
+ return Message.withHeaders(headers).data(data).build();
}
return null;
}
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
index 694274f..3f7dd02 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.network.netty;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
@@ -38,6 +39,7 @@ import org.apache.ignite.internal.network.NestedMessageMessage;
import org.apache.ignite.internal.network.direct.DirectMessageWriter;
import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TestMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
@@ -93,7 +95,7 @@ public class InboundDecoderTest {
* Serializes and then deserializes the given message.
*/
private <T extends NetworkMessage> T sendAndReceive(T msg) {
- var serializationService = new SerializationService(registry, null);
+ var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
var perSessionSerializationService = new PerSessionSerializationService(serializationService);
var channel = new EmbeddedChannel(new InboundDecoder(perSessionSerializationService));
@@ -131,7 +133,7 @@ public class InboundDecoderTest {
*/
@Test
public void testPartialHeader() throws Exception {
- var serializationService = new SerializationService(registry, null);
+ var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
var perSessionSerializationService = new PerSessionSerializationService(serializationService);
var channel = new EmbeddedChannel(new InboundDecoder(perSessionSerializationService));
@@ -164,7 +166,7 @@ public class InboundDecoderTest {
Mockito.doReturn(channel).when(ctx).channel();
- var serializationService = new SerializationService(registry, null);
+ var serializationService = new SerializationService(registry, mock(UserObjectSerializationContext.class));
var perSessionSerializationService = new PerSessionSerializationService(serializationService);
final var decoder = new InboundDecoder(perSessionSerializationService);
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
index c80ec5e..fbd5088 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/NettyServerTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.network.handshake.HandshakeAction;
import org.apache.ignite.internal.network.handshake.HandshakeManager;
import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.NetworkMessage;
@@ -183,7 +184,7 @@ public class NettyServerTest {
},
(socketAddress, message) -> {
},
- new SerializationService(registry, null),
+ new SerializationService(registry, mock(UserObjectSerializationContext.class)),
bootstrapFactory
);
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index b056187..50f893c 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -41,11 +41,16 @@ import java.util.Map;
import org.apache.ignite.internal.network.direct.DirectMessageWriter;
import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.netty.InboundDecoder;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalException;
+import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
+import org.apache.ignite.internal.network.serialization.marshal.UnmarshalException;
+import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
import org.apache.ignite.network.TestMessagesFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.network.serialization.MessageSerializer;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
/**
@@ -151,29 +156,28 @@ public class MarshallableTest {
/** Helper class that holds classes needed for serialization. */
private class Serialization {
- private final ClassDescriptorFactoryContext descriptorContext;
- private final ClassDescriptorFactory factory;
- private final ClassDescriptor descriptor;
- private final StubSerializer userObjectSerializer;
- private final SerializationService serializationService;
private final PerSessionSerializationService perSessionSerializationService;
+ private final ClassDescriptor descriptor;
+
Serialization() {
- this.descriptorContext = new ClassDescriptorFactoryContext();
- this.factory = new ClassDescriptorFactory(descriptorContext);
+ var descriptorContext = new ClassDescriptorFactoryContext();
+ var factory = new ClassDescriptorFactory(descriptorContext);
// Create descriptor for SimpleSerializableObject
this.descriptor = factory.create(SimpleSerializableObject.class);
- this.userObjectSerializer = new StubSerializer(descriptorContext, descriptor);
+ var userObjectSerializer = new StubSerializer(descriptorContext, descriptor);
+
+ var ser = new UserObjectSerializationContext(descriptorContext, factory, userObjectSerializer);
- this.serializationService = new SerializationService(registry, userObjectSerializer);
+ var serializationService = new SerializationService(registry, ser);
this.perSessionSerializationService = new PerSessionSerializationService(serializationService);
}
}
/** Stub implementation of the serializer, uses standard JDK serializable serialization to actually marshall an object. */
- private static class StubSerializer implements UserObjectSerializer {
+ private static class StubSerializer implements UserObjectMarshaller {
private final ClassDescriptorFactoryContext descriptorContext;
@@ -184,46 +188,24 @@ public class MarshallableTest {
this.descriptor = descriptor;
}
- /** {@inheritDoc} */
- @Override
- public <T> T read(Map<Integer, ClassDescriptor> descriptor, byte[] array) {
- try (ByteArrayInputStream bais = new ByteArrayInputStream(array); ObjectInputStream ois = new ObjectInputStream(bais)) {
- return (T) ois.readObject();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- /** {@inheritDoc} */
@Override
- public <T> SerializationResult write(T object) {
+ public MarshalledObject marshal(@Nullable Object object, Class<?> declaredClass) throws MarshalException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(object);
oos.close();
- return new SerializationResult(baos.toByteArray(), Collections.singletonList(descriptor.descriptorId()));
+ return new MarshalledObject(baos.toByteArray(), Collections.singleton(descriptor));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- /** {@inheritDoc} */
@Override
- public ClassDescriptor getClassDescriptor(int typeDescriptorId) {
- return descriptorContext.getDescriptor(typeDescriptorId);
- }
-
- /** {@inheritDoc} */
- @Override
- public ClassDescriptor getClassDescriptor(String typeName) {
- assertEquals(descriptor.className(), typeName);
-
- return descriptor;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean shouldBeBuiltIn(int typeDescriptorId) {
- return ClassDescriptorFactoryContext.shouldBeBuiltIn(typeDescriptorId);
+ public <T> @Nullable T unmarshal(byte[] bytes, IdIndexedDescriptors mergedDescriptors) throws UnmarshalException {
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream(bais)) {
+ return (T) ois.readObject();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
index 2c80c9c..9a21632 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raft/ItLozaTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NetworkMessage;
@@ -44,6 +45,7 @@ import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -57,6 +59,12 @@ public class ItLozaTest {
/** Network factory. */
private static final TestScaleCubeClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+ private static final MessageSerializationRegistryImpl SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
+
+ static {
+ RaftMessagesSerializationRegistryInitializer.registerFactories(SERIALIZATION_REGISTRY);
+ }
+
/** Server port offset. */
private static final int PORT = 20010;
@@ -87,7 +95,8 @@ public class ItLozaTest {
testInfo,
port,
new StaticNodeFinder(srvs),
- NETWORK_FACTORY
+ NETWORK_FACTORY,
+ SERIALIZATION_REGISTRY
);
network.start();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
index 5bceb02..9b1993d 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/client/service/ItAbstractListenerSnapshotTest.java
@@ -38,11 +38,13 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -76,6 +78,12 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener
/** Factory. */
private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
+ protected static final MessageSerializationRegistryImpl SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
+
+ static {
+ RaftMessagesSerializationRegistryInitializer.registerFactories(SERIALIZATION_REGISTRY);
+ }
+
/** Network factory. */
private static final TestScaleCubeClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
@@ -349,7 +357,8 @@ public abstract class ItAbstractListenerSnapshotTest<T extends RaftGroupListener
testInfo,
port,
new StaticNodeFinder(List.of(otherPeer)),
- NETWORK_FACTORY
+ NETWORK_FACTORY,
+ SERIALIZATION_REGISTRY
);
network.start();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
index bc9c640..7353410 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItCliServiceTest.java
@@ -45,12 +45,14 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.CliService;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.entity.PeerId;
@@ -127,11 +129,15 @@ public class ItCliServiceTest {
.map(JRaftUtils::addressFromEndpoint)
.collect(toList());
+ var serializationRegistry = new MessageSerializationRegistryImpl();
+ RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+
ClusterService clientSvc = ClusterServiceTestUtils.clusterService(
testInfo,
TestUtils.INIT_PORT - 1,
new StaticNodeFinder(addressList),
- new TestScaleCubeClusterServiceFactory()
+ new TestScaleCubeClusterServiceFactory(),
+ serializationRegistry
);
clientSvc.start();
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
index 4cb08e7..b577252 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
@@ -66,6 +67,7 @@ import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.raft.jraft.StateMachine;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.closure.JoinableClosure;
@@ -3538,11 +3540,15 @@ public class ItNodeTest {
var nodeManager = new NodeManager();
+ var serializationRegistry = new MessageSerializationRegistryImpl();
+ RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+
ClusterService clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
peerId.getEndpoint().getPort(),
new StaticNodeFinder(addressList),
- new TestScaleCubeClusterServiceFactory()
+ new TestScaleCubeClusterServiceFactory(),
+ serializationRegistry
);
ExecutorService requestExecutor = JRaftUtils.createRequestExecutor(nodeOptions);
@@ -3574,11 +3580,15 @@ public class ItNodeTest {
* Creates a non-started {@link ClusterService}.
*/
private ClusterService createClusterService(Endpoint endpoint, NodeFinder nodeFinder) {
+ var serializationRegistry = new MessageSerializationRegistryImpl();
+ RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+
return ClusterServiceTestUtils.clusterService(
testInfo,
endpoint.getPort(),
nodeFinder,
- new TestScaleCubeClusterServiceFactory()
+ new TestScaleCubeClusterServiceFactory(),
+ serializationRegistry
);
}
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
index 6df91cd..6226735 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/RaftServerAbstractTest.java
@@ -21,10 +21,12 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -38,6 +40,12 @@ abstract class RaftServerAbstractTest {
protected static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
+ private static final MessageSerializationRegistryImpl SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
+
+ static {
+ RaftMessagesSerializationRegistryInitializer.registerFactories(SERIALIZATION_REGISTRY);
+ }
+
/** Network factory. */
protected static final TestScaleCubeClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
@@ -73,7 +81,8 @@ abstract class RaftServerAbstractTest {
testInfo,
port,
new StaticNodeFinder(servers),
- NETWORK_FACTORY
+ NETWORK_FACTORY,
+ SERIALIZATION_REGISTRY
);
if (start) {
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
index 48ba115..aaa99d9 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/CliServiceImpl.java
@@ -664,7 +664,7 @@ public class CliServiceImpl implements CliService {
if (result instanceof GetPeersResponse) {
final GetPeersResponse resp = (GetPeersResponse) result;
final List<PeerId> peerIdList = new ArrayList<>();
- final List<String> responsePeers = returnLearners ? resp.learnersList() : resp.peersList();
+ final Collection<String> responsePeers = returnLearners ? resp.learnersList() : resp.peersList();
for (final String peerIdStr : responsePeers) {
final PeerId newPeer = new PeerId();
newPeer.parse(peerIdStr);
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java
index 95acc04..391b0f0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalFileMetaOutter.java
@@ -19,8 +19,9 @@
package org.apache.ignite.raft.jraft.entity;
-import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.rpc.Message;
public final class LocalFileMetaOutter {
@@ -69,8 +70,9 @@ public final class LocalFileMetaOutter {
}
}
- @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_FILE_META, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_FILE_META)
public interface LocalFileMeta extends Message {
+ @Marshallable
FileSource source();
String checksum();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalStorageOutter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalStorageOutter.java
index 3be9298..287cdf8 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalStorageOutter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/LocalStorageOutter.java
@@ -20,28 +20,32 @@
package org.apache.ignite.raft.jraft.entity;
import java.util.List;
-import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.rpc.Message;
public final class LocalStorageOutter {
- @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.STABLE_PB_META, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.STABLE_PB_META)
public interface StablePBMeta extends Message {
long term();
String votedFor();
}
- @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_SNAPSHOT_PB_META, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_SNAPSHOT_PB_META)
public interface LocalSnapshotPbMeta extends Message {
+ @Marshallable
RaftOutter.SnapshotMeta meta();
+ @Marshallable
List<LocalStorageOutter.LocalSnapshotPbMeta.File> filesList();
- @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_SNAPSHOT_META_FILE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.LOCAL_SNAPSHOT_META_FILE)
interface File extends Message {
String name();
+ @Marshallable
LocalFileMetaOutter.LocalFileMeta meta();
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java
index 3b1abb7..a09df27 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/entity/RaftOutter.java
@@ -19,29 +19,31 @@
package org.apache.ignite.raft.jraft.entity;
-import java.util.List;
-import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import java.util.Collection;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.rpc.Message;
public final class RaftOutter {
- @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.ENTRY_META, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.ENTRY_META)
public interface EntryMeta extends Message {
long term();
+ @Marshallable
EnumOutter.EntryType type();
- List<String> peersList();
+ Collection<String> peersList();
long dataLen();
- List<String> oldPeersList();
+ Collection<String> oldPeersList();
long checksum();
- List<String> learnersList();
+ Collection<String> learnersList();
- List<String> oldLearnersList();
+ Collection<String> oldLearnersList();
/**
* @return True when the entry has a checksum, false otherwise.
@@ -49,18 +51,18 @@ public final class RaftOutter {
boolean hasChecksum();
}
- @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.SNAPSHOT_META, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RaftOutterMessageGroup.SNAPSHOT_META)
public interface SnapshotMeta extends Message {
long lastIncludedIndex();
long lastIncludedTerm();
- List<String> peersList();
+ Collection<String> peersList();
- List<String> oldPeersList();
+ Collection<String> oldPeersList();
- List<String> learnersList();
+ Collection<String> learnersList();
- List<String> oldLearnersList();
+ Collection<String> oldLearnersList();
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionRequest.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionRequest.java
index cb7448f..9ab4b9f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionRequest.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionRequest.java
@@ -17,15 +17,15 @@
package org.apache.ignite.raft.jraft.rpc;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.jraft.RaftMessageGroup;
-import org.apache.ignite.raft.jraft.rpc.Message;
/**
* Submit an action to a replication group.
*/
-@Transferable(value = RaftMessageGroup.RpcActionMessageGroup.ACTION_REQUEST, autoSerializable = false)
+@Transferable(value = RaftMessageGroup.RpcActionMessageGroup.ACTION_REQUEST)
public interface ActionRequest extends Message {
/**
* @return Group id.
@@ -35,6 +35,7 @@ public interface ActionRequest extends Message {
/**
* @return Action's command.
*/
+ @Marshallable
Command command();
/**
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionResponse.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionResponse.java
index e2c1154..a066d1a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionResponse.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/ActionResponse.java
@@ -17,17 +17,18 @@
package org.apache.ignite.raft.jraft.rpc;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
import org.apache.ignite.raft.jraft.RaftMessageGroup;
-import org.apache.ignite.raft.jraft.rpc.Message;
/**
* The result of an action.
*/
-@Transferable(value = RaftMessageGroup.RpcActionMessageGroup.ACTION_RESPONSE, autoSerializable = false)
+@Transferable(value = RaftMessageGroup.RpcActionMessageGroup.ACTION_RESPONSE)
public interface ActionResponse extends Message {
/**
* @return A result for this request, can be of any type.
*/
+ @Marshallable
Object result();
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java
index daff213..f005ed2 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/CliRequests.java
@@ -19,12 +19,12 @@
package org.apache.ignite.raft.jraft.rpc;
-import java.util.List;
-import org.apache.ignite.raft.jraft.RaftMessageGroup;
+import java.util.Collection;
import org.apache.ignite.network.annotations.Transferable;
+import org.apache.ignite.raft.jraft.RaftMessageGroup;
public final class CliRequests {
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.ADD_PEER_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.ADD_PEER_REQUEST)
public interface AddPeerRequest extends Message {
String groupId();
@@ -33,14 +33,14 @@ public final class CliRequests {
String peerId();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.ADD_PEER_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.ADD_PEER_RESPONSE)
public interface AddPeerResponse extends Message {
- List<String> oldPeersList();
+ Collection<String> oldPeersList();
- List<String> newPeersList();
+ Collection<String> newPeersList();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.REMOVE_PEER_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.REMOVE_PEER_REQUEST)
public interface RemovePeerRequest extends Message {
String groupId();
@@ -49,48 +49,48 @@ public final class CliRequests {
String peerId();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.REMOVE_PEER_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.REMOVE_PEER_RESPONSE)
public interface RemovePeerResponse extends Message {
- List<String> oldPeersList();
+ Collection<String> oldPeersList();
- List<String> newPeersList();
+ Collection<String> newPeersList();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.CHANGE_PEERS_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.CHANGE_PEERS_REQUEST)
public interface ChangePeersRequest extends Message {
String groupId();
String leaderId();
- List<String> newPeersList();
+ Collection<String> newPeersList();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.CHANGE_PEERS_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.CHANGE_PEERS_RESPONSE)
public interface ChangePeersResponse extends Message {
- List<String> oldPeersList();
+ Collection<String> oldPeersList();
- List<String> newPeersList();
+ Collection<String> newPeersList();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.SNAPSHOT_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.SNAPSHOT_REQUEST)
public interface SnapshotRequest extends Message {
String groupId();
String peerId();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.RESET_PEER_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.RESET_PEER_REQUEST)
public interface ResetPeerRequest extends Message {
String groupId();
String peerId();
- List<String> oldPeersList();
+ Collection<String> oldPeersList();
- List<String> newPeersList();
+ Collection<String> newPeersList();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.TRANSFER_LEADER_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.TRANSFER_LEADER_REQUEST)
public interface TransferLeaderRequest extends Message {
String groupId();
@@ -99,21 +99,21 @@ public final class CliRequests {
String peerId();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.GET_LEADER_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.GET_LEADER_REQUEST)
public interface GetLeaderRequest extends Message {
String groupId();
String peerId();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.GET_LEADER_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.GET_LEADER_RESPONSE)
public interface GetLeaderResponse extends Message {
String leaderId();
long currentTerm();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.GET_PEERS_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.GET_PEERS_REQUEST)
public interface GetPeersRequest extends Message {
String groupId();
@@ -122,44 +122,44 @@ public final class CliRequests {
boolean onlyAlive();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.GET_PEERS_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.GET_PEERS_RESPONSE)
public interface GetPeersResponse extends Message {
- List<String> peersList();
+ Collection<String> peersList();
- List<String> learnersList();
+ Collection<String> learnersList();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.ADD_LEARNERS_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.ADD_LEARNERS_REQUEST)
public interface AddLearnersRequest extends Message {
String groupId();
String leaderId();
- List<String> learnersList();
+ Collection<String> learnersList();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.REMOVE_LEARNERS_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.REMOVE_LEARNERS_REQUEST)
public interface RemoveLearnersRequest extends Message {
String groupId();
String leaderId();
- List<String> learnersList();
+ Collection<String> learnersList();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.RESET_LEARNERS_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.RESET_LEARNERS_REQUEST)
public interface ResetLearnersRequest extends Message {
String groupId();
String leaderId();
- List<String> learnersList();
+ Collection<String> learnersList();
}
- @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.LEARNERS_OP_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcClientMessageGroup.LEARNERS_OP_RESPONSE)
public interface LearnersOpResponse extends Message {
- List<String> oldLearnersList();
+ Collection<String> oldLearnersList();
- List<String> newLearnersList();
+ Collection<String> newLearnersList();
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
index 1324c52..e003f6c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
@@ -18,6 +18,7 @@
package org.apache.ignite.raft.jraft.rpc;
import java.util.List;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
import org.apache.ignite.raft.jraft.RaftMessageGroup;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
@@ -30,7 +31,7 @@ public final class RpcRequests {
private RpcRequests() {
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.PING_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.PING_REQUEST)
public interface PingRequest extends Message {
/**
* <code>required int64 send_timestamp = 1;</code>
@@ -38,7 +39,7 @@ public final class RpcRequests {
long sendTimestamp();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.ERROR_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.ERROR_RESPONSE)
public interface ErrorResponse extends Message {
/**
* Error code.
@@ -65,15 +66,16 @@ public final class RpcRequests {
@Nullable String leaderId();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.SM_ERROR_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.SM_ERROR_RESPONSE)
public interface SMErrorResponse extends Message {
/**
* @return Throwable from client's state machine logic.
*/
+ @Marshallable
SMThrowable error();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.INSTALL_SNAPSHOT_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.INSTALL_SNAPSHOT_REQUEST)
public interface InstallSnapshotRequest extends Message {
String groupId();
@@ -83,19 +85,20 @@ public final class RpcRequests {
long term();
+ @Marshallable
RaftOutter.SnapshotMeta meta();
String uri();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.INSTALL_SNAPSHOT_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.INSTALL_SNAPSHOT_RESPONSE)
public interface InstallSnapshotResponse extends Message {
long term();
boolean success();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.TIMEOUT_NOW_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.TIMEOUT_NOW_REQUEST)
public interface TimeoutNowRequest extends Message {
String groupId();
@@ -106,7 +109,7 @@ public final class RpcRequests {
long term();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.TIMEOUT_NOW_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.TIMEOUT_NOW_RESPONSE)
public interface TimeoutNowResponse extends Message {
/**
* <code>required int64 term = 1;</code>
@@ -119,7 +122,7 @@ public final class RpcRequests {
boolean success();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.REQUEST_VOTE_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.REQUEST_VOTE_REQUEST)
public interface RequestVoteRequest extends Message {
String groupId();
@@ -136,7 +139,7 @@ public final class RpcRequests {
boolean preVote();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.REQUEST_VOTE_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.REQUEST_VOTE_RESPONSE)
public interface RequestVoteResponse extends Message {
/**
* <code>required int64 term = 1;</code>
@@ -149,7 +152,7 @@ public final class RpcRequests {
boolean granted();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_REQUEST)
public interface AppendEntriesRequest extends Message {
String groupId();
@@ -163,14 +166,16 @@ public final class RpcRequests {
long prevLogIndex();
+ @Marshallable
List<RaftOutter.EntryMeta> entriesList();
long committedIndex();
+ @Marshallable
ByteString data();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.APPEND_ENTRIES_RESPONSE)
public interface AppendEntriesResponse extends Message {
long term();
@@ -179,7 +184,7 @@ public final class RpcRequests {
long lastLogIndex();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.GET_FILE_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.GET_FILE_REQUEST)
public interface GetFileRequest extends Message {
long readerId();
@@ -192,27 +197,29 @@ public final class RpcRequests {
boolean readPartly();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.GET_FILE_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.GET_FILE_RESPONSE)
public interface GetFileResponse extends Message {
boolean eof();
long readSize();
+ @Marshallable
ByteString data();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.READ_INDEX_REQUEST, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.READ_INDEX_REQUEST)
public interface ReadIndexRequest extends Message {
String groupId();
String serverId();
+ @Marshallable
List<ByteString> entriesList();
String peerId();
}
- @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.READ_INDEX_RESPONSE, autoSerializable = false)
+ @Transferable(value = RaftMessageGroup.RpcRequestsMessageGroup.READ_INDEX_RESPONSE)
public interface ReadIndexResponse extends Message {
long index();
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 0711b2d..e4721c6 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -17,31 +17,11 @@
package org.apache.ignite.raft.jraft.rpc.impl;
-import static java.lang.System.currentTimeMillis;
-import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.ThreadLocalRandom.current;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
-import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
-
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -66,6 +46,26 @@ import org.apache.ignite.raft.jraft.rpc.ActionResponse;
import org.apache.ignite.raft.jraft.rpc.RpcRequests;
import org.jetbrains.annotations.NotNull;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.ThreadLocalRandom.current;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.AddPeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ChangePeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetLeaderResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.GetPeersResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.LearnersOpResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemoveLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.RemovePeerResponse;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.ResetLearnersRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.SnapshotRequest;
+import static org.apache.ignite.raft.jraft.rpc.CliRequests.TransferLeaderRequest;
+
/**
* The implementation of {@link RaftGroupService}
*/
@@ -655,7 +655,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
* @param peers List of {@link PeerId} string representations.
* @return List of {@link PeerId}
*/
- private List<Peer> parsePeerList(List<String> peers) {
+ private List<Peer> parsePeerList(Collection<String> peers) {
if (peers == null)
return null;
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
index 7e07c49..fc12687 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/TestCluster.java
@@ -41,6 +41,7 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
@@ -49,6 +50,7 @@ import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.Node;
import org.apache.ignite.raft.jraft.NodeManager;
import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.raft.jraft.conf.Configuration;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.option.NodeOptions;
@@ -247,11 +249,15 @@ public class TestCluster {
NodeManager nodeManager = new NodeManager();
+ var serializationRegistry = new MessageSerializationRegistryImpl();
+ RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+
ClusterService clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
listenAddr.getPort(),
new StaticNodeFinder(addressList),
- new TestScaleCubeClusterServiceFactory()
+ new TestScaleCubeClusterServiceFactory(),
+ serializationRegistry
);
var rpcClient = new IgniteRpcClient(clusterService);
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
index 5ebc4ac..42dc2b7 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/AbstractRpcTest.java
@@ -248,28 +248,28 @@ public abstract class AbstractRpcTest {
}
/** */
- @Transferable(value = 0, autoSerializable = false)
+ @Transferable(value = 0)
public static interface Request1 extends Message {
/** */
int val();
}
/** */
- @Transferable(value = 1, autoSerializable = false)
+ @Transferable(value = 1)
public static interface Request2 extends Message {
/** */
int val();
}
/** */
- @Transferable(value = 2, autoSerializable = false)
+ @Transferable(value = 2)
public static interface Response1 extends Message {
/** */
int val();
}
/** */
- @Transferable(value = 3, autoSerializable = false)
+ @Transferable(value = 3)
public static interface Response2 extends Message {
/** */
int val();
diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
index 1666ef7..727318e 100644
--- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
+++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/rpc/IgniteRpcTest.java
@@ -24,10 +24,12 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.jraft.JRaftUtils;
import org.apache.ignite.raft.jraft.NodeManager;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient;
import org.apache.ignite.raft.jraft.util.Endpoint;
@@ -64,11 +66,16 @@ public class IgniteRpcTest extends AbstractRpcTest {
/** {@inheritDoc} */
@Override public RpcServer<?> createServer(Endpoint endpoint) {
+ var serializationRegistry = new MessageSerializationRegistryImpl();
+ RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+ TestRaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+
ClusterService service = ClusterServiceTestUtils.clusterService(
testInfo,
endpoint.getPort(),
new StaticNodeFinder(Collections.emptyList()),
- new TestScaleCubeClusterServiceFactory()
+ new TestScaleCubeClusterServiceFactory(),
+ serializationRegistry
);
NodeOptions nodeOptions = new NodeOptions();
@@ -92,11 +99,16 @@ public class IgniteRpcTest extends AbstractRpcTest {
@Override public RpcClient createClient0() {
int i = cntr.incrementAndGet();
+ var serializationRegistry = new MessageSerializationRegistryImpl();
+ RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+ TestRaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+
ClusterService service = ClusterServiceTestUtils.clusterService(
testInfo,
endpoint.getPort() - i,
new StaticNodeFinder(List.of(addressFromEndpoint(endpoint))),
- new TestScaleCubeClusterServiceFactory()
+ new TestScaleCubeClusterServiceFactory(),
+ serializationRegistry
);
IgniteRpcClient client = new IgniteRpcClient(service) {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
index de684e6..d6e1786 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java
@@ -49,13 +49,16 @@ import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.tx.message.TxMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
@@ -111,11 +114,16 @@ public class ItDistributedConfigurationPropertiesTest {
vaultManager = new VaultManager(new InMemoryVaultService());
+ var serializationRegistry = new MessageSerializationRegistryImpl();
+ RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+ TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+
clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
addr.port(),
new StaticNodeFinder(memberAddrs),
- new TestScaleCubeClusterServiceFactory()
+ new TestScaleCubeClusterServiceFactory(),
+ serializationRegistry
);
raftManager = new Loza(clusterService, workDir);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 58b4ea4..c2f0d4b 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -44,12 +44,15 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.tx.message.TxMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -92,11 +95,16 @@ public class ItDistributedConfigurationStorageTest {
vaultManager = new VaultManager(new PersistentVaultService(workDir.resolve("vault")));
+ var serializationRegistry = new MessageSerializationRegistryImpl();
+ RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+ TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+
clusterService = ClusterServiceTestUtils.clusterService(
testInfo,
addr.port(),
new StaticNodeFinder(List.of(addr)),
- new TestScaleCubeClusterServiceFactory()
+ new TestScaleCubeClusterServiceFactory(),
+ serializationRegistry
);
lockManager = new HeapLockManager();
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index b146534..a98bc0c 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -46,11 +46,13 @@ import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableTxManagerImpl;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
+import org.apache.ignite.internal.tx.message.TxMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.VaultService;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -63,6 +65,7 @@ import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NettyBootstrapFactory;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.rest.RestModule;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
@@ -166,7 +169,12 @@ public class IgniteImpl implements Ignite {
NetworkConfiguration networkConfiguration = nodeCfgMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY);
- var clusterLocalConfiguration = new ClusterLocalConfiguration(name, new MessageSerializationRegistryImpl());
+ MessageSerializationRegistryImpl serializationRegistry = new MessageSerializationRegistryImpl();
+ RaftMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+ SqlQueryMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+ TxMessagesSerializationRegistryInitializer.registerFactories(serializationRegistry);
+
+ var clusterLocalConfiguration = new ClusterLocalConfiguration(name, serializationRegistry);
nettyBootstrapFactory = new NettyBootstrapFactory(networkConfiguration, clusterLocalConfiguration.getName());
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
index 3eada38..2694b59 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/ErrorMessage.java
@@ -20,13 +20,14 @@ package org.apache.ignite.internal.sql.engine.message;
import java.io.Serializable;
import java.util.UUID;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
* ErrorMessage interface.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
-@Transferable(value = SqlQueryMessageGroup.ERROR_MESSAGE, autoSerializable = false)
+@Transferable(value = SqlQueryMessageGroup.ERROR_MESSAGE)
public interface ErrorMessage extends NetworkMessage, Serializable {
/**
* Get query ID.
@@ -41,5 +42,6 @@ public interface ErrorMessage extends NetworkMessage, Serializable {
/**
* Get error.
*/
+ @Marshallable
Throwable error();
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
index 3d120e9..c30118a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchMessage.java
@@ -18,13 +18,14 @@
package org.apache.ignite.internal.sql.engine.message;
import java.util.List;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
* QueryBatchMessage interface.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
-@Transferable(value = SqlQueryMessageGroup.QUERY_BATCH_MESSAGE, autoSerializable = false)
+@Transferable(value = SqlQueryMessageGroup.QUERY_BATCH_MESSAGE)
public interface QueryBatchMessage extends ExecutionContextAwareMessage {
/**
* Get exchange ID.
@@ -44,5 +45,6 @@ public interface QueryBatchMessage extends ExecutionContextAwareMessage {
/**
* Get rows.
*/
+ @Marshallable
List<Object> rows();
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
index 3de7004..5c525f2 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartRequest.java
@@ -18,12 +18,13 @@
package org.apache.ignite.internal.sql.engine.message;
import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
* QueryStartRequest interface.
*/
-@Transferable(value = SqlQueryMessageGroup.QUERY_START_REQUEST, autoSerializable = false)
+@Transferable(value = SqlQueryMessageGroup.QUERY_START_REQUEST)
public interface QueryStartRequest extends ExecutionContextAwareMessage {
/**
* Get schema name.
@@ -33,6 +34,7 @@ public interface QueryStartRequest extends ExecutionContextAwareMessage {
/**
* Get fragment description.
*/
+ @Marshallable
FragmentDescription fragmentDescription();
/**
@@ -48,5 +50,6 @@ public interface QueryStartRequest extends ExecutionContextAwareMessage {
/**
* Get query parameters.
*/
+ @Marshallable
Object[] parameters();
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
index f245f8f..a7ce03a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryStartResponse.java
@@ -20,13 +20,14 @@ package org.apache.ignite.internal.sql.engine.message;
import java.io.Serializable;
import java.util.UUID;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
* QueryStartResponse interface.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
-@Transferable(value = SqlQueryMessageGroup.QUERY_START_RESPONSE, autoSerializable = false)
+@Transferable(value = SqlQueryMessageGroup.QUERY_START_RESPONSE)
public interface QueryStartResponse extends NetworkMessage, Serializable {
/**
* Get query ID.
@@ -41,5 +42,6 @@ public interface QueryStartResponse extends NetworkMessage, Serializable {
/**
* Get error.
*/
+ @Marshallable
Throwable error();
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
index 1eddc47..ebe2348 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
@@ -65,18 +65,21 @@ import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.jetbrains.annotations.NotNull;
@@ -96,6 +99,13 @@ import org.mockito.junit.jupiter.MockitoExtension;
public class ItInternalTableScanTest {
private static final TestScaleCubeClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+ private static final MessageSerializationRegistryImpl SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
+
+ static {
+ RaftMessagesSerializationRegistryInitializer.registerFactories(SERIALIZATION_REGISTRY);
+ TxMessagesSerializationRegistryInitializer.registerFactories(SERIALIZATION_REGISTRY);
+ }
+
private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
private static final String TEST_TABLE_NAME = "testTbl";
@@ -139,7 +149,8 @@ public class ItInternalTableScanTest {
testInfo,
20_000,
new StaticNodeFinder(List.of(nodeNetworkAddress)),
- NETWORK_FACTORY
+ NETWORK_FACTORY,
+ SERIALIZATION_REGISTRY
);
network.start();
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 330de6c..9420cf2 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.table.distributed.storage.VersionedRowStore;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxMessagesSerializationRegistryInitializer;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
@@ -71,6 +72,10 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
private static final Row SECOND_VALUE = createKeyValueRow(1, 1);
+ static {
+ TxMessagesSerializationRegistryInitializer.registerFactories(ItAbstractListenerSnapshotTest.SERIALIZATION_REGISTRY);
+ }
+
/**
* Paths for created partition listeners.
*/
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 46c22de..d4b3512 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -49,11 +49,13 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxMessagesSerializationRegistryInitializer;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lang.IgniteUuidGenerator;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.MessageSerializationRegistryImpl;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.NodeFinder;
import org.apache.ignite.network.StaticNodeFinder;
@@ -61,6 +63,7 @@ import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.RaftMessagesSerializationRegistryInitializer;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.table.Table;
import org.apache.ignite.utils.ClusterServiceTestUtils;
@@ -79,6 +82,13 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
private static final TestScaleCubeClusterServiceFactory NETWORK_FACTORY = new TestScaleCubeClusterServiceFactory();
+ private static final MessageSerializationRegistryImpl SERIALIZATION_REGISTRY = new MessageSerializationRegistryImpl();
+
+ static {
+ RaftMessagesSerializationRegistryInitializer.registerFactories(SERIALIZATION_REGISTRY);
+ TxMessagesSerializationRegistryInitializer.registerFactories(SERIALIZATION_REGISTRY);
+ }
+
private ClusterService client;
protected Map<ClusterNode, Loza> raftServers;
@@ -377,7 +387,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
testInfo,
port,
nodeFinder,
- NETWORK_FACTORY
+ NETWORK_FACTORY,
+ SERIALIZATION_REGISTRY
);
network.start();
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishRequest.java
index b7a2270..de3554d 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishRequest.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishRequest.java
@@ -21,18 +21,20 @@ import java.io.Serializable;
import java.util.Set;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Marshallable;
import org.apache.ignite.network.annotations.Transferable;
/**
* Submit an action to a replication group.
*/
-@Transferable(value = TxMessageGroup.TX_FINISH_REQUEST, autoSerializable = false)
+@Transferable(value = TxMessageGroup.TX_FINISH_REQUEST)
public interface TxFinishRequest extends NetworkMessage, Serializable {
/**
* Returns the timestamp.
*
* @return The timestamp.
*/
+ @Marshallable
Timestamp timestamp();
/**
@@ -47,5 +49,6 @@ public interface TxFinishRequest extends NetworkMessage, Serializable {
*
* @return Enlisted partition groups.
*/
+ @Marshallable
Set<String> groups();
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishResponse.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishResponse.java
index 51c4cb3..eec2522 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishResponse.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxFinishResponse.java
@@ -25,7 +25,7 @@ import org.jetbrains.annotations.Nullable;
/**
* The result of an action.
*/
-@Transferable(value = TxMessageGroup.TX_FINISH_RESPONSE, autoSerializable = false)
+@Transferable(value = TxMessageGroup.TX_FINISH_RESPONSE)
public interface TxFinishResponse extends NetworkMessage, Serializable {
/**
* Returns the error message.