You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/10/20 09:26:54 UTC

[ignite-3] branch main updated: IGNITE-17906 Optimized marshaller implemented (#1214)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8761b64ee6 IGNITE-17906 Optimized marshaller implemented (#1214)
8761b64ee6 is described below

commit 8761b64ee67fdcce5059b43f87b2e3b8c242a843
Author: Ivan Bessonov <be...@gmail.com>
AuthorDate: Thu Oct 20 12:26:49 2022 +0300

    IGNITE-17906 Optimized marshaller implemented (#1214)
---
 .../network/direct/DirectMessageReader.java        |  41 ++++---
 .../network/direct/DirectMessageWriter.java        |  41 ++++---
 .../stream/DirectByteBufferStreamImplV1.java       |  72 +++++-------
 .../internal/network/netty/InboundDecoder.java     |   2 +-
 .../internal/network/netty/OutboundEncoder.java    |   2 +-
 .../PerSessionSerializationService.java            |   8 ++
 .../serialization/SerializationService.java        |   7 ++
 .../internal/network/netty/InboundDecoderTest.java |   4 +-
 .../network/serialization/MarshallableTest.java    |   8 --
 .../internal/raft/util/OptimizedMarshaller.java    | 127 +++++++++++++++++++++
 .../ignite/internal/raft/util/OptimizedStream.java | 109 ++++++++++++++++++
 .../raft/util/ThreadLocalOptimizedMarshaller.java  |  48 ++++++++
 .../apache/ignite/raft/jraft/rpc/RpcRequests.java  |   1 -
 13 files changed, 381 insertions(+), 89 deletions(-)

diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java
index 84a5a060a8..fbfecd1ec0 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageReader.java
@@ -28,10 +28,10 @@ import org.apache.ignite.internal.network.direct.state.DirectMessageState;
 import org.apache.ignite.internal.network.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
-import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageReader;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.jetbrains.annotations.Nullable;
 
@@ -48,14 +48,14 @@ public class DirectMessageReader implements MessageReader {
     /**
      * Constructor.
      *
-     * @param serializationService  Serialization service.
-     * @param protoVer              Protocol version.
+     * @param serializationRegistry Serialization registry.
+     * @param protoVer Protocol version.
      */
     public DirectMessageReader(
-            PerSessionSerializationService serializationService,
+            MessageSerializationRegistry serializationRegistry,
             byte protoVer
     ) {
-        state = new DirectMessageState<>(StateItem.class, () -> new StateItem(serializationService, protoVer));
+        state = new DirectMessageState<>(StateItem.class, () -> new StateItem(createStream(serializationRegistry, protoVer)));
     }
 
     /** {@inheritDoc} */
@@ -459,6 +459,22 @@ public class DirectMessageReader implements MessageReader {
         state.reset();
     }
 
+    /**
+     * Returns a stream to read message fields recursively.
+     *
+     * @param serializationRegistry Serialization registry.
+     * @param protoVer Protocol version.
+     */
+    protected DirectByteBufferStream createStream(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+        switch (protoVer) {
+            case 1:
+                return new DirectByteBufferStreamImplV1(serializationRegistry);
+
+            default:
+                throw new IllegalStateException("Invalid protocol version: " + protoVer);
+        }
+    }
+
     /**
      * State item.
      */
@@ -472,19 +488,10 @@ public class DirectMessageReader implements MessageReader {
         /**
          * Constructor.
          *
-         * @param serializationService Serialization service.
-         * @param protoVer              Protocol version.
+         * @param stream Direct byte buffer stream.
          */
-        StateItem(PerSessionSerializationService serializationService, byte protoVer) {
-            switch (protoVer) {
-                case 1:
-                    stream = new DirectByteBufferStreamImplV1(serializationService);
-
-                    break;
-
-                default:
-                    throw new IllegalStateException("Invalid protocol version: " + protoVer);
-            }
+        StateItem(DirectByteBufferStream stream) {
+            this.stream = stream;
         }
 
         /** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java
index da7f165868..04fc3a773d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/DirectMessageWriter.java
@@ -28,9 +28,9 @@ import org.apache.ignite.internal.network.direct.state.DirectMessageState;
 import org.apache.ignite.internal.network.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
-import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.network.serialization.MessageWriter;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.jetbrains.annotations.Nullable;
@@ -45,11 +45,11 @@ public class DirectMessageWriter implements MessageWriter {
     /**
      * Constructor.
      *
-     * @param serializationService  Serialization service.
-     * @param protoVer              Protocol version.
+     * @param serializationRegistry Serialization registry.
+     * @param protoVer Protocol version.
      */
-    public DirectMessageWriter(PerSessionSerializationService serializationService, byte protoVer) {
-        state = new DirectMessageState<>(StateItem.class, () -> new StateItem(serializationService, protoVer));
+    public DirectMessageWriter(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+        state = new DirectMessageState<>(StateItem.class, () -> new StateItem(createStream(serializationRegistry, protoVer)));
     }
 
     /** {@inheritDoc} */
@@ -418,6 +418,22 @@ public class DirectMessageWriter implements MessageWriter {
         state.reset();
     }
 
+    /**
+     * Returns a stream to write message fields recursively.
+     *
+     * @param serializationRegistry Serialization registry.
+     * @param protoVer Protocol version.
+     */
+    protected DirectByteBufferStream createStream(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+        switch (protoVer) {
+            case 1:
+                return new DirectByteBufferStreamImplV1(serializationRegistry);
+
+            default:
+                throw new IllegalStateException("Invalid protocol version: " + protoVer);
+        }
+    }
+
     /**
      * State item.
      */
@@ -439,19 +455,10 @@ public class DirectMessageWriter implements MessageWriter {
         /**
          * Constructor.
          *
-         * @param serializationService Serialization service.
-         * @param protoVer             Protocol version.
+         * @param stream Direct byte buffer stream.
          */
-        StateItem(PerSessionSerializationService serializationService, byte protoVer) {
-            switch (protoVer) {
-                case 1:
-                    stream = new DirectByteBufferStreamImplV1(serializationService);
-
-                    break;
-
-                default:
-                    throw new IllegalStateException("Invalid protocol version: " + protoVer);
-            }
+        StateItem(DirectByteBufferStream stream) {
+            this.stream = stream;
         }
 
         /** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
index 88bc1aab91..4d79a0580d 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java
@@ -49,7 +49,6 @@ import java.util.RandomAccess;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.IntFunction;
-import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
 import org.apache.ignite.internal.util.ArrayFactory;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -57,6 +56,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageDeserializer;
 import org.apache.ignite.network.serialization.MessageReader;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.network.serialization.MessageSerializer;
 import org.apache.ignite.network.serialization.MessageWriter;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -70,19 +70,19 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     private static final Object NULL = new Object();
 
     /** Flag that indicates that byte buffer is not null. */
-    private static final byte BYTE_BUFFER_NOT_NULL_FLAG = 1;
+    protected static final byte BYTE_BUFFER_NOT_NULL_FLAG = 1;
 
     /** Flag that indicates that byte buffer has Big Endinan order. */
-    private static final byte BYTE_BUFFER_BIG_ENDIAN_FLAG = 2;
+    protected static final byte BYTE_BUFFER_BIG_ENDIAN_FLAG = 2;
 
     /** Message serialization registry. */
-    private final PerSessionSerializationService serializationService;
+    private final MessageSerializationRegistry serializationRegistry;
 
-    private ByteBuffer buf;
+    protected ByteBuffer buf;
 
-    private byte[] heapArr;
+    protected byte[] heapArr;
 
-    private long baseOff;
+    protected long baseOff;
 
     private int arrOff = -1;
 
@@ -163,10 +163,10 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
     /**
      * Constructor.
      *
-     * @param serializationService Serialization service.       .
+     * @param serializationRegistry Serialization service.       .
      */
-    public DirectByteBufferStreamImplV1(PerSessionSerializationService serializationService) {
-        this.serializationService = serializationService;
+    public DirectByteBufferStreamImplV1(MessageSerializationRegistry serializationRegistry) {
+        this.serializationRegistry = serializationRegistry;
     }
 
     /** {@inheritDoc} */
@@ -234,11 +234,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
         lastFinished = buf.remaining() >= 5;
 
         if (lastFinished) {
-            if (val == Integer.MAX_VALUE) {
-                val = Integer.MIN_VALUE;
-            } else {
-                val++;
-            }
+            val++;
 
             int pos = buf.position();
 
@@ -262,11 +258,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
         lastFinished = buf.remaining() >= 10;
 
         if (lastFinished) {
-            if (val == Long.MAX_VALUE) {
-                val = Long.MIN_VALUE;
-            } else {
-                val++;
-            }
+            val++;
 
             int pos = buf.position();
 
@@ -657,7 +649,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
                     writer.setCurrentWriteClass(msg.getClass());
 
                     if (msgSerializer == null) {
-                        msgSerializer = serializationService.createMessageSerializer(msg.groupType(), msg.messageType());
+                        msgSerializer = serializationRegistry.createSerializer(msg.groupType(), msg.messageType());
                     }
 
                     writer.setBuffer(buf);
@@ -887,25 +879,21 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
         int val = 0;
 
-        while (buf.hasRemaining()) {
-            int pos = buf.position();
+        int pos = buf.position();
+
+        int limit = buf.limit();
 
+        while (pos < limit) {
             byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
 
-            buf.position(pos + 1);
+            pos++;
 
             prim |= ((long) b & 0x7F) << (7 * primShift);
 
             if ((b & 0x80) == 0) {
                 lastFinished = true;
 
-                val = (int) prim;
-
-                if (val == Integer.MIN_VALUE) {
-                    val = Integer.MAX_VALUE;
-                } else {
-                    val--;
-                }
+                val = (int) prim - 1;
 
                 prim = 0;
                 primShift = 0;
@@ -916,6 +904,8 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
             }
         }
 
+        buf.position(pos);
+
         return val;
     }
 
@@ -926,25 +916,21 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
 
         long val = 0;
 
-        while (buf.hasRemaining()) {
-            int pos = buf.position();
+        int pos = buf.position();
+
+        int limit = buf.limit();
 
+        while (pos < limit) {
             byte b = GridUnsafe.getByte(heapArr, baseOff + pos);
 
-            buf.position(pos + 1);
+            pos++;
 
             prim |= ((long) b & 0x7F) << (7 * primShift);
 
             if ((b & 0x80) == 0) {
                 lastFinished = true;
 
-                val = prim;
-
-                if (val == Long.MIN_VALUE) {
-                    val = Long.MAX_VALUE;
-                } else {
-                    val--;
-                }
+                val = prim - 1;
 
                 prim = 0;
                 primShift = 0;
@@ -955,6 +941,8 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
             }
         }
 
+        buf.position(pos);
+
         return val;
     }
 
@@ -1291,7 +1279,7 @@ public class DirectByteBufferStreamImplV1 implements DirectByteBufferStream {
                 return null;
             }
 
-            msgDeserializer = serializationService.createMessageDeserializer(msgGroupType, msgType);
+            msgDeserializer = serializationRegistry.createDeserializer(msgGroupType, msgType);
         }
 
         // if the deserializer is not null then we have definitely finished parsing the header and can read the message
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
index c8d7a6d233..889d2a9841 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundDecoder.java
@@ -71,7 +71,7 @@ public class InboundDecoder extends ByteToMessageDecoder {
         MessageReader reader = readerAttr.get();
 
         if (reader == null) {
-            reader = new DirectMessageReader(serializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
+            reader = new DirectMessageReader(serializationService.serializationRegistry(), ConnectionManager.DIRECT_PROTOCOL_VERSION);
             readerAttr.set(reader);
         }
 
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
index 6b8a66f779..31d2859462 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundEncoder.java
@@ -114,7 +114,7 @@ public class OutboundEncoder extends MessageToMessageEncoder<OutNetworkObject> {
             }
 
             this.serializer = serializationService.createMessageSerializer(msg.groupType(), msg.messageType());
-            this.writer = new DirectMessageWriter(serializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
+            this.writer = new DirectMessageWriter(serializationService.serializationRegistry(), ConnectionManager.DIRECT_PROTOCOL_VERSION);
         }
 
         /** {@inheritDoc} */
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
index 0abae91902..44b0706d2a 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/PerSessionSerializationService.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.serialization.MessageDeserializer;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
 import org.apache.ignite.network.serialization.MessageSerializer;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
@@ -91,6 +92,13 @@ public class PerSessionSerializationService {
         );
     }
 
+    /**
+     * Returns underlying serialization registry.
+     */
+    public MessageSerializationRegistry serializationRegistry() {
+        return serializationService.serializationRegistry();
+    }
+
     /**
      * Creates a message serializer.
      *
diff --git a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java
index 96f43aeb1a..a9873888c6 100644
--- a/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java
+++ b/modules/network/src/main/java/org/apache/ignite/internal/network/serialization/SerializationService.java
@@ -50,6 +50,13 @@ public class SerializationService {
         this.descriptorFactory = userObjectSerializationContext.descriptorFactory();
     }
 
+    /**
+     * Returns underlying serialization registry.
+     */
+    public MessageSerializationRegistry serializationRegistry() {
+        return messageRegistry;
+    }
+
     /**
      * Creates a message serializer.
      *
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
index 1291f8d5bb..34514078da 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/netty/InboundDecoderTest.java
@@ -100,7 +100,7 @@ public class InboundDecoderTest {
         var perSessionSerializationService = new PerSessionSerializationService(serializationService);
         var channel = new EmbeddedChannel(new InboundDecoder(perSessionSerializationService));
 
-        var writer = new DirectMessageWriter(perSessionSerializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
+        var writer = new DirectMessageWriter(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION);
 
         MessageSerializer<NetworkMessage> serializer = registry.createSerializer(msg.groupType(), msg.messageType());
 
@@ -177,7 +177,7 @@ public class InboundDecoderTest {
 
         final var list = new ArrayList<>();
 
-        var writer = new DirectMessageWriter(perSessionSerializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
+        var writer = new DirectMessageWriter(registry, ConnectionManager.DIRECT_PROTOCOL_VERSION);
 
         var msg = new TestMessagesFactory().testMessage().msg("abcdefghijklmn").build();
 
diff --git a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
index baca7682ab..3e41de8e77 100644
--- a/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
+++ b/modules/network/src/test/java/org/apache/ignite/internal/network/serialization/MarshallableTest.java
@@ -42,20 +42,16 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.ignite.internal.network.direct.DirectMessageWriter;
 import org.apache.ignite.internal.network.message.ClassDescriptorMessage;
 import org.apache.ignite.internal.network.messages.MessageWithMarshallable;
 import org.apache.ignite.internal.network.messages.TestMessagesFactory;
-import org.apache.ignite.internal.network.netty.ConnectionManager;
 import org.apache.ignite.internal.network.netty.InboundDecoder;
 import org.apache.ignite.internal.network.netty.OutboundEncoder;
 import org.apache.ignite.internal.network.serialization.marshal.MarshalException;
 import org.apache.ignite.internal.network.serialization.marshal.MarshalledObject;
 import org.apache.ignite.internal.network.serialization.marshal.UserObjectMarshaller;
-import org.apache.ignite.network.NetworkMessage;
 import org.apache.ignite.network.OutNetworkObject;
 import org.apache.ignite.network.serialization.MessageSerializationRegistry;
-import org.apache.ignite.network.serialization.MessageSerializer;
 import org.apache.ignite.network.serialization.TestMessageSerializationRegistryImpl;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
@@ -91,16 +87,12 @@ public class MarshallableTest {
     private ByteBuffer write(Map<String, SimpleSerializableObject> testMap) throws Exception {
         var serializers = new Serialization();
 
-        var writer = new DirectMessageWriter(serializers.perSessionSerializationService, ConnectionManager.DIRECT_PROTOCOL_VERSION);
-
         MessageWithMarshallable msg = msgFactory.messageWithMarshallable().marshallableMap(testMap).build();
 
         IntSet ids = new IntOpenHashSet();
 
         msg.prepareMarshal(ids, serializers.userObjectSerializer);
 
-        MessageSerializer<NetworkMessage> serializer = registry.createSerializer(msg.groupType(), msg.messageType());
-
         var channel = new EmbeddedChannel(
                 new ChunkedWriteHandler(),
                 new OutboundEncoder(serializers.perSessionSerializationService)
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
new file mode 100644
index 0000000000..b704cef947
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedMarshaller.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import org.apache.ignite.internal.network.direct.DirectMessageReader;
+import org.apache.ignite.internal.network.direct.DirectMessageWriter;
+import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.serialization.MessageReader;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.network.serialization.MessageWriter;
+import org.apache.ignite.raft.jraft.util.Marshaller;
+
+/**
+ * Marshaller implementation that uses a {@link DirectByteBufferStream} variant to serialize/deserialize data.
+ */
+public class OptimizedMarshaller implements Marshaller {
+    /** Protocol version. */
+    private static final byte PROTO_VER = 1;
+
+    /** Default buffer size. */
+    private static final int DEFAULT_BUFFER_SIZE = 1024;
+
+    /** Byte buffer order. */
+    private static final ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
+
+    /** Buffer to write data. */
+    private ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ORDER);
+
+    /** Direct byte-buffer stream instance. */
+    private final OptimizedStream stream;
+
+    /** Message writer. */
+    private final MessageWriter messageWriter;
+
+    /** Message reader. */
+    private final MessageReader messageReader;
+
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public OptimizedMarshaller(MessageSerializationRegistry serializationRegistry) {
+        stream = new OptimizedStream(serializationRegistry);
+
+        messageWriter = new DirectMessageWriter(serializationRegistry, PROTO_VER) {
+            @Override
+            protected DirectByteBufferStreamImplV1 createStream(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+                assert protoVer == PROTO_VER : protoVer;
+
+                return new OptimizedStream(serializationRegistry);
+            }
+        };
+
+        messageReader = new DirectMessageReader(serializationRegistry, PROTO_VER) {
+            @Override
+            protected DirectByteBufferStream createStream(MessageSerializationRegistry serializationRegistry, byte protoVer) {
+                assert protoVer == PROTO_VER : protoVer;
+
+                return new OptimizedStream(serializationRegistry);
+            }
+        };
+    }
+
+    @Override
+    public byte[] marshall(Object o) {
+        assert o instanceof NetworkMessage;
+
+        NetworkMessage message = (NetworkMessage) o;
+
+        buffer.position(0);
+
+        while (true) {
+            stream.setBuffer(buffer);
+
+            stream.writeMessage(message, messageWriter);
+
+            if (stream.lastFinished()) {
+                break;
+            }
+
+            buffer = expandBuffer(buffer);
+        }
+
+        return Arrays.copyOf(buffer.array(), buffer.position());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T unmarshall(byte[] bytes) {
+        stream.setBuffer(ByteBuffer.wrap(bytes).order(ORDER));
+
+        return stream.readMessage(messageReader);
+    }
+
+    /**
+     * Creates a bigger copy of the buffer.
+     *
+     * @param buffer Smaller byte buffer.
+     * @return Bigger byte buffer.
+     */
+    private ByteBuffer expandBuffer(ByteBuffer buffer) {
+        byte[] newArray = Arrays.copyOf(buffer.array(), (int) (buffer.capacity() * 1.5));
+
+        return ByteBuffer.wrap(newArray).position(buffer.position()).order(ORDER);
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedStream.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedStream.java
new file mode 100644
index 0000000000..9313a5a868
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/OptimizedStream.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import org.apache.ignite.internal.network.direct.stream.DirectByteBufferStreamImplV1;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+
+/**
+ * Direct byte-buffer stream implementation that contains specific optimizations for optimized marshaller.
+ */
+class OptimizedStream extends DirectByteBufferStreamImplV1 {
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public OptimizedStream(MessageSerializationRegistry serializationRegistry) {
+        super(serializationRegistry);
+
+        lastFinished = true;
+    }
+
+    @Override
+    public void writeShort(short val) {
+        // Convert short "-1" to int "-1" to preserve the optimization that stores "-1" as "0".
+        // Every other short is coded as a positive integer to avoid two extra 0xFF bytes.
+        writeInt(Short.toUnsignedInt((short) (val + 1)) - 1);
+    }
+
+    @Override
+    public short readShort() {
+        return (short) readLong();
+    }
+
+    @Override
+    public int readInt() {
+        return (int) readLong();
+    }
+
+    @Override
+    public long readLong() {
+        long res = 0;
+
+        int pos = buf.position();
+
+        for (int shift = 0; ; shift += 7) {
+            byte b = GridUnsafe.getByte(heapArr, baseOff + pos++);
+
+            res |= (b & 0x7FL) << shift;
+
+            if (b >= 0) {
+                break;
+            }
+        }
+
+        buf.position(pos);
+
+        return res - 1;
+    }
+
+    @Override
+    public ByteBuffer readByteBuffer() {
+        assert buf.hasArray();
+
+        byte flag = readByte();
+
+        if ((flag & BYTE_BUFFER_NOT_NULL_FLAG) == 0) {
+            return null;
+        }
+
+        int length = readInt();
+
+        int oldLimit = buf.limit();
+
+        try {
+            ByteBuffer val = buf.limit(buf.position() + length).slice();
+
+            buf.position(buf.limit());
+
+            if ((flag & BYTE_BUFFER_BIG_ENDIAN_FLAG) == 0) {
+                val.order(ByteOrder.LITTLE_ENDIAN);
+            } else {
+                val.order(ByteOrder.BIG_ENDIAN);
+            }
+
+            return val;
+        } finally {
+            buf.limit(oldLimit);
+        }
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
new file mode 100644
index 0000000000..9c2464f0d3
--- /dev/null
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/util/ThreadLocalOptimizedMarshaller.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.util;
+
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.apache.ignite.raft.jraft.util.Marshaller;
+
+/**
+ * Thread-safe variant of {@link OptimizedMarshaller}.
+ */
+public class ThreadLocalOptimizedMarshaller implements Marshaller {
+    /** Thread-local optimized marshaller holder. Not static, because it depends on serialization registry. */
+    private final ThreadLocal<Marshaller> marshaller;
+
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public ThreadLocalOptimizedMarshaller(MessageSerializationRegistry serializationRegistry) {
+        marshaller = ThreadLocal.withInitial(() -> new OptimizedMarshaller(serializationRegistry));
+    }
+
+    @Override
+    public byte[] marshall(Object o) {
+        return marshaller.get().marshall(o);
+    }
+
+    @Override
+    public <T> T unmarshall(byte[] bytes) {
+        return marshaller.get().unmarshall(bytes);
+    }
+}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
index f0d0571922..f0592bcad0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/RpcRequests.java
@@ -88,7 +88,6 @@ public final class RpcRequests {
 
         long term();
 
-        @Marshallable
         RaftOutter.SnapshotMeta meta();
 
         String uri();