You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/10/20 09:26:54 UTC
[ignite-3] branch main updated: IGNITE-17906 Optimized marshaller implemented (#1214)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8761b64ee6 IGNITE-17906 Optimized marshaller implemented (#1214)
8761b64ee6 is described below
commit 8761b64ee67fdcce5059b43f87b2e3b8c242a843
Author: Ivan Bessonov <be...@gmail.com>
AuthorDate: Thu Oct 20 12:26:49 2022 +0300
IGNITE-17906 Optimized marshaller implemented (#1214)
---
.../network/direct/DirectMessageReader.java | 41 ++++---
.../network/direct/DirectMessageWriter.java | 41 ++++---
.../stream/DirectByteBufferStreamImplV1.java | 72 +++++-------
.../internal/network/netty/InboundDecoder.java | 2 +-
.../internal/network/netty/OutboundEncoder.java | 2 +-
.../PerSessionSerializationService.java | 8 ++
.../serialization/SerializationService.java | 7 ++
.../internal/network/netty/InboundDecoderTest.java | 4 +-
.../network/serialization/MarshallableTest.java | 8 --
.../internal/raft/util/OptimizedMarshaller.java | 127 +++++++++++++++++++++
.../ignite/internal/raft/util/OptimizedStream.java | 109 ++++++++++++++++++
.../raft/util/ThreadLocalOptimizedMarshaller.java | 48 ++++++++
.../apache/ignite/raft/jraft/rpc/RpcRequests.java | 1 -
13 files changed, 381 insertions(+), 89 deletions(-)
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java
index 84a5a060a8..fbfecd1ec0 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java
@@ -28,10 +28,10 @@ import org.apache.ignite.internal.network.direct.state.DirectMessageState;
import org.apache.ignite.internal.network.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
-import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageReader;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.jetbrains.annotations.Nullable;
@@ -48,14 +48,14 @@ public class DirectMessageReader implements MessageReader {
/**
* Constructor.
*
- * @param serializationService Serialization service.
- * @param protoVer Protocol version.
+ * @param serializationRegistry Serialization registry.
+ * @param protoVer Protocol version.
*/
public DirectMessageReader(
- PerSessionSerializationService serializationService,
+ MessageSerializationRegistry serializationRegistry,
byte protoVer
) {
- state = new DirectMessageState<>(StateItem.class, () -> new StateItem(serializationService, protoVer));
+ state = new DirectMessageState<>(StateItem.class, () -> new StateItem(createStream(serializationRegistry, protoVer)));
}
/** {@inheritDoc} */
@@ -459,6 +459,22 @@ public class DirectMessageReader implements MessageReader {
state.reset();
}
+ /**
+ * Returns a stream to read message fields recursively.
+ *
+ * @param serializationRegistry Serialization registry.
+ * @param protoVer Protocol version.
+ */
+ protected DirectByteBufferStream createStream(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+ switch (protoVer) {
+ case 1:
+ return new DirectByteBufferStreamImplV1(serializationRegistry);
+
+ default:
+ throw new IllegalStateException("Invalid protocol version: " + protoVer);
+ }
+ }
+
/**
* State item.
*/
@@ -472,19 +488,10 @@ public class DirectMessageReader implements MessageReader {
/**
* Constructor.
*
- * @param serializationService Serialization service.
- * @param protoVer Protocol version.
+ * @param stream Direct byte buffer stream.
*/
- StateItem(PerSessionSerializationService serializationService, byte protoVer) {
- switch (protoVer) {
- case 1:
- stream = new DirectByteBufferStreamImplV1(serializationService);
-
- break;
-
- default:
- throw new IllegalStateException("Invalid protocol version: " + protoVer);
- }
+ StateItem(DirectByteBufferStream stream) {
+ this.stream = stream;
}
/** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java
index da7f165868..04fc3a773d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java
@@ -28,9 +28,9 @@ import org.apache.ignite.internal.network.direct.state.DirectMessageState;
import org.apache.ignite.internal.network.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
-import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.network.serialization.MessageWriter;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.jetbrains.annotations.Nullable;
@@ -45,11 +45,11 @@ public class DirectMessageWriter implements MessageWriter {
/**
* Constructor.
*
- * @param serializationService Serialization service.
- * @param protoVer Protocol version.
+ * @param serializationRegistry Serialization registry.
+ * @param protoVer Protocol version.
*/
- public DirectMessageWriter(PerSessionSerializationService serializationService, byte protoVer) {
- state = new DirectMessageState<>(StateItem.class, () -> new StateItem(serializationService, protoVer));
+ public DirectMessageWriter(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+ state = new DirectMessageState<>(StateItem.class, () -> new StateItem(createStream(serializationRegistry, protoVer)));
}
/** {@inheritDoc} */
@@ -418,6 +418,22 @@ public class DirectMessageWriter implements MessageWriter {
state.reset();
}
+ /**
+ * Returns a stream to write message fields recursively.
+ *
+ * @param serializationRegistry Serialization registry.
+ * @param protoVer Protocol version.
+ */
+ protected DirectByteBufferStream createStream(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+ switch (protoVer) {
+ case 1:
+ return new DirectByteBufferStreamImplV1(serializationRegistry);
+
+ default:
+ throw new IllegalStateException("Invalid protocol version: " + protoVer);
+ }
+ }
+
/**
* State item.
*/
@@ -439,19 +455,10 @@ public class DirectMessageWriter implements MessageWriter {
/**
* Constructor.
*
- * @param serializationService Serialization service.
- * @param protoVer Protocol version.
+ * @param stream Direct byte buffer stream.
*/
- StateItem(PerSessionSerializationService serializationService, byte protoVer) {
- switch (protoVer) {
- case 1:
- stream = new DirectByteBufferStreamImplV1(serializationService);
-
- break;
-
- default:
- throw new IllegalStateException("Invalid protocol version: " + protoVer);
- }
+ StateItem(DirectByteBufferStream stream) {
+ this.stream = stream;
}
/** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
index 88bc1aab91..4d79a0580d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
@@ -49,7 +49,6 @@ import java.util.RandomAccess;
import java.util.Set;
import java.util.UUID;
import java.util.function.IntFunction;
-import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
import org.apache.ignite.internal.util.ArrayFactory;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -57,6 +56,7 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageDeserializer;
import org.apache.ignite.network.serialization.MessageReader;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.network.serialization.MessageSerializer;
import org.apache.ignite.network.serialization.MessageWriter;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -70,19 +70,19 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
private static final Object NULL = new Object();
/** Flag that indicates that byte buffer is not null. */
- private static final byte BYTE_BUFFER_NOT_NULL_FLAG = 1;
+ protected static final byte BYTE_BUFFER_NOT_NULL_FLAG = 1;
/** Flag that indicates that byte buffer has Big Endinan order. */
- private static final byte BYTE_BUFFER_BIG_ENDIAN_FLAG = 2;
+ protected static final byte BYTE_BUFFER_BIG_ENDIAN_FLAG = 2;
/** Message serialization registry. */
- private final PerSessionSerializationService serializationService;
+ private final MessageSerializationRegistry serializationRegistry;
- private ByteBuffer buf;
+ protected ByteBuffer buf;
- private byte[] heapArr;
+ protected byte[] heapArr;
- private long baseOff;
+ protected long baseOff;
private int arrOff = -1;
@@ -163,10 +163,10 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
/**
* Constructor.
*
- * @param serializationService Serialization service. .
+ * @param serializationRegistry Serialization service. .
*/
- public DirectByteBufferStreamImplV1(PerSessionSerializationService serializationService) {
- this.serializationService = serializationService;
+ public DirectByteBufferStreamImplV1(MessageSerializationRegistry serializationRegistry) {
+ this.serializationRegistry = serializationRegistry;
}
/** {@inheritDoc} */
@@ -234,11 +234,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
lastFinished = buf.remaining() >= 5;
if (lastFinished) {
- if (val == Integer.MAX_VALUE) {
- val = Integer.MIN_VALUE;
- } else {
- val++;
- }
+ val++;
int pos = buf.position();
@@ -262,11 +258,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
lastFinished = buf.remaining() >= 10;
if (lastFinished) {
- if (val == Long.MAX_VALUE) {
- val = Long.MIN_VALUE;
- } else {
- val++;
- }
+ val++;
int pos = buf.position();
@@ -657,7 +649,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
writer.setCurrentWriteClass(msg.getClass());
if (msgSerializer == null) {
- msgSerializer = serializationService.createMessageSerializer(msg.groupType(), msg.messageType());
+ msgSerializer = serializationRegistry.createSerializer(msg.groupType(), msg.messageType());
}
writer.setBuffer(buf);
@@ -887,25 +879,21 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
int val = 0;
- while (buf.hasRemaining()) {
- int pos = buf.position();
+ int pos = buf.position();
+
+ int limit = buf.limit();
+ while (pos < limit) {
byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
- buf.position(pos + 1);
+ pos++;
prim |= ((long) b & 0x7F) << (7 * primShift);
if ((b & 0x80) == 0) {
lastFinished = true;
- val = (int) prim;
-
- if (val == Integer.MIN_VALUE) {
- val = Integer.MAX_VALUE;
- } else {
- val--;
- }
+ val = (int) prim - 1;
prim = 0;
primShift = 0;
@@ -916,6 +904,8 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
}
}
+ buf.position(pos);
+
return val;
}
@@ -926,25 +916,21 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
long val = 0;
- while (buf.hasRemaining()) {
- int pos = buf.position();
+ int pos = buf.position();
+
+ int limit = buf.limit();
+ while (pos < limit) {
byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
- buf.position(pos + 1);
+ pos++;
prim |= ((long) b & 0x7F) << (7 * primShift);
if ((b & 0x80) == 0) {
lastFinished = true;
- val = prim;
-
- if (val == Long.MIN_VALUE) {
- val = Long.MAX_VALUE;
- } else {
- val--;
- }
+ val = prim - 1;
prim = 0;
primShift = 0;
@@ -955,6 +941,8 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
}
}
+ buf.position(pos);
+
return val;
}
@@ -1291,7 +1279,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
return null;
}
- msgDeserializer = serializationService.createMessageDeserializer(msgGroupType, msgType);
+ msgDeserializer = serializationRegistry.createDeserializer(msgGroupType, msgType);
}
// if the deserializer is not null then we have definitely finished parsing the header and can read the message
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
index c8d7a6d233..889d2a9841 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
@@ -71,7 +71,7 @@ public class InboundDecoder extends ByteToMessageDecoder {
MessageReader reader = readerAttr.get();
if (reader == null) {
- reader = new DirectMessageReader(serializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
+ reader = new DirectMessageReader(serializationService.serializationRegistry(), ConnectionManager.DIRECT_PROTOCOL_VERSION);
readerAttr.set(reader);
}
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index 6b8a66f779..31d2859462 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -114,7 +114,7 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
}
this.serializer = serializationService.createMessageSerializer(msg.groupType(), msg.messageType());
- this.writer = new DirectMessageWriter(serializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
+ this.writer = new DirectMessageWriter(serializationService.serializationRegistry(), ConnectionManager.DIRECT_PROTOCOL_VERSION);
}
/** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
index 0abae91902..44b0706d2a 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.network.serialization.MessageSerializer;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -91,6 +92,13 @@ public class PerSessionSerializationService {
);
}
+ /**
+ * Returns underlying serialization registry.
+ */
+ public MessageSerializationRegistry serializationRegistry() {
+ return serializationService.serializationRegistry();
+ }
+
/**
* Creates a message serializer.
*
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java
index 96f43aeb1a..a9873888c6 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java
@@ -50,6 +50,13 @@ public class SerializationService {
this.descriptorFactory = userObjectSerializationContext.descriptorFactory();
}
+ /**
+ * Returns underlying serialization registry.
+ */
+ public MessageSerializationRegistry serializationRegistry() {
+ return messageRegistry;
+ }
+
/**
* Creates a message serializer.
*
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
index 1291f8d5bb..34514078da 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
@@ -100,7 +100,7 @@ public class InboundDecoderTest {
var perSessionSerializationService = new PerSessionSerializationService(serializationService);
var channel = new EmbeddedChannel(new InboundDecoder(perSessionSerializationService));
- var writer = new DirectMessageWriter(perSessionSerializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
+ var writer = new DirectMessageWriter(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION);
MessageSerializer<NetworkMessage> serializer = registry.createSerializer(msg.groupType(), msg.messageType());
@@ -177,7 +177,7 @@ public class InboundDecoderTest {
final var list = new ArrayList<>();
- var writer = new DirectMessageWriter(perSessionSerializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
+ var writer = new DirectMessageWriter(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION);
var msg = new TestMessagesFactory().testMessage().msg("abcdefghijklmn").build();
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index baca7682ab..3e41de8e77 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -42,20 +42,16 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.ignite.internal.network.direct.DirectMessageWriter;
import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
import org.apache.ignite.internal.network.messages.MessageWithMarshallable;
import org.apache.ignite.internal.network.messages.TestMessagesFactory;
-import org.apache.ignite.internal.network.netty.ConnectionManager;
import org.apache.ignite.internal.network.netty.InboundDecoder;
import org.apache.ignite.internal.network.netty.OutboundEncoder;
import org.apache.ignite.internal.network.serialization.marshal.MarshalException;
import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
-import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.OutNetworkObject;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
-import org.apache.ignite.network.serialization.MessageSerializer;
import org.apache.ignite.network.serialization.TestMessageSerializationRegistryImpl;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
@@ -91,16 +87,12 @@ public class MarshallableTest {
private ByteBuffer write(Map<String, SimpleSerializableObject> testMap) throws Exception {
var serializers = new Serialization();
- var writer = new DirectMessageWriter(serializers.perSessionSerializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
-
MessageWithMarshallable msg = msgFactory.messageWithMarshallable().marshallableMap(testMap).build();
IntSet ids = new IntOpenHashSet();
msg.prepareMarshal(ids, serializers.userObjectSerializer);
- MessageSerializer<NetworkMessage> serializer = registry.createSerializer(msg.groupType(), msg.messageType());
-
var channel = new EmbeddedChannel(
new ChunkedWriteHandler(),
new OutboundEncoder(serializers.perSessionSerializationService)
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
new file mode 100644
index 0000000000..b704cef947
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.apache.ignite.internal.network.direct.DirectMessageReader;
+import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageReader;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.serialization.MessageWriter;
+import org.apache.ignite.raft.jraft.util.Marshaller;
+
+/**
+ * Marshaller implementation that uses a {@link DirectByteBufferStream} variant to serialize/deserialize data.
+ */
+public class OptimizedMarshaller implements Marshaller {
+ /** Protocol version. */
+ private static final byte PROTO_VER = 1;
+
+ /** Default buffer size. */
+ private static final int DEFAULT_BUFFER_SIZE = 1024;
+
+ /** Byte buffer order. */
+ private static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
+
+ /** Buffer to write data. */
+ private ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ORDER);
+
+ /** Direct byte-buffer stream instance. */
+ private final OptimizedStream stream;
+
+ /** Message writer. */
+ private final MessageWriter messageWriter;
+
+ /** Message reader. */
+ private final MessageReader messageReader;
+
+ /**
+ * Constructor.
+ *
+ * @param serializationRegistry Serialization registry.
+ */
+ public OptimizedMarshaller(MessageSerializationRegistry serializationRegistry) {
+ stream = new OptimizedStream(serializationRegistry);
+
+ messageWriter = new DirectMessageWriter(serializationRegistry, PROTO_VER) {
+ @Override
+ protected DirectByteBufferStreamImplV1 createStream(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+ assert protoVer == PROTO_VER : protoVer;
+
+ return new OptimizedStream(serializationRegistry);
+ }
+ };
+
+ messageReader = new DirectMessageReader(serializationRegistry, PROTO_VER) {
+ @Override
+ protected DirectByteBufferStream createStream(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+ assert protoVer == PROTO_VER : protoVer;
+
+ return new OptimizedStream(serializationRegistry);
+ }
+ };
+ }
+
+ @Override
+ public byte[] marshall(Object o) {
+ assert o instanceof NetworkMessage;
+
+ NetworkMessage message = (NetworkMessage) o;
+
+ buffer.position(0);
+
+ while (true) {
+ stream.setBuffer(buffer);
+
+ stream.writeMessage(message, messageWriter);
+
+ if (stream.lastFinished()) {
+ break;
+ }
+
+ buffer = expandBuffer(buffer);
+ }
+
+ return Arrays.copyOf(buffer.array(), buffer.position());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T unmarshall(byte[] bytes) {
+ stream.setBuffer(ByteBuffer.wrap(bytes).order(ORDER));
+
+ return stream.readMessage(messageReader);
+ }
+
+ /**
+ * Creates a bigger copy of the buffer.
+ *
+ * @param buffer Smaller byte buffer.
+ * @return Bigger byte buffer.
+ */
+ private ByteBuffer expandBuffer(ByteBuffer buffer) {
+ byte[] newArray = Arrays.copyOf(buffer.array(), (int) (buffer.capacity() * 1.5));
+
+ return ByteBuffer.wrap(newArray).position(buffer.position()).order(ORDER);
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedStream.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedStream.java
new file mode 100644
index 0000000000..9313a5a868
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedStream.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+
+/**
+ * Direct byte-buffer stream implementation that contains specific optimizations for optimized marshaller.
+ */
+class OptimizedStream extends DirectByteBufferStreamImplV1 {
+ /**
+ * Constructor.
+ *
+ * @param serializationRegistry Serialization registry.
+ */
+ public OptimizedStream(MessageSerializationRegistry serializationRegistry) {
+ super(serializationRegistry);
+
+ lastFinished = true;
+ }
+
+ @Override
+ public void writeShort(short val) {
+ // Convert short "-1" to int "-1" to preserve the optimization that stores "-1" as "0".
+ // Every other short is coded as a positive integer to avoid two extra 0xFF bytes.
+ writeInt(Short.toUnsignedInt((short) (val + 1)) - 1);
+ }
+
+ @Override
+ public short readShort() {
+ return (short) readLong();
+ }
+
+ @Override
+ public int readInt() {
+ return (int) readLong();
+ }
+
+ @Override
+ public long readLong() {
+ long res = 0;
+
+ int pos = buf.position();
+
+ for (int shift = 0; ; shift += 7) {
+ byte b = GridUnsafe.getByte(heapArr, baseOff + pos++);
+
+ res |= (b & 0x7FL) << shift;
+
+ if (b >= 0) {
+ break;
+ }
+ }
+
+ buf.position(pos);
+
+ return res - 1;
+ }
+
+ @Override
+ public ByteBuffer readByteBuffer() {
+ assert buf.hasArray();
+
+ byte flag = readByte();
+
+ if ((flag & BYTE_BUFFER_NOT_NULL_FLAG) == 0) {
+ return null;
+ }
+
+ int length = readInt();
+
+ int oldLimit = buf.limit();
+
+ try {
+ ByteBuffer val = buf.limit(buf.position() + length).slice();
+
+ buf.position(buf.limit());
+
+ if ((flag & BYTE_BUFFER_BIG_ENDIAN_FLAG) == 0) {
+ val.order(ByteOrder.LITTLE_ENDIAN);
+ } else {
+ val.order(ByteOrder.BIG_ENDIAN);
+ }
+
+ return val;
+ } finally {
+ buf.limit(oldLimit);
+ }
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
new file mode 100644
index 0000000000..9c2464f0d3
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.jraft.util.Marshaller;
+
+/**
+ * Thread-safe variant of {@link OptimizedMarshaller}.
+ */
+public class ThreadLocalOptimizedMarshaller implements Marshaller {
+ /** Thread-local optimized marshaller holder. Not static, because it depends on serialization registry. */
+ private final ThreadLocal<Marshaller> marshaller;
+
+ /**
+ * Constructor.
+ *
+ * @param serializationRegistry Serialization registry.
+ */
+ public ThreadLocalOptimizedMarshaller(MessageSerializationRegistry serializationRegistry) {
+ marshaller = ThreadLocal.withInitial(() -> new OptimizedMarshaller(serializationRegistry));
+ }
+
+ @Override
+ public byte[] marshall(Object o) {
+ return marshaller.get().marshall(o);
+ }
+
+ @Override
+ public <T> T unmarshall(byte[] bytes) {
+ return marshaller.get().unmarshall(bytes);
+ }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
index f0d0571922..f0592bcad0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
@@ -88,7 +88,6 @@ public final class RpcRequests {
long term();
- @Marshallable
RaftOutter.SnapshotMeta meta();
String uri();